This commit is contained in:
marios8543 2025-04-27 16:50:16 +03:00
parent 8941c60ce5
commit 5306c671c6
3 changed files with 80 additions and 44 deletions

19
tunerlistd/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,19 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run",
"type": "debugpy",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"env": {
"TL_DEBUG": "true"
}
}
]
}

View File

@ -11,9 +11,14 @@ GPIO_PIN = 27 # GPIO pin 27
BAUD_RATE = 4800 # 4800 baud BAUD_RATE = 4800 # 4800 baud
# Start serial bit bang # Start serial bit bang
pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) try:
pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8)
except:
pi.bb_serial_read_close(GPIO_PIN)
pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8)
buffer = b'' # Buffer for incoming data buffer = b'' # Buffer for incoming data
last_buffer = b''
try: try:
while True: while True:
@ -21,12 +26,14 @@ try:
if count > 0: if count > 0:
buffer += data buffer += data
while b'\n' in buffer: while b'\r\n' in buffer:
line, buffer = buffer.split(b'\n', 1) # Split at newline line, buffer = buffer.split(b'\r\n', 1) # Split at newline
# Print each byte in hex, separated by spaces # Print each byte in hex, separated by spaces
hex_line = ' '.join(f'{byte:02X}' for byte in line) if not line == last_buffer:
print("Received line (hex):", hex_line) last_buffer = line
for c in line:
print(hex(c), " ", end="")
print()
time.sleep(0.01) time.sleep(0.01)
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -1,11 +1,11 @@
from pigpio import pi from pigpio import pi, error
from asyncio import Queue, get_event_loop, sleep, CancelledError from asyncio import Queue, get_event_loop, sleep, CancelledError, create_task
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from os import getenv from os import getenv
GPIO_PIN = 27 RX_PIN = 27
BAUD_RATE = 4800 BAUD_RATE = 4800
DEBUG = getenv("TL_DEBUG", "false") == "true" DEBUG = False# getenv("TL_DEBUG", "false") == "true"
BUTTONS = { BUTTONS = {
"ok": [0x00, 0x00], "ok": [0x00, 0x00],
@ -23,17 +23,6 @@ BUTTONS = {
"wheel_down": [0x01, 0x41], "wheel_down": [0x01, 0x41],
} }
async def serial_wait_for_line(pi):
buffer = b''
while True:
(count, data) = pi.bb_serial_read(GPIO_PIN)
if count > 0:
buffer += data
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
return line
await sleep(0)
class TunerListState: class TunerListState:
def __init__(self, text, preset): def __init__(self, text, preset):
self.text = text self.text = text
@ -41,7 +30,7 @@ class TunerListState:
@property @property
def is_temp_view(self): def is_temp_view(self):
return "VOL" in self.text or "PAUSE" in self.text return "VOL" in self.text or "PAUSE" in self.text or "TREB" in self.text or "BASS" in self.text
@property @property
def is_playing_ext(self): def is_playing_ext(self):
@ -49,8 +38,8 @@ class TunerListState:
def to_dict(self): def to_dict(self):
return { return {
"screen": self.text, "screen": f"{self.text}".strip(),
"preset": self.preset, "preset": f"{self.preset}" if self.preset and not self.is_temp_view else None,
"second": "Tuner List", "second": "Tuner List",
"is_temp_view": self.is_temp_view, "is_temp_view": self.is_temp_view,
"is_playing_ext": self.is_playing_ext "is_playing_ext": self.is_playing_ext
@ -65,35 +54,56 @@ class TunerListState:
class TunerList: class TunerList:
def __init__(self): def __init__(self):
self.pi = pi() self.pi = pi()
self.pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8) while True:
self.queue = Queue() try:
self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8)
break
except error as e:
print(e)
self.pi.bb_serial_read_close(RX_PIN)
self.last_buffer = b''
self.queue = Queue()
self.text = "" self.text = ""
self.preset = None self.preset = None
async def run(self): async def run(self):
buffer = b''
try: try:
while True: while True:
line = await serial_wait_for_line(self.pi) (count, data) = self.pi.bb_serial_read(RX_PIN)
if line[0] == 0x0F or line[0] == 0x0C: if count > 0:
self.text = line[(8 if line[0] == 0x0F else 5):].decode("ascii") buffer += data
if (line[3] != 0x20): while b'\r\n' in buffer:
self.preset = line[3] & 0x0F line, buffer = buffer.split(b'\r\n', 1)
else: if not line == self.last_buffer:
self.preset = None self.last_buffer = line
elif line[0] == 0x39: print(line)
self.preset = None self.process_line(line)
self.text = "" await sleep(0)
else:
continue
self.queue.put_nowait(TunerListState(self.text, self.preset))
if DEBUG:
for c in line:
print(hex(c), " ", end="")
print(" [",self.text,"]", f" [P {self.preset}]")
except CancelledError: except CancelledError:
self.pi.bb_serial_read_close(GPIO_PIN) self.pi.bb_serial_read_close(RX_PIN)
self.pi.stop() self.pi.stop()
return
def process_line(self, line):
try:
if line[0] == 0x0F or line[0] == 0x0C:
self.text = line[(8 if line[0] == 0x0F else 5):].decode("ascii")
if (line[3] != 0x20):
self.preset = line[3] & 0x0F
else:
self.preset = None
elif line[0] == 0x39:
self.preset = None
self.text = ""
else:
pass
except Exception as e:
print(e)
finally:
print("[",self.text,"]", f" [P {self.preset}]")
self.queue.put_nowait(TunerListState(self.text, self.preset))
async def yield_new_state(self): async def yield_new_state(self):
while True: while True: