add buttondecoder. change tunerlist to use hardware uart 2

This commit is contained in:
marios8543 2025-05-09 18:00:40 +03:00
parent 7b6bfb9657
commit 236c3076bb
5 changed files with 128 additions and 112 deletions

View File

@ -14,6 +14,13 @@
"env": { "env": {
"TL_DEBUG": "true" "TL_DEBUG": "true"
} }
},
{
"name": "Test 2",
"type": "debugpy",
"request": "launch",
"program": "test2.py",
"console": "integratedTerminal",
} }
] ]
} }

View File

@ -0,0 +1,60 @@
from pigpio import pi, error
from asyncio import Queue, sleep, CancelledError
RX_PIN = 23 # OR 24
BAUD_RATE = 600
DEBUG = False
PRESSED = 0
RELEASED = 1
BUTTONS = [
["OK", "VOL_UP", "VOL_DOWN"],
["SRC_DOWN", "PAUSE", "SRC_UP"]
]
class ButtonDecoder:
def __init__(self):
self.pi = pi()
while True:
try:
self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8)
break
except error as e:
print(e)
self.pi.bb_serial_read_close(RX_PIN)
self.last_buffer = b''
self.queue = Queue()
def _process_line(self, line):
if len(line) == 1:
if line[0] == 7:
self.queue.put_nowait("WHEEL_UP")
elif line[0] == 6:
self.queue.put_nowait("WHEEL_DOWN")
elif len(line) == 3:
btn_name = BUTTONS[line[0]][line[1]]
action = line[2]
if action == PRESSED:
self.queue.put_nowait(btn_name)
async def run(self):
buffer = b''
try:
while True:
(count, data) = self.pi.bb_serial_read(RX_PIN)
if count > 0:
buffer += data
while 0xFF in buffer:
line, buffer = buffer.split(0xFF, 1)
if not line == self.last_buffer:
self.last_buffer = line
for c in line:
print(hex(c), " ", end="")
print()
self._process_line(line)
await sleep(0.1)
except CancelledError:
self.pi.bb_serial_read_close(RX_PIN)
self.pi.stop()
return

View File

@ -1,10 +1,11 @@
from tunerlist import TunerList, TunerListState from tunerlist import TunerList, TunerListState
from bluetooth import Bluetooth, BluetoothState from bluetooth import Bluetooth, BluetoothState
from buttondecoder import ButtonDecoder
from json import dumps from json import dumps
from fastapi import FastAPI, WebSocket, Request from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import PlainTextResponse, JSONResponse from fastapi.responses import PlainTextResponse, JSONResponse
from asyncio import Event, create_task, CancelledError from asyncio import Event, create_task, CancelledError, Queue
from uvicorn import run from uvicorn import run
app = FastAPI() app = FastAPI()
@ -13,17 +14,33 @@ background_tasks = []
tunerlist = TunerList() tunerlist = TunerList()
bluetooth = Bluetooth() bluetooth = Bluetooth()
buttons = ButtonDecoder()
last_tunerlist_state: TunerListState = None last_tunerlist_state: TunerListState = None
last_bluetooth_state: BluetoothState = None last_bluetooth_state: BluetoothState = None
state_to_send: dict = {} state_to_send: dict = {}
queue = Queue()
def build_state_to_send():
if not last_tunerlist_state:
return
if not last_bluetooth_state:
state_to_send= {"radio": last_tunerlist_state.to_dict()}
elif last_tunerlist_state.is_temp_view and last_bluetooth_state.status == "playing":
state_to_send = {"bluetooth": last_bluetooth_state.to_dict()}
state_to_send["bluetooth"]["screen"] = last_tunerlist_state.text
elif last_tunerlist_state.is_playing_ext:
state_to_send = {"bluetooth": last_bluetooth_state.to_dict()}
else:
state_to_send = {"radio": last_tunerlist_state.to_dict()}
queue.put_nowait(state_to_send)
async def tunerlist_listener(): async def tunerlist_listener():
global last_tunerlist_state global last_tunerlist_state
try: try:
while True: while True:
last_tunerlist_state = await tunerlist.queue.get() last_tunerlist_state = await tunerlist.queue.get()
event.set() build_state_to_send()
except CancelledError: except CancelledError:
pass pass
@ -32,7 +49,18 @@ async def bluetooth_listener():
try: try:
while True: while True:
last_bluetooth_state = await bluetooth.queue.get() last_bluetooth_state = await bluetooth.queue.get()
event.set() build_state_to_send()
except CancelledError:
pass
async def button_listener():
try:
while True:
button = await buttons.queue.get()
if button in ("VOL_UP", "VOL_DOWN", "PAUSE"):
create_task(tunerlist.send_button(button))
else:
queue.put_nowait(dumps({"button": button}))
except CancelledError: except CancelledError:
pass pass
@ -41,6 +69,7 @@ async def startup_event():
for i in [ for i in [
tunerlist.run(), tunerlist.run(),
# bluetooth.run(), # bluetooth.run(),
buttons.run(),
tunerlist_listener(), tunerlist_listener(),
bluetooth_listener() bluetooth_listener()
]: ]:
@ -56,20 +85,8 @@ async def websocket_endpoint(websocket: WebSocket):
global state_to_send global state_to_send
await websocket.accept() await websocket.accept()
while True: while True:
await event.wait() payload = await queue.get()
if not last_tunerlist_state: await websocket.send_text(dumps(payload))
continue
if not last_bluetooth_state:
state_to_send= {"radio": last_tunerlist_state.to_dict()}
elif last_tunerlist_state.is_temp_view and last_bluetooth_state.status == "playing":
state_to_send = {"bluetooth": last_bluetooth_state.to_dict()}
state_to_send["bluetooth"]["screen"] = last_tunerlist_state.text
elif last_tunerlist_state.is_playing_ext:
state_to_send = {"bluetooth": last_bluetooth_state.to_dict()}
else:
state_to_send = {"radio": last_tunerlist_state.to_dict()}
await websocket.send_text(dumps(state_to_send))
event.clear()
@app.get("/state", response_class=JSONResponse) @app.get("/state", response_class=JSONResponse)
async def state_endpoint(): async def state_endpoint():

View File

@ -1,46 +0,0 @@
import pigpio
import time
# Connect to pigpio daemon
pi = pigpio.pi()
if not pi.connected:
print("not connected")
exit(0)
GPIO_PIN = 27 # GPIO pin 27
BAUD_RATE = 4800 # 4800 baud
# Start serial bit bang
try:
pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8)
except:
pi.bb_serial_read_close(GPIO_PIN)
pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8)
buffer = b'' # Buffer for incoming data
last_buffer = b''
try:
while True:
(count, data) = pi.bb_serial_read(GPIO_PIN)
if count > 0:
buffer += data
while b'\r\n' in buffer:
line, buffer = buffer.split(b'\r\n', 1) # Split at newline
# Print each byte in hex, separated by spaces
if not line is last_buffer:
last_buffer = line
for c in line:
try:
print(c.decode("ascii"), " ", end="")
except:
print(hex(c), " ", end="")
print()
except KeyboardInterrupt:
pass
# Cleanup
pi.bb_serial_read_close(GPIO_PIN)
pi.stop()

View File

@ -1,12 +1,10 @@
from pigpio import pi, error, OUTPUT from asyncio import Queue, get_event_loop, CancelledError
from asyncio import Queue, get_event_loop, sleep, CancelledError, create_task
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from os import getenv from serial import Serial
RX_PIN = 27 BAUD_RATE = 9600
TX_PIN = 17 PORT = "/dev/ttyAMA2"
BAUD_RATE = 4800 DEBUG = False
DEBUG = False# getenv("TL_DEBUG", "false") == "true"
BUTTONS = { BUTTONS = {
"ok": [0x00, 0x00], "ok": [0x00, 0x00],
@ -15,11 +13,11 @@ BUTTONS = {
"source_r_hold": [0x00, 0x81], "source_r_hold": [0x00, 0x81],
"source_l": [0x00, 0x02], "source_l": [0x00, 0x02],
"source_l_hold": [0x00, 0x82], "source_l_hold": [0x00, 0x82],
"volume_up": [0x00, 0x03], "VOL_UP": [0x00, 0x03],
"volume_up_hold": [0x00, 0x43], "volume_up_hold": [0x00, 0x43],
"volume_down": [0x00, 0x04], "VOL_DOWN": [0x00, 0x04],
"volume_down_hold": [0x00, 0x44], "volume_down_hold": [0x00, 0x44],
"pause": [0x00, 0x05], "PAUSE": [0x00, 0x05],
"wheel_up": [0x01, 0x01], "wheel_up": [0x01, 0x01],
"wheel_down": [0x01, 0x41], "wheel_down": [0x01, 0x41],
} }
@ -54,17 +52,8 @@ class TunerListState:
class TunerList: class TunerList:
def __init__(self): def __init__(self):
self.pi = pi() self.executor = ThreadPoolExecutor()
self.pi.set_mode(TX_PIN, OUTPUT) self.serial = Serial(PORT, BAUD_RATE)
while True:
try:
self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8)
break
except error as e:
print(e)
self.pi.bb_serial_read_close(RX_PIN)
self.last_buffer = b''
self.queue = Queue() self.queue = Queue()
self.text = "" self.text = ""
self.preset = None self.preset = None
@ -98,36 +87,25 @@ class TunerList:
self.queue.put_nowait(TunerListState(self.text, self.preset)) self.queue.put_nowait(TunerListState(self.text, self.preset))
async def send_button(self, btn): async def send_button(self, btn):
data = bytearray(BUTTONS[btn]) return await get_event_loop().run_in_executor(
self.pi.wave_clear() self.executor, self.serial.write, bytearray(BUTTONS[btn])
self.pi.wave_add_serial(TX_PIN, BAUD_RATE, data) )
wave = self.pi.wave_create()
self.pi.wave_send_once(wave)
while self.pi.wave_tx_busy():
await sleep(0)
self.pi.wave_delete(wave)
async def run(self): def _run(self):
buffer = b'' last_line = ""
try: try:
while True: while True:
(count, data) = self.pi.bb_serial_read(RX_PIN) line = self.serial.readline()
if count > 0: if last_line == line:
buffer += data continue
while b'\r\n' in buffer: if DEBUG:
line, buffer = buffer.split(b'\r\n', 1)
if not line == self.last_buffer:
self.last_buffer = line
for c in line: for c in line:
print(hex(c), " ", end="") print(hex(c), " ", end="")
print() print()
last_line = line
self._process_line(line) self._process_line(line)
await sleep(0.1)
except CancelledError: except CancelledError:
self.pi.bb_serial_read_close(RX_PIN)
self.pi.stop()
return return
async def yield_new_state(self): async def run(self):
while True: self.future = await get_event_loop().run_in_executor(self.executor, self._run)
yield await self.queue.get()