diff --git a/proxyclient/hv/trace_mtp.py b/proxyclient/hv/trace_mtp.py new file mode 100644 index 00000000..6589b55a --- /dev/null +++ b/proxyclient/hv/trace_mtp.py @@ -0,0 +1,152 @@ +# SPDX-License-Identifier: MIT +import struct + +from construct import * +from m1n1.utils import * +from m1n1.proxyutils import * +from m1n1.constructutils import * +from m1n1.trace.asc import ASCTracer, EP, EPState, msg, msg_log, DIR +from m1n1.trace.dockchannel import DockChannelTracer +from m1n1.trace.dart import DARTTracer +from m1n1.fw.mtp import * + +class MTPTracer(ASCTracer): + def handle_msg(self, direction, r0, r1): + super().handle_msg(direction, r0, r1) + +mtp_tracer = MTPTracer(hv, "/arm-io/mtp", verbose=1) +mtp_tracer.start() + +dart_tracer = DARTTracer(hv, "/arm-io/dart-mtp", verbose=1) +dart_tracer.start() + +trace_device("/arm-io/dart-mtp", True) + +DockChannelTracer = DockChannelTracer._reloadcls() + +mon = RegMonitor(hv.u, ascii=True, bufsize=0x400000) + +class StreamState: + def __init__(self): + self.buf = bytes() + +class MTPStream: + def __init__(self, tracer, name, state): + self.tracer = tracer + self.name = name + self.state = state + + def put(self, d): + buf = self.state.buf + d + + while buf: + if len(buf) < 8: + self.state.buf = buf + return + + hlen, mtype, size, ctr, devid, pad = struct.unpack("", self.state.tx) + self.rx_mem_stream = MTPStream(self, "<", self.state.rx_mem) + self.init_mon() + + def tx(self, d): + self.tx_stream.put(d) + + def rx(self, d): + self.rx_stream.put(d) + + def init_mon(self): + pass + #if self.state.buf is not None: + #addr, size = self.dart.iotranslate(1, self.state.buf, self.state.buf_size)[0] + #size = align_up(size, 4) + #mon.add(addr, size) + + def poll_ring(self): + wptr = struct.unpack("": + msg = TXMessage.parse(data) + elif dir == "<": + msg = RXMessage.parse(data) + if devid == 0: + if (msg.hdr.flags & 0xc0) == 0x00: + msg.msg = NotificationMsg.parse(msg.msg) + elif (msg.hdr.flags & 0xc0) == 0x80: + msg.msg = DeviceControlAck.parse(msg.msg) + else: + assert False + + try: + mtype = msg.msg.name + except: + mtype = None + + if mtype == "InitAFEMsg": + afe = self.dart.ioread(1, msg.msg.buf_addr, msg.msg.buf_size) + chexdump(afe, print_fn=self.log) + open("afe.bin", "wb").write(afe) + elif mtype == "InitBufMsg": + self.state.buf = msg.msg.buf_addr + self.state.buf_size = msg.msg.buf_size + self.init_mon() + + self.log(f"{dir} Type {mcode:02x} Dev {devid} #{ctr} {msg!s}") + +hid_tracer = MTPChannelTracer(hv, "/arm-io/dockchannel-mtp", verbose=3) +hid_tracer.start(dart_tracer.dart)