Fixes
This commit is contained in:
parent
7fdd5c3fdf
commit
ae666a53bd
@ -10,7 +10,7 @@ from uvicorn import run
|
|||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
event = Event()
|
event = Event()
|
||||||
|
|
||||||
tunerlist = TunerList("")
|
tunerlist = TunerList()
|
||||||
bluetooth = Bluetooth()
|
bluetooth = Bluetooth()
|
||||||
|
|
||||||
last_tunerlist_state: TunerListState = None
|
last_tunerlist_state: TunerListState = None
|
||||||
@ -19,15 +19,21 @@ state_to_send: dict = {}
|
|||||||
|
|
||||||
async def tunerlist_listener():
|
async def tunerlist_listener():
|
||||||
global last_tunerlist_state
|
global last_tunerlist_state
|
||||||
while True:
|
try:
|
||||||
last_tunerlist_state = await tunerlist.queue.get()
|
while True:
|
||||||
event.set()
|
last_tunerlist_state = await tunerlist.queue.get()
|
||||||
|
event.set()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
async def bluetooth_listener():
|
async def bluetooth_listener():
|
||||||
global last_bluetooth_state
|
global last_bluetooth_state
|
||||||
while True:
|
try:
|
||||||
last_bluetooth_state = await bluetooth.queue.get()
|
while True:
|
||||||
event.set()
|
last_bluetooth_state = await bluetooth.queue.get()
|
||||||
|
event.set()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
@app.on_event("startup")
|
@app.on_event("startup")
|
||||||
async def startup_event():
|
async def startup_event():
|
||||||
@ -39,7 +45,7 @@ async def startup_event():
|
|||||||
]:
|
]:
|
||||||
create_task(i)
|
create_task(i)
|
||||||
|
|
||||||
@app.websocket("/")
|
@app.websocket("/ws")
|
||||||
async def websocket_endpoint(websocket: WebSocket):
|
async def websocket_endpoint(websocket: WebSocket):
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
while True:
|
while True:
|
||||||
@ -67,4 +73,4 @@ async def button_endpoint(request: Request):
|
|||||||
return btn
|
return btn
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
run(app=app, host="0.0.0.0", port=5000, log_level="info")
|
run(app=app, host="0.0.0.0", port=5959, log_level="info")
|
||||||
|
37
tunerlistd/test.py
Normal file
37
tunerlistd/test.py
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
import pigpio
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Connect to pigpio daemon
|
||||||
|
pi = pigpio.pi()
|
||||||
|
|
||||||
|
if not pi.connected:
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
GPIO_PIN = 27 # GPIO pin 27
|
||||||
|
BAUD_RATE = 4800 # 4800 baud
|
||||||
|
|
||||||
|
# Start serial bit bang
|
||||||
|
pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8)
|
||||||
|
|
||||||
|
buffer = b'' # Buffer for incoming data
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
(count, data) = pi.bb_serial_read(GPIO_PIN)
|
||||||
|
if count > 0:
|
||||||
|
buffer += data
|
||||||
|
|
||||||
|
while b'\n' in buffer:
|
||||||
|
line, buffer = buffer.split(b'\n', 1) # Split at newline
|
||||||
|
# Print each byte in hex, separated by spaces
|
||||||
|
hex_line = ' '.join(f'{byte:02X}' for byte in line)
|
||||||
|
print("Received line (hex):", hex_line)
|
||||||
|
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
pi.bb_serial_read_close(GPIO_PIN)
|
||||||
|
pi.stop()
|
@ -1,4 +1,4 @@
|
|||||||
from pigpip import pi
|
from pigpio import pi
|
||||||
from asyncio import Queue, get_event_loop, sleep
|
from asyncio import Queue, get_event_loop, sleep
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
@ -49,7 +49,8 @@ class TunerListState:
|
|||||||
return {
|
return {
|
||||||
"text": self.text,
|
"text": self.text,
|
||||||
"preset": self.preset,
|
"preset": self.preset,
|
||||||
"should_show": self.should_show
|
"is_temp_view": self.is_temp_view,
|
||||||
|
"is_playing_ext": self.is_playing_ext
|
||||||
}
|
}
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
@ -67,6 +68,10 @@ class TunerList:
|
|||||||
self.text = ""
|
self.text = ""
|
||||||
self.preset = None
|
self.preset = None
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.pi.bb_serial_read_close(GPIO_PIN)
|
||||||
|
self.pi.stop()
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
while True:
|
while True:
|
||||||
line = await serial_wait_for_line(self.pi)
|
line = await serial_wait_for_line(self.pi)
|
||||||
@ -93,4 +98,4 @@ class TunerList:
|
|||||||
async def send_button(self, btn):
|
async def send_button(self, btn):
|
||||||
with ThreadPoolExecutor() as executor:
|
with ThreadPoolExecutor() as executor:
|
||||||
loop = get_event_loop()
|
loop = get_event_loop()
|
||||||
await loop.run_in_executor(executor, self.serial.write, bytearray(BUTTONS[btn]))
|
await loop.run_in_executor(executor, self.serial.write, bytearray(BUTTONS[btn]))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user