From 5306c671c6a233bf747a4be5c9cb25becf1007f7 Mon Sep 17 00:00:00 2001 From: marios8543 Date: Sun, 27 Apr 2025 16:50:16 +0300 Subject: [PATCH] backup --- tunerlistd/.vscode/launch.json | 19 ++++++++ tunerlistd/test.py | 19 +++++--- tunerlistd/tunerlist.py | 86 +++++++++++++++++++--------------- 3 files changed, 80 insertions(+), 44 deletions(-) create mode 100644 tunerlistd/.vscode/launch.json diff --git a/tunerlistd/.vscode/launch.json b/tunerlistd/.vscode/launch.json new file mode 100644 index 0000000..b5b3322 --- /dev/null +++ b/tunerlistd/.vscode/launch.json @@ -0,0 +1,19 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "name": "Run", + "type": "debugpy", + "request": "launch", + "program": "main.py", + "console": "integratedTerminal", + "env": { + "TL_DEBUG": "true" + } + } + ] +} \ No newline at end of file diff --git a/tunerlistd/test.py b/tunerlistd/test.py index a16f02a..5f15e8a 100644 --- a/tunerlistd/test.py +++ b/tunerlistd/test.py @@ -11,9 +11,14 @@ GPIO_PIN = 27 # GPIO pin 27 BAUD_RATE = 4800 # 4800 baud # Start serial bit bang -pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) +try: + pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) +except: + pi.bb_serial_read_close(GPIO_PIN) + pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) buffer = b'' # Buffer for incoming data +last_buffer = b'' try: while True: @@ -21,12 +26,14 @@ try: if count > 0: buffer += data - while b'\n' in buffer: - line, buffer = buffer.split(b'\n', 1) # Split at newline + while b'\r\n' in buffer: + line, buffer = buffer.split(b'\r\n', 1) # Split at newline # Print each byte in hex, separated by spaces - hex_line = ' '.join(f'{byte:02X}' for byte in line) - print("Received line (hex):", hex_line) - + if not line == last_buffer: + last_buffer = line + for c in line: + print(hex(c), " ", end="") + print() time.sleep(0.01) except KeyboardInterrupt: diff --git a/tunerlistd/tunerlist.py b/tunerlistd/tunerlist.py index 8ff6adc..e12db5c 100644 --- a/tunerlistd/tunerlist.py +++ b/tunerlistd/tunerlist.py @@ -1,11 +1,11 @@ -from pigpio import pi -from asyncio import Queue, get_event_loop, sleep, CancelledError +from pigpio import pi, error +from asyncio import Queue, get_event_loop, sleep, CancelledError, create_task from concurrent.futures import ThreadPoolExecutor from os import getenv -GPIO_PIN = 27 +RX_PIN = 27 BAUD_RATE = 4800 -DEBUG = getenv("TL_DEBUG", "false") == "true" +DEBUG = False# getenv("TL_DEBUG", "false") == "true" BUTTONS = { "ok": [0x00, 0x00], @@ -23,17 +23,6 @@ BUTTONS = { "wheel_down": [0x01, 0x41], } -async def serial_wait_for_line(pi): - buffer = b'' - while True: - (count, data) = pi.bb_serial_read(GPIO_PIN) - if count > 0: - buffer += data - while b'\n' in buffer: - line, buffer = buffer.split(b'\n', 1) - return line - await sleep(0) - class TunerListState: def __init__(self, text, preset): self.text = text @@ -41,7 +30,7 @@ class TunerListState: @property def is_temp_view(self): - return "VOL" in self.text or "PAUSE" in self.text + return "VOL" in self.text or "PAUSE" in self.text or "TREB" in self.text or "BASS" in self.text @property def is_playing_ext(self): @@ -49,8 +38,8 @@ class TunerListState: def to_dict(self): return { - "screen": self.text, - "preset": self.preset, + "screen": f"{self.text}".strip(), + "preset": f"{self.preset}" if self.preset and not self.is_temp_view else None, "second": "Tuner List", "is_temp_view": self.is_temp_view, "is_playing_ext": self.is_playing_ext @@ -65,35 +54,56 @@ class TunerListState: class TunerList: def __init__(self): self.pi = pi() - self.pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) - self.queue = Queue() + while True: + try: + self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8) + break + except error as e: + print(e) + self.pi.bb_serial_read_close(RX_PIN) + self.last_buffer = b'' + self.queue = Queue() self.text = "" self.preset = None async def run(self): + buffer = b'' try: while True: - line = await serial_wait_for_line(self.pi) - if line[0] == 0x0F or line[0] == 0x0C: - self.text = line[(8 if line[0] == 0x0F else 5):].decode("ascii") - if (line[3] != 0x20): - self.preset = line[3] & 0x0F - else: - self.preset = None - elif line[0] == 0x39: - self.preset = None - self.text = "" - else: - continue - self.queue.put_nowait(TunerListState(self.text, self.preset)) - if DEBUG: - for c in line: - print(hex(c), " ", end="") - print(" [",self.text,"]", f" [P {self.preset}]") + (count, data) = self.pi.bb_serial_read(RX_PIN) + if count > 0: + buffer += data + while b'\r\n' in buffer: + line, buffer = buffer.split(b'\r\n', 1) + if not line == self.last_buffer: + self.last_buffer = line + print(line) + self.process_line(line) + await sleep(0) except CancelledError: - self.pi.bb_serial_read_close(GPIO_PIN) + self.pi.bb_serial_read_close(RX_PIN) self.pi.stop() + return + + def process_line(self, line): + try: + if line[0] == 0x0F or line[0] == 0x0C: + self.text = line[(8 if line[0] == 0x0F else 5):].decode("ascii") + if (line[3] != 0x20): + self.preset = line[3] & 0x0F + else: + self.preset = None + elif line[0] == 0x39: + self.preset = None + self.text = "" + else: + pass + except Exception as e: + print(e) + finally: + print("[",self.text,"]", f" [P {self.preset}]") + self.queue.put_nowait(TunerListState(self.text, self.preset)) async def yield_new_state(self): while True: