X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=myq_renderer.py;h=ca405346dc10f88d98689886ae198ecd98195e8b;hb=d6990436e08a57ce211b10058dc61fb223cb94ec;hp=fa60642fb283ef080a50e2d52304450b29b7db5e;hpb=75b27cc68871343681f01e3f5b04cae84b1b7b2a;p=kiosk.git diff --git a/myq_renderer.py b/myq_renderer.py index fa60642..ca40534 100644 --- a/myq_renderer.py +++ b/myq_renderer.py @@ -1,255 +1,95 @@ -#!/usr/local/bin/python - -import requests -import os -import json -import time +import pymyq +from aiohttp import ClientSession +import asyncio import constants import datetime +from dateutil.parser import parse import file_writer -import globals -import http.client -import json import renderer import secrets -APP_ID = "Vj8pQggXLhLy0WHahglCD4N1nAkkXQtGYpq2HrHD7H1nvmbT55KqtN6RSF4ILB/i" -HOST_URI = "myqexternal.myqdevice.com" -LOGIN_ENDPOINT = "api/v4/User/Validate" -DEVICE_LIST_ENDPOINT = "api/v4/UserDeviceDetails/Get" -DEVICE_SET_ENDPOINT = "api/v4/DeviceAttribute/PutDeviceAttribute" - -class MyQDoor: - myq_device_id = None - myq_lamp_device_id = None - myq_device_descr = None - myq_device_update_ts = None - myq_device_state = None - myq_security_token = None - - def __init__(self, device_id, lamp_id, descr, update_ts, state, token): - self.myq_device_id = device_id - self.myq_lamp_device_id = lamp_id - self.myq_device_descr = descr - self.myq_device_update_ts = update_ts - self.myq_device_state = state - self.myq_security_token = token - - def update(self, update_ts, state): - self.myq_device_update_ts = update_ts - self.myq_device_state = state - - def get_name(self): - return self.myq_device_descr - - def get_update_ts(self): - return self.myq_device_update_ts - - def open(self): - self.change_door_state("open") - - def close(self): - self.change_door_state("close") - - def lamp_on(self): - self.change_lamp_state("on") - - def lamp_off(self): - self.change_lamp_state("off") - - def get_status(self): - state = self.myq_device_state - if state == "1": - return "open" - elif state == "2": - return "closed" - elif state == "3": - return "stopped" - elif state == "4": - return "opening" - elif state == "5": - return "closing" - elif state == "8": - return "moving" - elif state == "9": - return "open" - else: - return str(state) + ", an unknown state for the door." - - def get_state_icon(self): - state = self.myq_device_state - if (state == "1" or state == "9"): - return "/icons/garage_open.png" - elif (state == "2"): - return "/icons/garage_closed.png" - elif (state == "4"): - return "/icons/garage_opening.png" - elif (state == "5" or state == "8"): - return "/icons/garage_closing.png" - else: - return str(state) + ", an unknown state for the door." - - def get_lamp_status(self): - state = self.check_lamp_state() - if state == "0": - return "off" - elif state == "1": - return "on" - else: - return "unknown" - - def change_device_state(self, payload): - device_action = requests.put( - 'https://{host_uri}/{device_set_endpoint}'.format( - host_uri=HOST_URI, - device_set_endpoint=DEVICE_SET_ENDPOINT), - data=payload, - headers={ - 'MyQApplicationId': APP_ID, - 'SecurityToken': self.myq_security_token - } - ) - return device_action.status_code == 200 - - def change_lamp_state(self, command): - newstate = 1 if command.lower() == "on" else 0 - payload = { - "attributeName": "desiredlightstate", - "myQDeviceId": self.myq_lamp_device_id, - "AttributeValue": newstate - } - return self.change_device_state(payload) - - def change_door_state(self, command): - open_close_state = 1 if command.lower() == "open" else 0 - payload = { - 'attributeName': 'desireddoorstate', - 'myQDeviceId': self.myq_device_id, - 'AttributeValue': open_close_state, - } - return self.change_device_state(payload) - - def check_door_state(self): - return self.myq_device_state - -# def check_lamp_state(self): -# return self.check_device_state(self.myq_lamp_device_id, "lightstate") - - def __repr__(self): - return "MyQ device(%s/%s), last update %s, current state %s" % ( - self.myq_device_descr, - self.myq_device_id, - self.myq_device_update_ts, - self.get_status()); - -class MyQService: - myq_security_token = "" - myq_device_list = {} - - def __init__(self, username, password): - self.username = username - self.password = password - - def login(self): - params = { - 'username': self.username, - 'password': self.password - } - login = requests.post( - 'https://{host_uri}/{login_endpoint}'.format( - host_uri=HOST_URI, - login_endpoint=LOGIN_ENDPOINT), - json=params, - headers={ - 'MyQApplicationId': APP_ID - } - ) - auth = login.json() - self.myq_security_token = auth.get('SecurityToken') - return True - - def get_raw_device_list(self): - devices = requests.get( - 'https://{host_uri}/{device_list_endpoint}'.format( - host_uri=HOST_URI, - device_list_endpoint=DEVICE_LIST_ENDPOINT), - headers={ - 'MyQApplicationId': APP_ID, - 'SecurityToken': self.myq_security_token - }) - return devices.json().get('Devices') - - def get_devices(self): - return self.myq_device_list - - def update_devices(self): - devices = self.get_raw_device_list() - if devices is None: - return False - - for dev in devices: - if dev["MyQDeviceTypeId"] != 7: continue - identifier = str(dev["MyQDeviceId"]) - update_ts = None - state = None - name = None - for attr in dev["Attributes"]: - key = attr["AttributeDisplayName"] - value = attr["Value"] - if (key == "doorstate"): - state = value - ts = int(attr["UpdatedTime"]) / 1000.0 - update_ts = datetime.datetime.fromtimestamp(ts) - elif (key == "desc"): - name = value - if (identifier in self.myq_device_list): - self.myq_device_list[identifier].update( - update_ts, state) - else: - device = MyQDoor(identifier, - None, - name, - update_ts, - state, - self.myq_security_token) - self.myq_device_list[identifier] = device - return True - class garage_door_renderer(renderer.debuggable_abstaining_renderer): def __init__(self, name_to_timeout_dict): super(garage_door_renderer, self).__init__(name_to_timeout_dict, False) - self.myq_service = MyQService(secrets.myq_username, - secrets.myq_password) - self.myq_service.login() - self.myq_service.update_devices() + self.doors = None + self.last_update = None def debug_prefix(self): return "myq" def periodic_render(self, key): - self.debug_print("*** Executing action %s ***" % key) if key == "Poll MyQ": - return self.myq_service.update_devices() + self.last_update = datetime.datetime.now() + return asyncio.run(self.poll_myq()) elif key == "Update Page": return self.update_page() else: raise error("Unknown operaiton") + async def poll_myq(self): + async with ClientSession() as websession: + myq = await pymyq.login(secrets.myq_username, + secrets.myq_password, + websession) + self.doors = myq.devices + return len(self.doors) > 0 + + def update_page(self): + f = file_writer.file_writer(constants.myq_pagename) + f.write(""" +