106 lines
3.1 KiB
Python
106 lines
3.1 KiB
Python
from pigpio import pi
|
|
from asyncio import Queue, get_event_loop, sleep, CancelledError
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from os import getenv
|
|
|
|
# GPIO handed to pigpio's bit-banged serial reader (open / read / close).
GPIO_PIN = 27

# Baud rate used when the bit-banged serial reader is opened on GPIO_PIN.
BAUD_RATE = 4800

# Frame dumps are printed only when the TL_DEBUG environment variable is
# exactly "true"; unset or any other value leaves debugging off.
DEBUG = getenv("TL_DEBUG") == "true"
|
|
|
|
# Button name -> two-byte payload. ``TunerList.send_button`` wraps the chosen
# entry in a ``bytearray`` and writes it out unchanged.
BUTTONS = {
    # "ok" key, plus its "_hold" variant
    "ok":               [0x00, 0x00],
    "ok_hold":          [0x00, 0x40],
    # source selection, right / left
    "source_r":         [0x00, 0x01],
    "source_r_hold":    [0x00, 0x81],
    "source_l":         [0x00, 0x02],
    "source_l_hold":    [0x00, 0x82],
    # volume
    "volume_up":        [0x00, 0x03],
    "volume_up_hold":   [0x00, 0x43],
    "volume_down":      [0x00, 0x04],
    "volume_down_hold": [0x00, 0x44],
    "pause":            [0x00, 0x05],
    # wheel entries are the only ones whose first byte is 0x01
    "wheel_up":         [0x01, 0x01],
    "wheel_down":       [0x01, 0x41],
}
|
|
|
|
async def serial_wait_for_line(pi, poll_interval=0):
    """Wait for the next newline-terminated line on the bit-banged serial GPIO.

    Polls ``pi.bb_serial_read(GPIO_PIN)`` until the accumulated bytes contain
    a ``b'\\n'`` and returns everything before that terminator (the
    terminator itself is not included).

    Bytes that were already read but belong *after* the terminator are kept
    on the connection object (``pi._tl_rx_pending``) and are consumed first
    by the next call.  Previously they were silently discarded, so whenever
    a read delivered the end of one line together with the start of the
    next, the next line arrived truncated.

    Args:
        pi: pigpio connection on which the serial reader for ``GPIO_PIN``
            has already been opened.
        poll_interval: seconds to sleep between polls that did not complete
            a line.  The default ``0`` keeps the original behaviour of
            merely yielding to the event loop; a small positive value
            (e.g. ``0.005``) avoids spinning a CPU core while idle.

    Returns:
        The line as ``bytes``, without its trailing newline.
    """
    # Start from whatever the previous call read past its own terminator.
    pending = getattr(pi, "_tl_rx_pending", b"")

    while b"\n" not in pending:
        count, data = pi.bb_serial_read(GPIO_PIN)
        if count > 0:
            pending += data
            # Persist immediately so a cancellation while sleeping below
            # does not lose bytes that were already taken from pigpio.
            pi._tl_rx_pending = pending
            if b"\n" in data:
                break
        # Hand control back to the event loop between polls.
        await sleep(poll_interval)

    line, _, remainder = pending.partition(b"\n")
    pi._tl_rx_pending = remainder
    return line
|
|
|
|
class TunerListState:
    """Snapshot of the display: the screen text and the preset number.

    ``preset`` is ``None`` when no preset is associated with the text.
    """

    def __init__(self, text, preset):
        self.text, self.preset = text, preset

    @property
    def is_temp_view(self):
        """True when the text contains ``"VOL"`` or ``"PAUSE"``."""
        return any(marker in self.text for marker in ("VOL", "PAUSE"))

    @property
    def is_playing_ext(self):
        """True when a preset is set and the text is exactly ``"TR 1 CD "``."""
        if self.preset is None:
            return False
        return self.text == "TR 1 CD "

    def to_dict(self):
        """Return the state, including both derived flags, as a plain dict."""
        payload = dict(screen=self.text, preset=self.preset, second="Tuner List")
        payload["is_temp_view"] = self.is_temp_view
        payload["is_playing_ext"] = self.is_playing_ext
        return payload

    def __str__(self):
        return f"[{self.text}] [P {self.preset}]"

    def __repr__(self):
        # The repr is deliberately the same text as str().
        return str(self)
|
|
|
|
class TunerList:
    """Reads display frames from a bit-banged serial GPIO and publishes states.

    ``run()`` consumes newline-terminated frames via ``serial_wait_for_line``
    and pushes a ``TunerListState`` onto ``self.queue`` for every frame that
    changes ``text`` / ``preset``.  ``yield_new_state()`` exposes that queue
    as an async generator, and ``send_button()`` writes a ``BUTTONS`` payload
    to an optional write-capable serial object.
    """

    def __init__(self, serial=None):
        """Connect to pigpio and open the serial reader on ``GPIO_PIN``.

        Args:
            serial: optional object with a blocking ``write(data)`` method,
                used by ``send_button``.  Previously the attribute was never
                assigned, so ``send_button`` always failed with an
                ``AttributeError``.  NOTE(review): presumably a pyserial
                port -- confirm the intended transmit transport.

        Raises:
            ConnectionError: if the pigpio connection could not be set up.
        """
        self.pi = pi()
        if not self.pi.connected:
            # Fail here with a clear message instead of an obscure error on
            # the first command sent over the dead connection.
            raise ConnectionError("could not connect to pigpio")
        try:
            self.pi.bb_serial_read_open(GPIO_PIN, BAUD_RATE, 8)
        except Exception:
            # Do not leak the connection when the GPIO cannot be opened.
            self.pi.stop()
            raise
        self.queue = Queue()
        self.serial = serial

        self.text = ""
        self.preset = None

    def _apply_frame(self, line):
        """Fold one received frame into ``self.text`` / ``self.preset``.

        Returns True when the frame was understood and a new state should be
        published; returns False, leaving the state untouched, for empty,
        truncated, undecodable or unrecognised frames.
        """
        if not line:
            # Empty line: two terminators in a row, nothing to parse.
            return False

        kind = line[0]
        if kind == 0x39:
            # This frame type clears both the text and the preset.
            self.preset = None
            self.text = ""
            return True
        if kind not in (0x0F, 0x0C):
            return False
        if len(line) < 4:
            # Byte 3 (preset marker) is missing: frame is truncated.
            return False

        # The text starts at a different offset for each of the two kinds.
        text_start = 8 if kind == 0x0F else 5
        try:
            text = line[text_start:].decode("ascii")
        except UnicodeDecodeError:
            # Line noise must not take the whole reader down.
            return False

        self.text = text
        marker = line[3]
        # 0x20 means "no preset"; otherwise the low nibble is the number.
        self.preset = None if marker == 0x20 else marker & 0x0F
        return True

    async def run(self):
        """Read frames forever, publishing a state for each recognised one.

        The serial GPIO and the pigpio connection are released when the loop
        ends for any reason.  Cancellation is absorbed (the coroutine simply
        returns), as it was before; any other exception propagates after the
        cleanup has run.
        """
        try:
            while True:
                line = await serial_wait_for_line(self.pi)
                if not self._apply_frame(line):
                    continue
                self.queue.put_nowait(TunerListState(self.text, self.preset))
                if DEBUG:
                    for c in line:
                        print(hex(c), " ", end="")
                    print(" [",self.text,"]", f" [P {self.preset}]")
        except CancelledError:
            # Kept for backward compatibility: callers awaiting a cancelled
            # run() have always seen it finish normally.
            pass
        finally:
            try:
                self.pi.bb_serial_read_close(GPIO_PIN)
            finally:
                self.pi.stop()

    async def yield_new_state(self):
        """Async generator yielding each published state as it arrives."""
        while True:
            yield await self.queue.get()

    async def send_button(self, btn):
        """Write the payload for button ``btn`` to the configured serial object.

        The blocking ``write`` runs in the event loop's default executor
        rather than in a thread pool created and torn down on every call.

        Raises:
            RuntimeError: if no serial object has been configured.
            KeyError: if ``btn`` is not a key of ``BUTTONS``.
        """
        port = getattr(self, "serial", None)
        if port is None:
            raise RuntimeError(
                "no serial writer configured; pass serial=... to TunerList()"
            )
        payload = bytearray(BUTTONS[btn])
        await get_event_loop().run_in_executor(None, port.write, payload)
|