From 5665d57191745413dc0570545a80ba7d6f724d2c Mon Sep 17 00:00:00 2001 From: Evan Fiordeliso Date: Sat, 18 Dec 2021 20:52:51 -0500 Subject: [PATCH] Finish discovery method and rename all methods --- .vscode/settings.json | 1 - flickerstrip_py/discovery.py | 23 ++++++-- flickerstrip_py/flickerstrip.py | 96 ++++++++++++++++----------------- flickerstrip_py/pattern.py | 14 +++-- flickerstrip_py/ssdp.py | 66 ----------------------- flickerstrip_py/status.py | 39 ++++++++++---- 6 files changed, 104 insertions(+), 135 deletions(-) delete mode 100644 flickerstrip_py/ssdp.py diff --git a/.vscode/settings.json b/.vscode/settings.json index a022376..290193e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,7 +4,6 @@ ], "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, - "python.pythonPath": "/home/evanf/.cache/pypoetry/virtualenvs/flickerstrip-py-zxDi65bL-py3.9/bin/python", "python.linting.flake8Enabled": true, "python.linting.enabled": true } \ No newline at end of file diff --git a/flickerstrip_py/discovery.py b/flickerstrip_py/discovery.py index 2bba902..b51933c 100644 --- a/flickerstrip_py/discovery.py +++ b/flickerstrip_py/discovery.py @@ -1,18 +1,31 @@ from ssdpy import SSDPClient +import re +import json +from flickerstrip import Flickerstrip class FlickerstripDiscoveryClient: def __init__(self): - self.client = SSDPClient() + self.client = SSDPClient(timeout=10) + + def __discovered(self, device): + loc = device["location"] + result = re.search("http://(.*):80/description.xml", loc) + ip_address = result.group(1) + return Flickerstrip(ip_address) def discover(self): print("Discovering devices...") devices = self.client.m_search("ssdp:all") - print(f"Discovered {len(devices)} devices.") - for device in devices: - print(device) + print(f"Discovered {len(devices)} device(s).") + filtered = [d for d in devices if "Flickerstrip" in d["server"]] + print(f"Discovered {len(filtered)} flickerstrip(s).") + mapped = [self.__discovered(d) for d in filtered] + return mapped if __name__ == "__main__": client = FlickerstripDiscoveryClient() - client.discover() + flickerstrips = client.discover() + flickerstrips[0].force_update() + print(json.dumps(flickerstrips[0].status.to_json())) diff --git a/flickerstrip_py/flickerstrip.py b/flickerstrip_py/flickerstrip.py index 1044c1c..83f8d45 100644 --- a/flickerstrip_py/flickerstrip.py +++ b/flickerstrip_py/flickerstrip.py @@ -1,6 +1,6 @@ import requests -from pattern import PatternMeta from status import Status +from pattern import PatternMeta class Flickerstrip: @@ -13,7 +13,7 @@ class Flickerstrip: self.ip_address = ip_address self.status = None - def __checkResponse(self, response): + def __check_response(self, response): """Check if the request was succesful. Args: @@ -22,22 +22,22 @@ class Flickerstrip: Returns: boolean: If the request was successful """ - json = response.json() - respType = json["type"] + data = response.json() + respType = data["type"] if respType == "error": - message = json["message"] + message = data["message"] print(f"ERROR: Request failed with message \"{message}\".") return False elif respType == "OK": return True elif respType == "status": - self.status = Status.fromJSON(json) + self.status = Status.from_json(data) return True else: print(f"ERROR: Unrecognized response type: {respType}.") return False - def __getRequest(self, path, params=None): + def __get_request(self, path, params=None): """Send a GET request to the strip. Args: @@ -49,9 +49,9 @@ class Flickerstrip: boolean: If the request was successful """ resp = requests.get(f"http://{self.ip_address}/{path}", params=params) - return self.__checkResponse(resp) + return self.__check_response(resp) - def __postRequest(self, path, json=None, data=None): + def __post_request(self, path, json=None, data=None): """Send a POST request to the strip. Args: @@ -65,49 +65,49 @@ class Flickerstrip: """ resp = requests.post(f"http://{self.ip_address}/{path}", json=json, data=data) - return self.__checkResponse(resp) + return self.__check_response(resp) - def forceUpdate(self): + def force_update(self): """Force updates the status of the strip. Returns: boolean: If the request was succesful """ - return self.__getRequest("status") + return self.__get_request("status") - def powerOn(self): + def power_on(self): """Trigger the strip to turn the lights on. Returns: boolean: If the request was succesful """ - return self.__getRequest("power/on") + return self.__get_request("power/on") - def powerOff(self): + def power_off(self): """Trigger the strip to turn the lights off. Returns: boolean: If the request was succesful """ - return self.__getRequest("power/off") + return self.__get_request("power/off") - def powerToggle(self): + def power_toggle(self): """Trigger the strip to toggle the power status. Returns: boolean: If the request was succesful """ - return self.__getRequest("power/toggle") + return self.__get_request("power/toggle") - def nextPattern(self): + def next_pattern(self): """Trigger the strip to switch to the next pattern in memory. Returns: boolean: If the request was succesful """ - return self.__getRequest("pattern/next") + return self.__get_request("pattern/next") - def freezeFrame(self, frame): + def freeze_frame(self, frame): """Freeze the animation at the specified frame. Args: @@ -116,9 +116,9 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__getRequest("pattern/frame", {"value": frame}) + return self.__get_request("pattern/frame", {"value": frame}) - def setPattern(self, index): + def select_pattern(self, index): """Change the current pattern to the pattern at the specified index. Args: @@ -127,23 +127,23 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__getRequest("pattern/select", {"index": index}) + return self.__get_request("pattern/select", {"index": index}) - def deletePattern(self, pattern=None, index=None, id=None): + def delete_pattern(self, pattern=None, index=None, id=None): if pattern is not None: if not isinstance(pattern, PatternMeta): raise TypeError( "Expected a PatternMeta object for the pattern arg.") - return self.deletePattern(id=pattern.id) + return self.delete_pattern(id=pattern.id) elif index is not None: - return self.__getRequest("pattern/forget", {"index": index}) + return self.__get_request("pattern/forget", {"index": index}) elif id is not None: - return self.__getRequest("pattern/forget", {"id": id}) + return self.__get_request("pattern/forget", {"id": id}) else: raise TypeError( "Deleting a pattern requires one of the three args.") - def setBrightness(self, value): + def set_brightness(self, value): """Set the brightness of the flickerstrip. Args: @@ -152,9 +152,9 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__getRequest("brightness", {"value": value}) + return self.__get_request("brightness", {"value": value}) - def setName(self, value): + def set_name(self, value): """Set the name of the flickerstrip. Args: @@ -163,9 +163,9 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__postRequest("config/name", {"name": value}) + return self.__post_request("config/name", {"name": value}) - def setGroup(self, value): + def set_group(self, value): """Set the group of the flickerstrip. Args: @@ -174,9 +174,9 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__postRequest("config/group", {"name": value}) + return self.__post_request("config/group", {"name": value}) - def setCycle(self, value): + def set_cycle(self, value): """Set the cycle timer of the flickerstrip. Args: @@ -186,9 +186,9 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__getRequest("config/cycle", {"value": value}) + return self.__get_request("config/cycle", {"value": value}) - def setFadeDuration(self, value): + def set_fade_duration(self, value): """Set the fade timer of the flickerstrip. Args: @@ -198,20 +198,20 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__getRequest("config/fade", {"value": value}) + return self.__get_request("config/fade", {"value": value}) - def setStripLength(self, value): + def set_strip_length(self, value): """Set the length of the flickerstrip. Args: - value (int): The length of the strip. + value (int): The length of the strip in pixels. Returns: boolean: If the request was succesful """ - return self.__getRequest("config/length", {"value": value}) + return self.__get_request("config/length", {"value": value}) - def setStripStart(self, value): + def set_strip_start(self, value): """Set the start pixel of the strip. This will make the flickerstrip ignore all pixels before @@ -223,9 +223,9 @@ class Flickerstrip: Returns: boolean: If the request was successful. """ - return self.__getRequest("config/start", {"value": value}) + return self.__get_request("config/start", {"value": value}) - def setStripEnd(self, value): + def set_strip_end(self, value): """Set the end pixel of the strip. This will make the flickerstrip ignore all pixels after @@ -237,9 +237,9 @@ class Flickerstrip: Returns: boolean: If the request was successful. """ - return self.__getRequest("config/end", {"value": value}) + return self.__get_request("config/end", {"value": value}) - def setReversed(self, value): + def set_reversed(self, value): """Set the reversed state of the flickerstrip. Args: @@ -249,5 +249,5 @@ class Flickerstrip: Returns: boolean: If the request was succesful """ - return self.__getRequest("config/reversed", - {"value": 1 if value else 0}) + return self.__get_request("config/reversed", + {"value": 1 if value else 0}) diff --git a/flickerstrip_py/pattern.py b/flickerstrip_py/pattern.py index dd7fac2..7cb4921 100644 --- a/flickerstrip_py/pattern.py +++ b/flickerstrip_py/pattern.py @@ -7,11 +7,15 @@ class PatternMeta: self.flags = flags self.fps = fps - def fromJSON(json): - return PatternMeta( + def to_json(self): + return { + "id": self.id, "name": self.name, "frames": self.frames, + "pixels": self.pixels, "flags": self.flags, "fps": self.fps + } + + @classmethod + def from_json(cls, json): + return cls( json["id"], json["name"], json["frames"], json["pixels"], json["flags"], json["fps"] ) - - def fromJSONArray(array): - return map(PatternMeta.fromJSON, array) diff --git a/flickerstrip_py/ssdp.py b/flickerstrip_py/ssdp.py deleted file mode 100644 index 9bfe9bb..0000000 --- a/flickerstrip_py/ssdp.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2014 Dan Krause, Python 3 hack 2016 Adam Baxter -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import socket -import http.client -import io - - -class SSDPResponse(object): - class _FakeSocket(io.BytesIO): - def makefile(self, *args, **kw): - return self - - def __init__(self, response): - r = http.client.HTTPResponse(self._FakeSocket(response)) - r.begin() - self.location = r.getheader("location") - self.usn = r.getheader("usn") - self.st = r.getheader("st") - self.cache = r.getheader("cache-control").split("=")[1] - - def __repr__(self): - return ""\ - .format(**self.__dict__) - - -def discover(service, timeout=5, retries=1, mx=3): - group = ("239.255.255.250", 1900) - message = "\r\n".join([ - 'M-SEARCH * HTTP/1.1', - 'HOST: {0}:{1}', - 'MAN: "ssdp:discover"', - 'ST: {st}', 'MX: {mx}', '', '']) - socket.setdefaulttimeout(timeout) - responses = {} - for _ in range(retries): - sock = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) - message_bytes = message.format( - *group, st=service, mx=mx).encode('utf-8') - sock.sendto(message_bytes, group) - - while True: - try: - response = SSDPResponse(sock.recv(1024)) - responses[response.location] = response - except socket.timeout: - break - return list(responses.values()) - -# Example: -# import ssdp -# ssdp.discover("roku:ecp") diff --git a/flickerstrip_py/status.py b/flickerstrip_py/status.py index a5ba03a..b0319b8 100644 --- a/flickerstrip_py/status.py +++ b/flickerstrip_py/status.py @@ -3,12 +3,18 @@ from pattern import PatternMeta class MemoryUsage: def __init__(self, used, free, total): - self.used - self.free - self.total + self.used = used + self.free = free + self.total = total - def fromJSON(json): - return MemoryUsage(json["used"], json["free"], json["total"]) + def to_json(self): + return { + "used": self.used, "free": self.free, "total": self.total + } + + @classmethod + def from_json(cls, json): + return cls(json["used"], json["free"], json["total"]) class Status: @@ -35,12 +41,25 @@ class Status: self.memory = memory self.patterns = patterns - def fromJSON(json): - return Status( + def to_json(self): + return { + "ap": self.ap, "name": self.name, "group": self.group, + "firmware": self.firmware, "power": 1 if self.power else 0, + "mac": self.mac, "selectedPattern": self.selectedPattern, + "brightness": self.brightness, "length": self.length, + "start": self.start, "end": self.end, "fade": self.fade, + "reversed": 1 if self.isReversed else 0, "cycle": self.cycle, + "uptime": self.uptime, "memory": self.memory.to_json(), + "patterns": [p.to_json() for p in self.patterns] + } + + @classmethod + def from_json(cls, json): + return cls( json["ap"], json["name"], json["group"], json["firmware"], json["power"] == 1, json["mac"], json["selectedPattern"], json["brightness"], json["length"], json["start"], json["end"], - json["fade"], json["reversed"] == 1, json["cycle"] == 1, - json["uptime"], MemoryUsage.fromJSON(json["memory"]), - PatternMeta.fromJSONArray(json["patterns"]) + json["fade"], json["reversed"] == 1, json["cycle"], + json["uptime"], MemoryUsage.from_json(json["memory"]), + [PatternMeta.from_json(p) for p in json["patterns"]] )