# https://tlcdcemu.sourceforge.net/python.html
# https://sourceforge.net/projects/tlcdcemu/files/tlcdcpy/
"""Serial packet responder derived from the tlcdcemu project linked above.

The script opens a serial port, exchanges framed packets with the device on
the other end and forwards every decoded command name to a local HTTP
endpoint.  It appears to emulate a CD changer talking to a car head unit
("HU" in the log messages) -- confirm against the upstream documentation.

Packet framing as implemented below:

    0x3d | sequence number | payload length | payload ... | XOR checksum

A correctly received packet is acknowledged with the single byte 0xc5.
"""

from serial import Serial, EIGHTBITS, PARITY_EVEN, STOPBITS_ONE
import requests
import time

# Local HTTP endpoint that is told about every decoded command.
BUTTON_ENDPOINT = "http://127.0.0.1:5959/btn"
# Upper bound (seconds) for the HTTP notification so that a stuck listener
# cannot stall the serial exchange indefinitely.
HTTP_TIMEOUT = 0.5


def handle_button_press(btn):
    """Report the decoded command name *btn* to the local HTTP listener.

    The notification is best-effort: a missing, slow or failing listener is
    reported on stdout instead of raising, so the serial loop keeps running.
    """
    try:
        requests.get(BUTTON_ENDPOINT, params={"btn": btn}, timeout=HTTP_TIMEOUT)
    except requests.RequestException as exc:
        print(f"Button notification failed: {exc}")


# Debug thresholds: a message is printed when the instance's debug level is
# greater than or equal to the level the message was logged with.
NOISE = 30
ERROR = 20


def bcd(number: int) -> bytes:
    """Return *number* (0-99) packed as one binary-coded-decimal byte.

    The tens digit goes into the high nibble and the units digit into the
    low nibble, e.g. ``bcd(42) == b'\\x42'``.
    """
    dec, unit = divmod(number, 10)
    return bytes([dec * 16 + unit])


class cdc(Serial):
    """Serial port subclass that speaks the framed packet protocol."""

    # Maps a received payload to (name, cd, track_delta):
    #   name        - text returned by decodePacket() and sent to the listener
    #   cd          - non-zero: switch to this CD number and reset the track
    #   track_delta - added to the current track when ``cd`` is zero
    _commands = {
        b'\x17\x01': ('NEXT', 0, 1),
        b'\x22\x01\x02': ('PREV', 0, -1),
        b'\x20\x0a': ('FAST_FORW', 0, 0),
        b'\x13': ('PLAY', 0, 0),
        b'\x86': ('RESUME', 0, 0),
        b'\x19': ('STOP', 0, 0),
        b'\x1c': ('PAUSE', 0, 0),
        b'\x93': ('HU_ON', 0, 0),
        b'\x21\x0a': ('FAST_BACK', 0, 0),
        b'\x26\x01': ('CD_1', 1, 0),
        b'\x26\x02': ('CD_2', 2, 0),
        b'\x26\x03': ('CD_3', 3, 0),
        b'\x26\x04': ('CD_4', 4, 0),
        b'\x26\x05': ('CD_5', 5, 0),
        b'\x26\x06': ('CD_6', 6, 0),
        b'\x24': ('STALK', 0, 0),
        b'\x27\x07\x0a': ('RAND_ON', 0, 0),
        b'\x27\x03\x0a': ('RAND_OFF', 0, 0),
    }

    def __init__(self, port, debuglevel=0):
        """Open *port* at 9600 baud, 8 data bits, even parity, 1 stop bit.

        Args:
            port: Serial device name passed on to ``serial.Serial``.
            debuglevel: Messages logged at or below this level are printed
                (see ``NOISE`` and ``ERROR``).
        """
        self._sendsequence = 0   # sequence number of the next outgoing packet
        self._recsequence = 0    # sequence number of the last accepted packet
        self._debuglevel = debuglevel
        self._cd = 1             # currently selected CD
        self._try = 0            # payload byte for the test command
        self._track = 1          # current track, kept within 1..99
        self.errors = 0          # consecutive packets that were never ACKed
        super().__init__(port, baudrate=9600, bytesize=EIGHTBITS,
                         parity=PARITY_EVEN, stopbits=STOPBITS_ONE,
                         timeout=0.2)
        # Property form of the deprecated setRTS()/setDTR() calls.
        self.rts = True
        self.dtr = False
        # Give the line a moment to settle, then drop whatever arrived so far.
        time.sleep(0.5)
        self.reset_input_buffer()

    def checksum(self, packet):
        """Return the XOR of all bytes in *packet* as a single byte."""
        result = 0
        for c in packet:
            result ^= c
        return bytes([result])

    def _debug(self, level, message, packet=None):
        """Print *message* (plus *packet* as hex bytes) if *level* is enabled."""
        if self._debuglevel >= level:
            s = ''
            if packet:
                s = ', '.join([hex(c) for c in packet])
            print(f"{message} {s}")

    def sendPacket(self, packet, maxtries=5):
        """Frame *packet*, send it and wait for the 0xc5 acknowledgement.

        Args:
            packet: Payload bytes (framing and checksum are added here).
            maxtries: Number of transmissions before giving up.

        Returns:
            True when the packet was acknowledged, False otherwise.  A
            failure increments ``self.errors``; a success resets it to 0.
        """
        retries = 0
        while retries < maxtries:
            pkt = b'\x3d' + bytes([self._sendsequence, len(packet)]) + packet
            pkt += self.checksum(pkt)
            self._debug(NOISE, "Sending", pkt)
            self.write(pkt)
            reply = self.read(1)
            if reply:
                self._debug(NOISE, "Reply", reply)
            else:
                self._debug(ERROR, "No reply")
            # The peer may start a packet of its own before acknowledging
            # ours: consume it (its start byte is already read) and look for
            # the ACK again.
            while reply == b'\x3d':
                self._debug(ERROR, "New packet instead of ACK")
                self.receivePacket(start_received=True)
                reply = self.read(1)
            if reply == b'\xc5':
                self._sendsequence = (self._sendsequence + 1) % 256
                self.errors = 0
                if retries > 0:
                    self._debug(ERROR, f"Needed {retries} retries to send")
                return True
            retries += 1
        self.errors += 1
        self._debug(ERROR, "Not acknowledged")
        return False

    def receivePacket(self, start_received=False):
        """Read one packet, acknowledge it and act on its payload.

        Args:
            start_received: True when the caller has already consumed the
                0x3d start byte.

        Returns nothing.  Incomplete packets and checksum mismatches are
        logged and dropped without an acknowledgement.
        """
        if not start_received:
            startchar = self.read(1)
            if startchar != b'\x3d':
                # An empty read is just a timeout; only log real garbage.
                if startchar:
                    self._debug(ERROR, "Bad startchar", startchar)
                return
        else:
            startchar = b'\x3d'

        seqno = self.read(1)
        if not seqno:
            self._debug(ERROR, "No sequence")
            return

        packetlen = self.read(1)
        if not packetlen:
            self._debug(ERROR, "No packetlen")
            return

        plen = packetlen[0]
        packet = self.read(plen)
        if len(packet) != plen:
            self._debug(ERROR, f"Expected len {plen}, got {len(packet)}", packet)
            return

        chk = self.read(1)
        if not chk:
            self._debug(ERROR, "No checksum")
            return

        full_packet = startchar + seqno + packetlen + packet
        if self.checksum(full_packet) != chk:
            self._debug(ERROR, "Bad checksum")
            return

        self.write(b'\xc5')
        self._debug(NOISE, "Received", packet)

        # A repeated sequence number is treated as a retransmission: it is
        # acknowledged above but not processed a second time.
        # NOTE(review): _recsequence starts at 0, so a first packet carrying
        # sequence 0 is also treated as a repeat -- confirm this is intended.
        if seqno != bytes([self._recsequence]):
            self._recsequence = seqno[0]
            decoded = self.decodePacket(packet)
            handle_button_press(decoded)
            print(decoded)
            # NOTE(review): decodePacket() returns 'STALK' for the stalk
            # command, never 'Stalk pull', so this branch looks unreachable.
            if decoded == 'Stalk pull':
                print(f"Sending test command {self._try}")
                if self.sendPacket(bytes([self._try])):
                    print("Acknowledged")
                self._try += 1
        else:
            self._debug(ERROR, "Same sequence as previous packet")

    def decodePacket(self, packet):
        """Translate a payload into its command name and update CD/track.

        A CD-select command stores the new CD, resets the track to 1 and
        sends a packet through ``playingCD``.  Other known commands apply
        their track delta, wrapping within 1..99.

        Returns:
            The command name, or ``'Unknown <hex bytes>'`` for payloads that
            are not in ``_commands``.
        """
        if packet in self._commands:
            text, newcd, newtrack = self._commands[packet]
            if newcd:
                self._cd = newcd
                self._track = 1
                self.playingCD(self._cd)
            else:
                self._track += newtrack
                if self._track < 1:
                    self._track = 99
                elif self._track > 99:
                    self._track = 1
            return text
        return 'Unknown %s' % ', '.join([hex(c) for c in packet])

    def playingCD(self, number):
        """Send the fixed 5-byte prefix followed by CD *number*.

        Returns the result of ``sendPacket``.
        """
        return self.sendPacket(b'\x20\x01\x03\x09\x05' + bytes([number]))

    def startPacket(self):
        """Reset both sequence counters and send the start-up packets.

        Returns:
            True when the first packet (sent once, without retries) and the
            final ``playingCD`` packet were both acknowledged.
        """
        time.sleep(0.5)
        self.reset_input_buffer()
        self._sendsequence = 0
        self._recsequence = 0
        if self.sendPacket(b'\x11\x60\x06', 1):
            self.sendPacket(b'\x25\x03')  # random off
            if self.playingCD(self._cd):
                return True
        return False

    def fakePlaying(self):
        """Send the periodic 0x47 packet carrying the current track in BCD.

        Returns the result of ``sendPacket``.
        """
        return self.sendPacket(
            b'\x47' + bcd(self._track) + b'\x01\x00\x00\x01\x00\x00\x00\x00\x00'
        )

    def loop(self):
        """Run forever: start up, then service packets until errors pile up.

        After a successful start sequence, incoming packets are handled
        continuously and ``fakePlaying`` is sent about once per second.
        When three sends in a row go unacknowledged the start sequence is
        attempted again.
        """
        while True:
            print("Trying to send start sequence")
            if self.startPacket():
                print("Start sequence sent, entering fake playing loop")
                delay = time.time()
                while self.errors < 3:
                    self.receivePacket()
                    now = time.time()
                    if now - delay >= 1:
                        self.fakePlaying()
                        delay = now
                print("Too many errors, probably HU off")
            else:
                time.sleep(0.5)


if __name__ == '__main__':
    # port = "/dev/ttyS0"
    port = "/dev/serial0"
    debuglevel = 99
    # The context manager closes the serial port when loop() is left through
    # an exception such as KeyboardInterrupt.
    with cdc(port, debuglevel) as p:
        p.loop()