# tunerlistd/tunerlistd/tunerlist.py
# 2025-04-27 20:02:00 +03:00
#
# 132 lines
# 4.1 KiB
# Python

from pigpio import pi, error, OUTPUT
from asyncio import Queue, get_event_loop, sleep, CancelledError, create_task
from concurrent.futures import ThreadPoolExecutor
from os import getenv
# GPIO numbers handed to pigpio: RX_PIN is opened for bit-banged serial
# reads, TX_PIN is configured as an output and used for serial waveforms.
RX_PIN = 27
TX_PIN = 17

# Line speed used for both reading and writing.
BAUD_RATE = 4800

# Hard-coded off.  The environment-driven variant is currently disabled:
#   getenv("TL_DEBUG", "false") == "true"
DEBUG = False

# Button name -> two-byte payload transmitted by TunerList.send_button().
# In this table the "*_hold" entries differ from their short-press
# counterpart only in the upper bits of the second byte (0x40 or 0x80).
BUTTONS = {
    "ok":               [0x00, 0x00],
    "ok_hold":          [0x00, 0x40],
    "source_r":         [0x00, 0x01],
    "source_r_hold":    [0x00, 0x81],
    "source_l":         [0x00, 0x02],
    "source_l_hold":    [0x00, 0x82],
    "volume_up":        [0x00, 0x03],
    "volume_up_hold":   [0x00, 0x43],
    "volume_down":      [0x00, 0x04],
    "volume_down_hold": [0x00, 0x44],
    "pause":            [0x00, 0x05],
    # Wheel events use 0x01 as the first byte.
    "wheel_up":         [0x01, 0x01],
    "wheel_down":       [0x01, 0x41],
}
class TunerListState:
    """Snapshot of the display text and the preset number shown with it.

    Attributes:
        text: The text currently on the display.
        preset: The preset number, or ``None`` when no preset is shown.
    """

    # Substrings that mark the text as a transient overlay rather than
    # the regular screen content.
    _TEMP_VIEW_MARKERS = ("VOL", "PAUSE", "TREB", "BASS")

    def __init__(self, text, preset):
        self.text = text
        self.preset = preset

    @property
    def is_temp_view(self):
        """Whether the text contains one of the overlay markers."""
        return any(marker in self.text for marker in self._TEMP_VIEW_MARKERS)

    @property
    def is_playing_ext(self):
        """Whether the text is exactly ``"TR 1 CD "`` and a preset is set."""
        return self.text == "TR 1 CD " and self.preset is not None

    def to_dict(self):
        """Return the state as a plain dictionary.

        ``"screen"`` is the text with surrounding whitespace removed.
        ``"preset"`` is the preset as a string, or ``None`` when there is no
        preset or while an overlay is being shown.
        """
        temp_view = self.is_temp_view
        # NOTE(review): this is a truthiness test, so a preset of 0 is
        # reported as None here even though is_playing_ext treats it as set.
        show_preset = bool(self.preset) and not temp_view
        return {
            "screen": str(self.text).strip(),
            "preset": str(self.preset) if show_preset else None,
            "second": "Tuner List",
            "is_temp_view": temp_view,
            "is_playing_ext": self.is_playing_ext,
        }

    def __str__(self):
        return f"[{self.text}] [P {self.preset}]"

    def __repr__(self):
        return self.__str__()
class TunerList:
    """Serial link to the display bus, bit-banged through pigpio.

    Frames are read from RX_PIN, decoded into a text/preset pair and
    published as TunerListState objects on ``self.queue``.  Button codes
    from BUTTONS are transmitted on TX_PIN.
    """

    def __init__(self):
        """Connect to pigpio and open the bit-banged serial reader."""
        self.pi = pi()
        self.pi.set_mode(TX_PIN, OUTPUT)
        while True:
            try:
                self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8)
                break
            except error as e:
                # Opening failed: report it, close the reader on that pin
                # and try again.
                print(e)
                self.pi.bb_serial_read_close(RX_PIN)
        # Last frame handed to _process_line; used to skip repeats.
        self.last_buffer = b''
        self.queue = Queue()
        self.text = ""
        self.preset = None

    @staticmethod
    def _parse_frame(line):
        """Decode one frame into a ``(text, preset)`` pair.

        Args:
            line: Raw frame bytes without the trailing ``\\r\\n``.

        Returns:
            ``(text, preset)`` when the frame changes the display, or
            ``None`` when the frame is ignored (``0x0F 0xFF`` frames and
            unknown first bytes).

        Raises:
            IndexError: The frame is shorter than the fields it must hold.
            UnicodeDecodeError: The text field is not ASCII.
        """
        kind = line[0]
        if kind == 0x0F:
            if line[1] == 0xFF:
                return None
            # Text occupies bytes 8..kind; the preset is the low nibble of
            # byte 6 when its high nibble is 0x7.
            text = line[8:kind + 1].decode("ascii").replace("\n", "")
            marker = line[6]
            preset = marker & 0x0F if (marker & 0xF0) == 0x70 else None
            return text, preset
        if kind == 0x0C:
            # Text occupies bytes 5..kind; byte 3 carries the preset in its
            # low nibble unless it is a space (0x20).
            text = line[5:kind + 1].decode("ascii").replace("\n", "")
            marker = line[3]
            preset = marker & 0x0F if marker != 0x20 else None
            return text, preset
        if kind == 0x39:
            # This frame type blanks both the text and the preset.
            return "", None
        return None

    def _process_line(self, line):
        """Apply one frame to the current state and publish the result.

        A TunerListState is queued for every call, including frames that
        are ignored or malformed.  Text and preset are updated together,
        so a frame that fails to parse leaves both untouched.
        """
        try:
            update = self._parse_frame(line)
            if update is not None:
                self.text, self.preset = update
        except Exception as e:
            # A malformed frame must not stop the reader loop; report it
            # and keep the previous state.
            print(e)
        finally:
            print("[", self.text, "]", f" [P {self.preset}]")
            self.queue.put_nowait(TunerListState(self.text, self.preset))

    async def send_button(self, btn):
        """Transmit the payload for ``btn`` on TX_PIN and wait until sent.

        Args:
            btn: A key of BUTTONS.

        Raises:
            KeyError: ``btn`` is not a known button name.
        """
        # Resolve the payload first so an unknown name fails before any
        # existing waveform is cleared.
        payload = bytearray(BUTTONS[btn])
        self.pi.wave_clear()
        self.pi.wave_add_serial(TX_PIN, BAUD_RATE, payload)
        wave = self.pi.wave_create()
        self.pi.wave_send_once(wave)
        while self.pi.wave_tx_busy():
            await sleep(0)
        self.pi.wave_delete(wave)

    async def run(self):
        """Poll the serial reader forever and process each new frame.

        Incoming bytes are split on ``\\r\\n``.  A frame identical to the
        previous one is skipped.  Cancellation ends the loop and the
        coroutine returns normally.  The serial reader and the pigpio
        connection are released however the loop ends.
        """
        buffer = b''
        try:
            while True:
                count, data = self.pi.bb_serial_read(RX_PIN)
                if count > 0:
                    buffer += data
                    while b'\r\n' in buffer:
                        line, buffer = buffer.split(b'\r\n', 1)
                        if line != self.last_buffer:
                            self.last_buffer = line
                            # Hex dump of the raw frame.
                            for c in line:
                                print(hex(c), " ", end="")
                            print()
                            self._process_line(line)
                # NOTE(review): sleep(0) only yields to the event loop, so
                # this loop polls continuously.
                await sleep(0)
        except CancelledError:
            # Kept as a normal return so awaiting a cancelled run() behaves
            # as it did before.
            return
        finally:
            self.pi.bb_serial_read_close(RX_PIN)
            self.pi.stop()

    async def yield_new_state(self):
        """Yield each TunerListState as it is placed on the queue."""
        while True:
            yield await self.queue.get()