#!/usr/bin/env python3 from binascii import a2b_hex from smllib.sml_frame import SmlFrame from smllib.const import OBIS_NAMES, UNITS file = "test-data.txt" class TasmotaSMLParser: def __init__(self): self.obis_decoded = [] self.obis_errors = [] self.parse_errors = [] def parse_input(self, input): """Parse a line into a frame""" if " : 77 " in input: # Remove the spaces and return a bytestring frame = a2b_hex("".join((input.split(" : ", 1)[1]).split())) elif input.startswith("77 0"): frame = a2b_hex("".join(input.split())) else: self.parse_errors.append(input) return return frame def decode_frame(self, frame): f = SmlFrame(frame) try: msgs = f.get_obis() if len(msgs) == 0: return False except: self.obis_errors.append(frame) return None return msgs def decode_messages(self, input): messages = [] for line in input: frame = self.parse_input(line) if frame is None: continue msgs = self.decode_frame(frame) if msgs in [None, False]: continue for msg in msgs: if msg.obis in self.obis_decoded: continue else: self.obis_decoded.append(msg.obis) messages.append(msg) return messages def get_message_details(self, msg): name = OBIS_NAMES.get(msg.obis, "Unbekannter Datentyp") unit = UNITS.get(msg.unit, "Unbekannte Einheit") mqtt_topic = ( "_".join(OBIS_NAMES.get(msg.obis, "Unbekanntes MQTT Topic").split()) .replace("/", "-") .lower() ) try: precision = msg.scaler * -1 except TypeError: precision = 0 try: human_readable = f"{msg.value * pow(10, msg.scaler)}{unit} ({name})" except TypeError: if msg.unit in UNITS and msg.name in OBIS_NAMES: human_readable = f"{msg.value}{unit} ({name})" elif msg.unit in UNITS and msg.name not in OBIS_NAMES: human_readable = f"{msg.value}{unit}" else: human_readable = f"{msg.value}" data = { "obis": msg.obis, "obis_code": msg.obis.obis_code, "obis_short": msg.obis.obis_short, "name": name, "unit": unit, "topic": mqtt_topic, "precision": precision, "value": msg.value, "status": msg.status, "val_time": msg.val_time, "unit_raw": msg.unit, "scaler": msg.scaler, "value_signature": msg.value_signature, "human_readable": human_readable, } return data def build_meter_def(self, msg): data = self.get_message_details(msg) return f"1,7707{data['obis'].upper()}@1,{data['name']},{data['unit']},{data['topic']},{data['precision']}" def pretty_print(self, msg): print(msg.format_msg()) def main(): tas = TasmotaSMLParser() msgs = [] with open(file, "r") as fp: msgs = tas.decode_messages(fp.read().splitlines()) for msg in msgs: tas.pretty_print(msg) details = tas.get_message_details(msg) print("Tasmota SML Script meter definition:") print(tas.build_meter_def(msg)) print(80 * "#") if __name__ == "__main__": main()