diff --git a/tunerlistd/tunerlist.py b/tunerlistd/tunerlist.py index e12db5c..f0aedb6 100644 --- a/tunerlistd/tunerlist.py +++ b/tunerlistd/tunerlist.py @@ -1,9 +1,10 @@ -from pigpio import pi, error +from pigpio import pi, error, OUTPUT from asyncio import Queue, get_event_loop, sleep, CancelledError, create_task from concurrent.futures import ThreadPoolExecutor from os import getenv RX_PIN = 27 +TX_PIN = 17 BAUD_RATE = 4800 DEBUG = False# getenv("TL_DEBUG", "false") == "true" @@ -54,6 +55,7 @@ class TunerListState: class TunerList: def __init__(self): self.pi = pi() + self.pi.set_mode(TX_PIN, OUTPUT) while True: try: self.pi.bb_serial_read_open(RX_PIN, BAUD_RATE, 8) @@ -67,29 +69,18 @@ class TunerList: self.text = "" self.preset = None - async def run(self): - buffer = b'' + def _process_line(self, line): try: - while True: - (count, data) = self.pi.bb_serial_read(RX_PIN) - if count > 0: - buffer += data - while b'\r\n' in buffer: - line, buffer = buffer.split(b'\r\n', 1) - if not line == self.last_buffer: - self.last_buffer = line - print(line) - self.process_line(line) - await sleep(0) - except CancelledError: - self.pi.bb_serial_read_close(RX_PIN) - self.pi.stop() - return - - def process_line(self, line): - try: - if line[0] == 0x0F or line[0] == 0x0C: - self.text = line[(8 if line[0] == 0x0F else 5):].decode("ascii") + if line[0] == 0x0F and line[1] == 0xFF: + pass + elif line[0] == 0x0F: + self.text = line[8:int(line[0]) + 1].decode("ascii").replace("\n", "") + if (line[6] & 0xF0) == 0x70: + self.preset = line[6] & 0x0F + else: + self.preset = None + elif line[0] == 0x0C: + self.text = line[5:int(line[0]) + 1].decode("ascii").replace("\n", "") if (line[3] != 0x20): self.preset = line[3] & 0x0F else: @@ -105,11 +96,36 @@ class TunerList: print("[",self.text,"]", f" [P {self.preset}]") self.queue.put_nowait(TunerListState(self.text, self.preset)) + async def send_button(self, btn): + self.pi.wave_clear() + self.pi.wave_add_serial(TX_PIN, BAUD_RATE, bytearray(BUTTONS[btn])) + wave = self.pi.wave_create() + self.pi.wave_send_once(wave) + while self.pi.wave_tx_busy(): + await sleep(0) + self.pi.wave_delete(wave) + + async def run(self): + buffer = b'' + try: + while True: + (count, data) = self.pi.bb_serial_read(RX_PIN) + if count > 0: + buffer += data + while b'\r\n' in buffer: + line, buffer = buffer.split(b'\r\n', 1) + if not line == self.last_buffer: + self.last_buffer = line + for c in line: + print(hex(c), " ", end="") + print() + self._process_line(line) + await sleep(0) + except CancelledError: + self.pi.bb_serial_read_close(RX_PIN) + self.pi.stop() + return + async def yield_new_state(self): while True: yield await self.queue.get() - - async def send_button(self, btn): - with ThreadPoolExecutor() as executor: - loop = get_event_loop() - await loop.run_in_executor(executor, self.serial.write, bytearray(BUTTONS[btn]))