#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Plugin installation:
#   wget "https://pasternok.org/sources/tuya_powermon.py?raw" -O /etc/munin/plugins/tuya_powermon
#   chmod +x /etc/munin/plugins/tuya_powermon
#   systemctl restart munin-node
"""Munin plugin that graphs current, power and voltage of Tuya smart plugs.

Run with the single argument ``config`` to print the graph configuration,
or with no argument to poll the plugs listed in ``DEVICES`` (through
``tinytuya``) and print their current values.
"""

### config

DEVICES = [
    {
        "name": "Smart Socket 1",
        "id": "01234567890123456789",
        "ip": "10.1.1.201",
        "key": "0123456789abcdef",
    },
    {
        "name": "Smart Socket 2",
        "id": "12345678901234567890",
        "ip": "10.1.1.202",
        "key": "1234567890abcdef",
    },
]
PROTOCOL = 3.3

### code - do not change anything below here

__author__ = "Damian Pasternok "
__version__ = "0.2.0"
__date__ = "2021/10/22"
__copyright__ = "Copyright (C) 2021 Damian Pasternok"
__license__ = "GPL"

import argparse
from time import sleep

import tinytuya

# How many times a device is queried before it is given up on.
_MAX_ATTEMPTS = 10

# (field name, key inside the "dps" mapping, divisor applied to the raw
# value, unit shown in the graph label)
_METRICS = (
    ("current", "4", 1, "mA"),
    ("power", "5", 10, "W"),
    ("voltage", "6", 10, "V"),
)

# Metrics that additionally get a "total_<name>" field summed over devices.
_TOTALS = (("current", "mA"), ("power", "W"))

_GRAPH_HEADER = (
    "graph_title Tuya WiFi Smart Plug power monitor",
    "graph_args --logarithmic --units=si",
    "graph_scale no",
    "graph_category smarthome",
    "graph_vlabel power parameters",
)


class Tuya:
    """Poll every device in ``DEVICES`` once and keep the successful answers.

    After construction ``self.data`` is a list with one dict per device that
    answered: a copy of its ``DEVICES`` entry plus a ``"data"`` key holding
    the status dict returned by the device. Devices that never answered are
    left out.
    """

    def __init__(self):
        self.data = []
        for device in DEVICES:
            status = self._poll(device)
            if status is not None:
                # Copy the entry so polling never writes into DEVICES.
                self.data.append({**device, "data": status})

    @staticmethod
    def _poll(device):
        """Return the status dict of *device*, or ``None`` if it never answers.

        The device is queried up to ``_MAX_ATTEMPTS`` times; after the n-th
        failed attempt the function sleeps n seconds before trying again.
        """
        outlet = tinytuya.OutletDevice(
            device.get("id"), device.get("ip"), device.get("key")
        )
        outlet.set_version(PROTOCOL)

        for attempt in range(1, _MAX_ATTEMPTS + 1):
            status = outlet.status()
            # A response carrying an "Error" entry is treated as a failure;
            # the isinstance check guards against a non-dict response.
            if isinstance(status, dict) and not status.get("Error"):
                return status
            if attempt < _MAX_ATTEMPTS:
                # No point in waiting after the final attempt.
                sleep(attempt)
        return None


class Munin(Tuya):
    """Render Munin ``config`` and value output for the devices in ``DEVICES``.

    Field names are ``<metric><index>`` where *index* is the position of the
    device in ``DEVICES``, so they stay the same whether or not a particular
    device is reachable.
    """

    def __init__(self):
        # Deliberately does not call Tuya.__init__: that would poll the
        # devices at construction time and bind a ``data`` list on the
        # instance, hiding the ``data()`` method below.
        pass

    def config(self):
        """Return the Munin graph configuration as a string.

        Built from ``DEVICES`` alone; no device is contacted.
        """
        lines = list(_GRAPH_HEADER)
        for name, _dps_key, _divisor, unit in _METRICS:
            for index, device in enumerate(DEVICES):
                label = device.get("name") or f"Generic Smart Plug #{index}"
                lines.append(f"{name}{index}.label {label} {name} [{unit}]")
        for name, unit in _TOTALS:
            lines.append(f"total_{name}.label Total {name} [{unit}]")
            lines.append(f"total_{name}.draw LINE2")
        return "\n".join(lines)

    def data(self):
        """Poll all devices and return their values in Munin format.

        A device that could not be reached, or whose response lacks the
        expected "dps" entry, is reported as ``U`` (Munin's marker for an
        unknown value) and does not contribute to the totals.
        """
        # One entry per DEVICES item, in order; None marks an unreachable one.
        readings = [self._poll(device) for device in DEVICES]

        totals = {name: 0 for name, _unit in _TOTALS}
        lines = []
        for name, dps_key, divisor, _unit in _METRICS:
            for index, status in enumerate(readings):
                raw = ((status or {}).get("dps") or {}).get(dps_key)
                if raw is None:
                    lines.append(f"{name}{index}.value U")
                    continue
                value = raw / divisor
                lines.append(f"{name}{index}.value {value}")
                if name in totals:
                    totals[name] += value

        lines.append(f"total_current.value {totals['current']}")
        lines.append(f"total_power.value {totals['power']}")
        return "\n".join(lines)


def main():
    """Command-line entry point: print the config or the current values."""
    munin = Munin()

    parser = argparse.ArgumentParser(description="Tuya WiFi Smart Plug Munin Plugin.")
    parser.add_argument("config", help="print graph configuration", nargs="?")
    args = parser.parse_args()

    if args.config == "config":
        print(munin.config())
    elif not args.config:
        print(munin.data())
    else:
        # Any other positional argument is not understood.
        parser.print_help()


if __name__ == "__main__":
    main()