From 236c3076bb5d516f43f92f8a7b9506d2dcebd9b7 Mon Sep 17 00:00:00 2001 From: marios8543 Date: Fri, 9 May 2025 18:00:40 +0300 Subject: [PATCH] add buttondecoder. change tunerlist to use hardware uart 2 --- tunerlistd/.vscode/launch.json | 7 ++++ tunerlistd/buttondecoder.py | 60 +++++++++++++++++++++++++++ tunerlistd/main.py | 51 +++++++++++++++-------- tunerlistd/test.py | 46 -------------------- tunerlistd/tunerlist.py | 76 ++++++++++++---------------------- 5 files changed, 128 insertions(+), 112 deletions(-) create mode 100644 tunerlistd/buttondecoder.py delete mode 100644 tunerlistd/test.py diff --git a/tunerlistd/.vscode/launch.json b/tunerlistd/.vscode/launch.json index b5b3322..3bb872a 100644 --- a/tunerlistd/.vscode/launch.json +++ b/tunerlistd/.vscode/launch.json @@ -14,6 +14,13 @@ "env": { "TL_DEBUG": "true" } + }, + { + "name": "Test 2", + "type": "debugpy", + "request": "launch", + "program": "test2.py", + "console": "integratedTerminal", } ] } \ No newline at end of file diff --git a/tunerlistd/buttondecoder.py b/tunerlistd/buttondecoder.py new file mode 100644 index 0000000..eb06c90 --- /dev/null +++ b/tunerlistd/buttondecoder.py @@ -0,0 +1,60 @@ +from pigpio import pi, error +from asyncio import Queue, sleep, CancelledError + +RX_PIN = 23 # OR 24 +BAUD_RATE = 600 +DEBUG = False + +PRESSED = 0 +RELEASED = 1 +BUTTONS = [ + ["OK", "VOL_UP", "VOL_DOWN"], + ["SRC_DOWN", "PAUSE", "SRC_UP"] +] + +class ButtonDecoder: + def __init__(self): + self.pi = pi() + while True: + try: + self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8) + break + except error as e: + print(e) + self.pi.bb_serial_read_close(RX_PIN) + self.last_buffer = b'' + + self.queue = Queue() + + def _process_line(self, line): + if len(line) == 1: + if line[0] == 7: + self.queue.put_nowait("WHEEL_UP") + elif line[0] == 6: + self.queue.put_nowait("WHEEL_DOWN") + elif len(line) == 3: + btn_name = BUTTONS[line[0]][line[1]] + action = line[2] + if action == PRESSED: + self.queue.put_nowait(btn_name) + + async def run(self): + buffer = b'' + try: + while True: + (count, data) = self.pi.bb_serial_read(RX_PIN) + if count > 0: + buffer += data + while 0xFF in buffer: + line, buffer = buffer.split(0xFF, 1) + if not line == self.last_buffer: + self.last_buffer = line + for c in line: + print(hex(c), " ", end="") + print() + self._process_line(line) + await sleep(0.1) + except CancelledError: + self.pi.bb_serial_read_close(RX_PIN) + self.pi.stop() + return diff --git a/tunerlistd/main.py b/tunerlistd/main.py index e78e20a..1fb5c2a 100644 --- a/tunerlistd/main.py +++ b/tunerlistd/main.py @@ -1,10 +1,11 @@ from tunerlist import TunerList, TunerListState from bluetooth import Bluetooth, BluetoothState +from buttondecoder import ButtonDecoder from json import dumps from fastapi import FastAPI, WebSocket, Request from fastapi.responses import PlainTextResponse, JSONResponse -from asyncio import Event, create_task, CancelledError +from asyncio import Event, create_task, CancelledError, Queue from uvicorn import run app = FastAPI() @@ -13,17 +14,33 @@ background_tasks = [] tunerlist = TunerList() bluetooth = Bluetooth() +buttons = ButtonDecoder() last_tunerlist_state: TunerListState = None last_bluetooth_state: BluetoothState = None state_to_send: dict = {} +queue = Queue() + +def build_state_to_send(): + if not last_tunerlist_state: + return + if not last_bluetooth_state: + state_to_send= {"radio": last_tunerlist_state.to_dict()} + elif last_tunerlist_state.is_temp_view and last_bluetooth_state.status == "playing": + state_to_send = {"bluetooth": last_bluetooth_state.to_dict()} + state_to_send["bluetooth"]["screen"] = last_tunerlist_state.text + elif last_tunerlist_state.is_playing_ext: + state_to_send = {"bluetooth": last_bluetooth_state.to_dict()} + else: + state_to_send = {"radio": last_tunerlist_state.to_dict()} + queue.put_nowait(state_to_send) async def tunerlist_listener(): global last_tunerlist_state try: while True: last_tunerlist_state = await tunerlist.queue.get() - event.set() + build_state_to_send() except CancelledError: pass @@ -32,7 +49,18 @@ async def bluetooth_listener(): try: while True: last_bluetooth_state = await bluetooth.queue.get() - event.set() + build_state_to_send() + except CancelledError: + pass + +async def button_listener(): + try: + while True: + button = await buttons.queue.get() + if button in ("VOL_UP", "VOL_DOWN", "PAUSE"): + create_task(tunerlist.send_button(button)) + else: + queue.put_nowait(dumps({"button": button})) except CancelledError: pass @@ -41,6 +69,7 @@ async def startup_event(): for i in [ tunerlist.run(), # bluetooth.run(), + buttons.run(), tunerlist_listener(), bluetooth_listener() ]: @@ -56,20 +85,8 @@ async def websocket_endpoint(websocket: WebSocket): global state_to_send await websocket.accept() while True: - await event.wait() - if not last_tunerlist_state: - continue - if not last_bluetooth_state: - state_to_send= {"radio": last_tunerlist_state.to_dict()} - elif last_tunerlist_state.is_temp_view and last_bluetooth_state.status == "playing": - state_to_send = {"bluetooth": last_bluetooth_state.to_dict()} - state_to_send["bluetooth"]["screen"] = last_tunerlist_state.text - elif last_tunerlist_state.is_playing_ext: - state_to_send = {"bluetooth": last_bluetooth_state.to_dict()} - else: - state_to_send = {"radio": last_tunerlist_state.to_dict()} - await websocket.send_text(dumps(state_to_send)) - event.clear() + payload = await queue.get() + await websocket.send_text(dumps(payload)) @app.get("/state", response_class=JSONResponse) async def state_endpoint(): diff --git a/tunerlistd/test.py b/tunerlistd/test.py deleted file mode 100644 index 2ea7432..0000000 --- a/tunerlistd/test.py +++ /dev/null @@ -1,46 +0,0 @@ -import pigpio -import time - -# Connect to pigpio daemon -pi = pigpio.pi() - -if not pi.connected: - print("not connected") - exit(0) - -GPIO_PIN = 27 # GPIO pin 27 -BAUD_RATE = 4800 # 4800 baud - -# Start serial bit bang -try: - pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) -except: - pi.bb_serial_read_close(GPIO_PIN) - pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) - -buffer = b'' # Buffer for incoming data -last_buffer = b'' - -try: - while True: - (count, data) = pi.bb_serial_read(GPIO_PIN) - if count > 0: - buffer += data - while b'\r\n' in buffer: - line, buffer = buffer.split(b'\r\n', 1) # Split at newline - # Print each byte in hex, separated by spaces - if not line is last_buffer: - last_buffer = line - for c in line: - try: - print(c.decode("ascii"), " ", end="") - except: - print(hex(c), " ", end="") - print() - -except KeyboardInterrupt: - pass - -# Cleanup -pi.bb_serial_read_close(GPIO_PIN) -pi.stop() diff --git a/tunerlistd/tunerlist.py b/tunerlistd/tunerlist.py index bb4a37b..645d3b0 100644 --- a/tunerlistd/tunerlist.py +++ b/tunerlistd/tunerlist.py @@ -1,12 +1,10 @@ -from pigpio import pi, error, OUTPUT -from asyncio import Queue, get_event_loop, sleep, CancelledError, create_task +from asyncio import Queue, get_event_loop, CancelledError from concurrent.futures import ThreadPoolExecutor -from os import getenv +from serial import Serial -RX_PIN = 27 -TX_PIN = 17 -BAUD_RATE = 4800 -DEBUG = False# getenv("TL_DEBUG", "false") == "true" +BAUD_RATE = 9600 +PORT = "/dev/ttyAMA2" +DEBUG = False BUTTONS = { "ok": [0x00, 0x00], @@ -15,11 +13,11 @@ BUTTONS = { "source_r_hold": [0x00, 0x81], "source_l": [0x00, 0x02], "source_l_hold": [0x00, 0x82], - "volume_up": [0x00, 0x03], + "VOL_UP": [0x00, 0x03], "volume_up_hold": [0x00, 0x43], - "volume_down": [0x00, 0x04], + "VOL_DOWN": [0x00, 0x04], "volume_down_hold": [0x00, 0x44], - "pause": [0x00, 0x05], + "PAUSE": [0x00, 0x05], "wheel_up": [0x01, 0x01], "wheel_down": [0x01, 0x41], } @@ -54,17 +52,8 @@ class TunerListState: class TunerList: def __init__(self): - self.pi = pi() - self.pi.set_mode(TX_PIN, OUTPUT) - while True: - try: - self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8) - break - except error as e: - print(e) - self.pi.bb_serial_read_close(RX_PIN) - self.last_buffer = b'' - + self.executor = ThreadPoolExecutor() + self.serial = Serial(PORT, BAUD_RATE) self.queue = Queue() self.text = "" self.preset = None @@ -98,36 +87,25 @@ class TunerList: self.queue.put_nowait(TunerListState(self.text, self.preset)) async def send_button(self, btn): - data = bytearray(BUTTONS[btn]) - self.pi.wave_clear() - self.pi.wave_add_serial(TX_PIN, BAUD_RATE, data) - wave = self.pi.wave_create() - self.pi.wave_send_once(wave) - while self.pi.wave_tx_busy(): - await sleep(0) - self.pi.wave_delete(wave) + return await get_event_loop().run_in_executor( + self.executor, self.serial.write, bytearray(BUTTONS[btn]) + ) - async def run(self): - buffer = b'' + def _run(self): + last_line = "" try: while True: - (count, data) = self.pi.bb_serial_read(RX_PIN) - if count > 0: - buffer += data - while b'\r\n' in buffer: - line, buffer = buffer.split(b'\r\n', 1) - if not line == self.last_buffer: - self.last_buffer = line - for c in line: - print(hex(c), " ", end="") - print() - self._process_line(line) - await sleep(0.1) + line = self.serial.readline() + if last_line == line: + continue + if DEBUG: + for c in line: + print(hex(c), " ", end="") + print() + last_line = line + self._process_line(line) except CancelledError: - self.pi.bb_serial_read_close(RX_PIN) - self.pi.stop() return - - async def yield_new_state(self): - while True: - yield await self.queue.get() + + async def run(self): + self.future = await get_event_loop().run_in_executor(self.executor, self._run) \ No newline at end of file