#!/usr/bin/env python3 # SPDX-License-Identifier: MIT import sys, pathlib sys.path.append(str(pathlib.Path(__file__).resolve().parents[1])) import struct import traceback from construct import * from m1n1.setup import * from m1n1.shell import run_shell from m1n1.hw.dart import DART, DARTRegs from m1n1.fw.asc import StandardASC, ASCDummyEndpoint from m1n1.fw.asc.base import * from m1n1.fw.aop import * from m1n1.fw.aop.ipc import * from m1n1.fw.afk.rbep import * from m1n1.fw.afk.epic import * # Set up a secondary proxy channel so that we can stream # the microphone samples p.usb_iodev_vuart_setup(p.iodev_whoami()) p.iodev_set_usage(IODEV.USB_VUART, USAGE.UARTPROXY) p.pmgr_adt_clocks_enable("/arm-io/dart-aop") adt_dc = u.adt["/arm-io/aop/iop-aop-nub/aop-audio/dc-2400000"] pdm_config = Container( unk1=2, clockSource=u'pll ', pdmFrequency=2400000, unk3_clk=24000000, unk4_clk=24000000, unk5_clk=24000000, channelPolaritySelect=256, unk7=99, unk8=1013248, unk9=0, ratios=Container( r1=15, r2=5, r3=2, ), filterLengths=0x542c47, coeff_bulk=120, coefficients=GreedyRange(Int32sl).parse(adt_dc.coefficients), unk10=1, micTurnOnTimeMs=20, unk11=1, micSettleTimeMs=50, ) decimator_config = Container( latency=15, ratios=Container( r1=15, r2=5, r3=2, ), filterLengths=0x542c47, coeff_bulk=120, coefficients=GreedyRange(Int32sl).parse(adt_dc.coefficients), ) class AFKEP_Hello(AFKEPMessage): TYPE = 63, 48, Constant(0x80) UNK = 7, 0 class AFKEP_Hello_Ack(AFKEPMessage): TYPE = 63, 48, Constant(0xa0) class EPICEndpoint(AFKRingBufEndpoint): BUFSIZE = 0x1000 def __init__(self, *args, **kwargs): self.seq = 0x0 self.wait_reply = False self.ready = False super().__init__(*args, **kwargs) @msg_handler(0x80, AFKEP_Hello) def Hello(self, msg): self.rxbuf, self.rxbuf_dva = self.asc.ioalloc(self.BUFSIZE) self.txbuf, self.txbuf_dva = self.asc.ioalloc(self.BUFSIZE) self.send(AFKEP_Hello_Ack()) def handle_hello(self, hdr, sub, fd): if sub.type != 0xc0: return False payload = fd.read() name = payload.split(b"\0")[0].decode("ascii") self.log(f"Hello! (endpoint {name})") self.ready = True return True def handle_reply(self, hdr, sub, fd): if self.wait_reply: self.pending_call.read_resp(fd) self.wait_reply = False return True return False def handle_ipc(self, data): fd = BytesIO(data) hdr = EPICHeader.parse_stream(fd) sub = EPICSubHeaderVer2.parse_stream(fd) handled = False if sub.category == EPICCategory.REPORT: handled = self.handle_hello(hdr, sub, fd) if sub.category == EPICCategory.REPLY: handled = self.handle_reply(hdr, sub, fd) if not handled and getattr(self, 'VERBOSE', False): self.log(f"< 0x{hdr.channel:x} Type {hdr.type} Ver {hdr.version} Tag {hdr.seq}") self.log(f" Len {sub.length} Ver {sub.version} Cat {sub.category} Type {sub.type:#x} Ts {sub.timestamp:#x}") self.log(f" Unk1 {sub.unk1:#x} Unk2 {sub.unk2:#x}") chexdump(fd.read()) def indirect(self, call, chan=0x1000000d, timeout=0.1): tx = call.ARGS.build(call.args) self.asc.iface.writemem(self.txbuf, tx[4:]) cmd = self.roundtrip(IndirectCall( txbuf=self.txbuf_dva, txlen=len(tx) - 4, rxbuf=self.rxbuf_dva, rxlen=self.BUFSIZE, retcode=0, ), category=EPICCategory.COMMAND, typ=call.TYPE) fd = BytesIO() fd.write(struct.pack("