final fixes

This commit is contained in:
marios8543 2025-05-15 23:01:40 +03:00
parent ed173849b1
commit 9c9e2820a8
5 changed files with 163 additions and 25 deletions

58
tunerlistd/arp.py Normal file
View File

@ -0,0 +1,58 @@
import asyncio
import re
from enum import Enum
import subprocess
IP_REGEX = re.compile(r"\d+\.\d+\.\d+\.\d+")
class EventType(Enum):
CONNECTED = 0
DISCONNECTED = 1
class HostapdEvent:
def __init__(self, ip, type):
self.ip = ip
self.type: EventType = type
class ArpWatcher:
def __init__(self, interface="wlan0"):
self.queue = asyncio.Queue()
self.interface = interface
async def _read_arp_table(self):
def _parse_arp():
try:
output = subprocess.check_output(['ip', 'neigh', 'show', 'dev', self.interface], text=True)
return set(
m.group(0)
for line in output.splitlines()
if "lladdr" in line and "REACHABLE" in line
for m in [IP_REGEX.search(line)] if m
)
except Exception as e:
print(f"[!] Error reading ARP table: {e}")
return set()
return await asyncio.to_thread(_parse_arp)
async def run(self, poll_interval=5):
print(f"Monitoring ARP table on {self.interface}...")
known_ips = await self._read_arp_table()
while True:
await asyncio.sleep(poll_interval)
current_ips = await self._read_arp_table()
new_ips = current_ips - known_ips
left_ips = known_ips - current_ips
for ip in new_ips:
await self.queue.put(HostapdEvent(ip, EventType.CONNECTED))
print(f"[+] Device connected: {ip}")
for ip in left_ips:
await self.queue.put(HostapdEvent(ip, EventType.DISCONNECTED))
print(f"[-] Device disconnected: {ip}")
known_ips = current_ips

View File

@ -1,7 +1,7 @@
from pigpio import pi, error from pigpio import pi, error
from asyncio import Queue, sleep, CancelledError from asyncio import Queue, sleep, CancelledError
RX_PIN = 23 # OR 24 RX_PIN = 24 # OR 24
BAUD_RATE = 600 BAUD_RATE = 600
DEBUG = False DEBUG = False
@ -9,7 +9,7 @@ PRESSED = 0
RELEASED = 1 RELEASED = 1
BUTTONS = [ BUTTONS = [
["OK", "VOL_UP", "VOL_DOWN"], ["OK", "VOL_UP", "VOL_DOWN"],
["SRC_DOWN", "PAUSE", "SRC_UP"] ["SRC_DOWN", "PAUSE_BTN", "SRC_UP"]
] ]
class ButtonEvent: class ButtonEvent:
@ -17,6 +17,12 @@ class ButtonEvent:
self.button = btn self.button = btn
self.state = state self.state = state
def __repr__(self):
return str(self)
def __str__(self):
return f"{self.button} {self.state}"
class ButtonDecoder: class ButtonDecoder:
def __init__(self): def __init__(self):
self.pi = pi() self.pi = pi()
@ -32,33 +38,38 @@ class ButtonDecoder:
self.queue = Queue() self.queue = Queue()
def _process_line(self, line): def _process_line(self, line):
if len(line) == 1: try:
if line[0] == 7: if len(line) == 1:
self.queue.put_nowait(ButtonEvent("WHEEL_UP")) if line[0] == 7:
elif line[0] == 6: self.queue.put_nowait(ButtonEvent("WHEEL_UP"))
self.queue.put_nowait(ButtonEvent("WHEEL_DOWN")) elif line[0] == 6:
elif len(line) == 3: self.queue.put_nowait(ButtonEvent("WHEEL_DOWN"))
btn_name = BUTTONS[line[0]][line[1]] elif len(line) == 3:
action = line[2] btn_name = BUTTONS[line[0]][line[1]]
self.queue.put_nowait(ButtonEvent(btn_name, action)) action = line[2]
self.queue.put_nowait(ButtonEvent(btn_name, action))
except Exception as e:
print(e)
async def run(self): async def run(self):
buffer = b'' buffer = bytearray()
try: try:
while True: while True:
(count, data) = self.pi.bb_serial_read(RX_PIN) (count, data) = self.pi.bb_serial_read(RX_PIN)
if count > 0: if count > 0:
buffer += data for byte in data:
while 0xFF in buffer: if byte == 0xFF:
line, buffer = buffer.split(0xFF, 1) self._process_line(buffer)
if not line == self.last_buffer: print(buffer)
self.last_buffer = line buffer.clear()
for c in line: else:
print(hex(c), " ", end="") buffer.append(byte)
print()
self._process_line(line)
await sleep(0.1) await sleep(0.1)
except CancelledError: except CancelledError:
self.pi.bb_serial_read_close(RX_PIN) self.pi.bb_serial_read_close(RX_PIN)
self.pi.stop() self.pi.stop()
return return
if __name__ == "__main__":
from asyncio import get_event_loop
get_event_loop().run_until_complete(ButtonDecoder().run())

View File

@ -1,7 +1,6 @@
# ------------------------------ BUTTON MAPPINGS ------------------------------------------ # ------------------------------ BUTTON MAPPINGS ------------------------------------------
# BUTTON MATRIX: OK, VOL_UP, VOL_DOWN, SRC_UP, SRC_DOWN, PAUSE, WHEEL_UP, WHEEL_DOWN # BUTTON MATRIX: OK, VOL_UP, VOL_DOWN, SRC_UP, SRC_DOWN, PAUSE_BTN, WHEEL_UP, WHEEL_DOWN
# CD CHANGER: NEXT, PREV, FAST_FORW, PLAY, PAUSE, RESUME, STOP, FAST_BACK, CD_(1-6), STALK, RAND_ON, RAND_OFF # CD CHANGER: NEXT, PREV, FAST_FORW, PLAY, PAUSE, RESUME, STOP, FAST_BACK, CD_(1-6), STALK, RAND_ON, RAND_OFF
# RADIO: VOL_UP, VOL_DOWN, PAUSE
# ANDROID AUTO: ENTER, LEFT, RIGHT, UP, DOWN, BACK, HOME, PHONE, CALL_END, PLAY, PAUSE, PREV_TRACK, NEXT_TRACK, TOGGLE_PLAY, VOICE, WHEEL_LEFT, WHEEL_RIGHT # ANDROID AUTO: ENTER, LEFT, RIGHT, UP, DOWN, BACK, HOME, PHONE, CALL_END, PLAY, PAUSE, PREV_TRACK, NEXT_TRACK, TOGGLE_PLAY, VOICE, WHEEL_LEFT, WHEEL_RIGHT
def send_to_radio(btn): def send_to_radio(btn):
@ -13,7 +12,23 @@ def send_to_aa(btn):
BUTTON_MAPPINGS = { BUTTON_MAPPINGS = {
"VOL_UP": (send_to_radio, "VOL_UP"), "VOL_UP": (send_to_radio, "VOL_UP"),
"VOL_DOWN": (send_to_radio, "VOL_DOWN"), "VOL_DOWN": (send_to_radio, "VOL_DOWN"),
"PAUSE": (send_to_radio, "PAUSE") "PAUSE_BTN": (send_to_radio, "PAUSE"),
"OK": (send_to_aa, "ENTER"),
"WHEEL_UP": (send_to_aa, "WHEEL_LEFT"),
"WHEEL_DOWN": (send_to_aa, "WHEEL_RIGHT"),
"PAUSE": (send_to_aa, "PAUSE"),
"STOP": (send_to_aa, "PAUSE"),
"RESUME": (send_to_aa, "PLAY"),
"SRC_UP": (send_to_aa, "UP"),
"SRC_DOWN": (send_to_aa, "DOWN"),
"NEXT": (send_to_aa, "NEXT_TRACK"),
"PREV": (send_to_aa, "PREV_TRACK"),
"CD_1": (send_to_aa, "BACK"),
"CD_2": (send_to_aa, "HOME"),
"CD_3": (send_to_aa, "CALL_END"),
"CD_4": (send_to_aa, "PHONE"),
"CD_5": (send_to_aa, "VOICE")
} }
# ----------------------------------------------------------------------------------------- # -----------------------------------------------------------------------------------------
@ -21,6 +36,7 @@ BUTTON_MAPPINGS = {
from tunerlist import TunerList, TunerListState from tunerlist import TunerList, TunerListState
from bluetooth import Bluetooth, BluetoothState from bluetooth import Bluetooth, BluetoothState
from buttondecoder import ButtonDecoder from buttondecoder import ButtonDecoder
from arp import ArpWatcher, EventType
from json import dumps from json import dumps
from fastapi import FastAPI, WebSocket, Request from fastapi import FastAPI, WebSocket, Request
@ -37,6 +53,7 @@ background_tasks = []
tunerlist = TunerList() tunerlist = TunerList()
bluetooth = Bluetooth() bluetooth = Bluetooth()
buttons = ButtonDecoder() buttons = ButtonDecoder()
arp = ArpWatcher()
last_tunerlist_state: TunerListState = None last_tunerlist_state: TunerListState = None
last_bluetooth_state: BluetoothState = None last_bluetooth_state: BluetoothState = None
@ -79,20 +96,33 @@ async def button_listener():
try: try:
while True: while True:
button = await buttons.queue.get() button = await buttons.queue.get()
print(button)
if button in BUTTON_MAPPINGS: if button in BUTTON_MAPPINGS:
func, arg = BUTTON_MAPPINGS[button] func, arg = BUTTON_MAPPINGS[button]
func(arg) func(arg)
except CancelledError: except CancelledError:
pass pass
async def arp_listener():
try:
while True:
ev = await arp.queue.get()
if ev.type == EventType.CONNECTED:
queue.put_nowait({"wireless_aa_ip": ev.ip})
except CancelledError:
pass
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
for i in [ for i in [
tunerlist.run(), tunerlist.run(),
# bluetooth.run(), # bluetooth.run(),
buttons.run(), buttons.run(),
arp.run(),
tunerlist_listener(), tunerlist_listener(),
bluetooth_listener() bluetooth_listener(),
button_listener(),
arp_listener()
]: ]:
background_tasks.append(create_task(i)) background_tasks.append(create_task(i))

35
tunerlistd/test.py Normal file
View File

@ -0,0 +1,35 @@
import pigpio
import time
PIN = 24
BAUD = 600
pi = pigpio.pi()
if not pi.connected:
raise RuntimeError("pigpio daemon not running")
# Open bit-bang serial read on GPIO 24 at 600 baud
pi.bb_serial_read_close(PIN)
pi.bb_serial_read_open(PIN, BAUD)
try:
buffer = bytearray()
print("Listening on GPIO 24 for serial data...")
while True:
(count, data) = pi.bb_serial_read(PIN)
if count > 0:
for byte in data:
if byte == 0xFF:
print("Received:", buffer)
buffer.clear()
else:
buffer.append(byte)
time.sleep(0.01) # Slight delay to avoid CPU hogging
except KeyboardInterrupt:
print("\nStopped by user.")
finally:
pi.bb_serial_read_close(PIN)
pi.stop()

View File

@ -82,6 +82,10 @@ class TunerList:
self.text = text self.text = text
except Exception as e: except Exception as e:
print(e) print(e)
txt = ""
for c in line[5:]:
txt += str(hex(c))
self.text = txt
finally: finally:
if DEBUG: if DEBUG:
print("[",self.text,"]", f" [P {self.preset}]") print("[",self.text,"]", f" [P {self.preset}]")