From 9c9e2820a815dde246328fda5e0f60a7be241fdb Mon Sep 17 00:00:00 2001 From: marios8543 Date: Thu, 15 May 2025 23:01:40 +0300 Subject: [PATCH] final fixes --- tunerlistd/arp.py | 58 +++++++++++++++++++++++++++++++++++++ tunerlistd/buttondecoder.py | 53 +++++++++++++++++++-------------- tunerlistd/main.py | 38 +++++++++++++++++++++--- tunerlistd/test.py | 35 ++++++++++++++++++++++ tunerlistd/tunerlist.py | 4 +++ 5 files changed, 163 insertions(+), 25 deletions(-) create mode 100644 tunerlistd/arp.py create mode 100644 tunerlistd/test.py diff --git a/tunerlistd/arp.py b/tunerlistd/arp.py new file mode 100644 index 0000000..f9443a7 --- /dev/null +++ b/tunerlistd/arp.py @@ -0,0 +1,58 @@ +import asyncio +import re +from enum import Enum +import subprocess + +IP_REGEX = re.compile(r"\d+\.\d+\.\d+\.\d+") + +class EventType(Enum): + CONNECTED = 0 + DISCONNECTED = 1 + +class HostapdEvent: + def __init__(self, ip, type): + self.ip = ip + self.type: EventType = type + +class ArpWatcher: + def __init__(self, interface="wlan0"): + self.queue = asyncio.Queue() + self.interface = interface + + async def _read_arp_table(self): + def _parse_arp(): + try: + output = subprocess.check_output(['ip', 'neigh', 'show', 'dev', self.interface], text=True) + return set( + m.group(0) + for line in output.splitlines() + if "lladdr" in line and "REACHABLE" in line + for m in [IP_REGEX.search(line)] if m + ) + except Exception as e: + print(f"[!] Error reading ARP table: {e}") + return set() + + return await asyncio.to_thread(_parse_arp) + + async def run(self, poll_interval=5): + print(f"Monitoring ARP table on {self.interface}...") + + known_ips = await self._read_arp_table() + + while True: + await asyncio.sleep(poll_interval) + current_ips = await self._read_arp_table() + + new_ips = current_ips - known_ips + left_ips = known_ips - current_ips + + for ip in new_ips: + await self.queue.put(HostapdEvent(ip, EventType.CONNECTED)) + print(f"[+] Device connected: {ip}") + + for ip in left_ips: + await self.queue.put(HostapdEvent(ip, EventType.DISCONNECTED)) + print(f"[-] Device disconnected: {ip}") + + known_ips = current_ips diff --git a/tunerlistd/buttondecoder.py b/tunerlistd/buttondecoder.py index 80a21fb..00a2b96 100644 --- a/tunerlistd/buttondecoder.py +++ b/tunerlistd/buttondecoder.py @@ -1,7 +1,7 @@ from pigpio import pi, error from asyncio import Queue, sleep, CancelledError -RX_PIN = 23 # OR 24 +RX_PIN = 24 # OR 24 BAUD_RATE = 600 DEBUG = False @@ -9,13 +9,19 @@ PRESSED = 0 RELEASED = 1 BUTTONS = [ ["OK", "VOL_UP", "VOL_DOWN"], - ["SRC_DOWN", "PAUSE", "SRC_UP"] + ["SRC_DOWN", "PAUSE_BTN", "SRC_UP"] ] class ButtonEvent: def __init__(self, btn, state=None): self.button = btn self.state = state + + def __repr__(self): + return str(self) + + def __str__(self): + return f"{self.button} {self.state}" class ButtonDecoder: def __init__(self): @@ -32,33 +38,38 @@ class ButtonDecoder: self.queue = Queue() def _process_line(self, line): - if len(line) == 1: - if line[0] == 7: - self.queue.put_nowait(ButtonEvent("WHEEL_UP")) - elif line[0] == 6: - self.queue.put_nowait(ButtonEvent("WHEEL_DOWN")) - elif len(line) == 3: - btn_name = BUTTONS[line[0]][line[1]] - action = line[2] - self.queue.put_nowait(ButtonEvent(btn_name, action)) + try: + if len(line) == 1: + if line[0] == 7: + self.queue.put_nowait(ButtonEvent("WHEEL_UP")) + elif line[0] == 6: + self.queue.put_nowait(ButtonEvent("WHEEL_DOWN")) + elif len(line) == 3: + btn_name = BUTTONS[line[0]][line[1]] + action = line[2] + self.queue.put_nowait(ButtonEvent(btn_name, action)) + except Exception as e: + print(e) async def run(self): - buffer = b'' + buffer = bytearray() try: while True: (count, data) = self.pi.bb_serial_read(RX_PIN) if count > 0: - buffer += data - while 0xFF in buffer: - line, buffer = buffer.split(0xFF, 1) - if not line == self.last_buffer: - self.last_buffer = line - for c in line: - print(hex(c), " ", end="") - print() - self._process_line(line) + for byte in data: + if byte == 0xFF: + self._process_line(buffer) + print(buffer) + buffer.clear() + else: + buffer.append(byte) await sleep(0.1) except CancelledError: self.pi.bb_serial_read_close(RX_PIN) self.pi.stop() return + +if __name__ == "__main__": + from asyncio import get_event_loop + get_event_loop().run_until_complete(ButtonDecoder().run()) \ No newline at end of file diff --git a/tunerlistd/main.py b/tunerlistd/main.py index 959d0f5..f3fd47a 100644 --- a/tunerlistd/main.py +++ b/tunerlistd/main.py @@ -1,7 +1,6 @@ # ------------------------------ BUTTON MAPPINGS ------------------------------------------ -# BUTTON MATRIX: OK, VOL_UP, VOL_DOWN, SRC_UP, SRC_DOWN, PAUSE, WHEEL_UP, WHEEL_DOWN +# BUTTON MATRIX: OK, VOL_UP, VOL_DOWN, SRC_UP, SRC_DOWN, PAUSE_BTN, WHEEL_UP, WHEEL_DOWN # CD CHANGER: NEXT, PREV, FAST_FORW, PLAY, PAUSE, RESUME, STOP, FAST_BACK, CD_(1-6), STALK, RAND_ON, RAND_OFF -# RADIO: VOL_UP, VOL_DOWN, PAUSE # ANDROID AUTO: ENTER, LEFT, RIGHT, UP, DOWN, BACK, HOME, PHONE, CALL_END, PLAY, PAUSE, PREV_TRACK, NEXT_TRACK, TOGGLE_PLAY, VOICE, WHEEL_LEFT, WHEEL_RIGHT def send_to_radio(btn): @@ -13,7 +12,23 @@ def send_to_aa(btn): BUTTON_MAPPINGS = { "VOL_UP": (send_to_radio, "VOL_UP"), "VOL_DOWN": (send_to_radio, "VOL_DOWN"), - "PAUSE": (send_to_radio, "PAUSE") + "PAUSE_BTN": (send_to_radio, "PAUSE"), + + "OK": (send_to_aa, "ENTER"), + "WHEEL_UP": (send_to_aa, "WHEEL_LEFT"), + "WHEEL_DOWN": (send_to_aa, "WHEEL_RIGHT"), + "PAUSE": (send_to_aa, "PAUSE"), + "STOP": (send_to_aa, "PAUSE"), + "RESUME": (send_to_aa, "PLAY"), + "SRC_UP": (send_to_aa, "UP"), + "SRC_DOWN": (send_to_aa, "DOWN"), + "NEXT": (send_to_aa, "NEXT_TRACK"), + "PREV": (send_to_aa, "PREV_TRACK"), + "CD_1": (send_to_aa, "BACK"), + "CD_2": (send_to_aa, "HOME"), + "CD_3": (send_to_aa, "CALL_END"), + "CD_4": (send_to_aa, "PHONE"), + "CD_5": (send_to_aa, "VOICE") } # ----------------------------------------------------------------------------------------- @@ -21,6 +36,7 @@ BUTTON_MAPPINGS = { from tunerlist import TunerList, TunerListState from bluetooth import Bluetooth, BluetoothState from buttondecoder import ButtonDecoder +from arp import ArpWatcher, EventType from json import dumps from fastapi import FastAPI, WebSocket, Request @@ -37,6 +53,7 @@ background_tasks = [] tunerlist = TunerList() bluetooth = Bluetooth() buttons = ButtonDecoder() +arp = ArpWatcher() last_tunerlist_state: TunerListState = None last_bluetooth_state: BluetoothState = None @@ -79,20 +96,33 @@ async def button_listener(): try: while True: button = await buttons.queue.get() + print(button) if button in BUTTON_MAPPINGS: func, arg = BUTTON_MAPPINGS[button] func(arg) except CancelledError: pass +async def arp_listener(): + try: + while True: + ev = await arp.queue.get() + if ev.type == EventType.CONNECTED: + queue.put_nowait({"wireless_aa_ip": ev.ip}) + except CancelledError: + pass + @app.on_event("startup") async def startup_event(): for i in [ tunerlist.run(), # bluetooth.run(), buttons.run(), + arp.run(), tunerlist_listener(), - bluetooth_listener() + bluetooth_listener(), + button_listener(), + arp_listener() ]: background_tasks.append(create_task(i)) diff --git a/tunerlistd/test.py b/tunerlistd/test.py new file mode 100644 index 0000000..ceaff3c --- /dev/null +++ b/tunerlistd/test.py @@ -0,0 +1,35 @@ +import pigpio +import time + +PIN = 24 +BAUD = 600 + +pi = pigpio.pi() +if not pi.connected: + raise RuntimeError("pigpio daemon not running") + +# Open bit-bang serial read on GPIO 24 at 600 baud +pi.bb_serial_read_close(PIN) +pi.bb_serial_read_open(PIN, BAUD) + +try: + buffer = bytearray() + print("Listening on GPIO 24 for serial data...") + + while True: + (count, data) = pi.bb_serial_read(PIN) + if count > 0: + for byte in data: + if byte == 0xFF: + print("Received:", buffer) + buffer.clear() + else: + buffer.append(byte) + time.sleep(0.01) # Slight delay to avoid CPU hogging + +except KeyboardInterrupt: + print("\nStopped by user.") + +finally: + pi.bb_serial_read_close(PIN) + pi.stop() diff --git a/tunerlistd/tunerlist.py b/tunerlistd/tunerlist.py index 318bee7..be6989e 100644 --- a/tunerlistd/tunerlist.py +++ b/tunerlistd/tunerlist.py @@ -82,6 +82,10 @@ class TunerList: self.text = text except Exception as e: print(e) + txt = "" + for c in line[5:]: + txt += str(hex(c)) + self.text = txt finally: if DEBUG: print("[",self.text,"]", f" [P {self.preset}]")