# SPDX-License-Identifier: MIT import struct from enum import IntEnum from m1n1.proxyutils import RegMonitor from m1n1.utils import * from m1n1.trace.dart import DARTTracer from m1n1.trace.asc import ASCTracer, EP, EPState, msg, msg_log, DIR trace_device("/arm-io/dcp", True, ranges=[1]) DARTTracer = DARTTracer._reloadcls() ASCTracer = ASCTracer._reloadcls() iomon = RegMonitor(hv.u, ascii=True) class IOEpMessage(Register64): TYPE = 63, 48 class IOEp_Generic(IOEpMessage): ARG3 = 47, 32 ARG2 = 31, 16 ARG1 = 15, 0 class IOEp_SetBuf_Ack(IOEpMessage): UNK1 = 47, 32 IOVA = 31, 0 class IOEp_Send(IOEpMessage): WPTR = 31, 0 class IORingBuf(Reloadable): def __init__(self, ep, state, base): self.ep = ep self.dart = ep.dart self.state = state self.base = base self.align = 0x40 def init(self): self.state.bufsize, unk = struct.unpack(" (self.state.bufsize - rptr - 16): hdr = self.dart.ioread(0, self.base + 0xc0, 16) rptr = 16 magic, size = struct.unpack("<4sI", hdr[:8]) assert magic == b"IOP " payload = self.dart.ioread(0, self.base + 0xc0 + rptr, size) rptr = (align_up(rptr + size, self.align)) % self.state.bufsize self.state.rptr = rptr yield hdr[8:] + payload if max is not None: max -= 1 if max <= 0: break class IOEp(EP): BASE_MESSAGE = IOEp_Generic def __init__(self, tracer, epid): super().__init__(tracer, epid) self.state.txbuf = EPState() self.state.rxbuf = EPState() self.state.shmem_iova = None self.state.verbose = 1 def start(self): #self.add_mon() self.create_bufs() def create_bufs(self): if not self.state.shmem_iova: return self.txbuf = IORingBuf(self, self.state.txbuf, self.state.shmem_iova) self.rxbuf = IORingBuf(self, self.state.rxbuf, self.state.shmem_iova + 0x4000) def add_mon(self): if self.state.shmem_iova: iomon.add(self.state.shmem_iova, 32768, name=f"{self.name}.shmem@{self.state.shmem_iova:08x}", offset=0) Init = msg_log(0x80, DIR.TX) Init_Ack = msg_log(0xa0, DIR.RX) GetBuf = msg_log(0x89, DIR.RX) @msg(0xa1, DIR.TX, IOEp_SetBuf_Ack) def GetBuf_Ack(self, msg): self.state.shmem_iova = msg.IOVA #self.add_mon() @msg(0xa2, DIR.TX, IOEp_Send) def Send(self, msg): for data in self.txbuf.read(wptr=msg.WPTR): if self.state.verbose >= 1: self.log(f">TX rptr={self.txbuf.state.rptr:#x}") chexdump(data) return True Hello = msg_log(0xa3, DIR.TX) @msg(0x85, DIR.RX, IOEpMessage) def Recv(self, msg): for data in self.rxbuf.read(): if self.state.verbose >= 1: self.log(f" const*, OSArray const*)", "D413": "bool IOMFB::ServiceRelay::setProperty(unsigned int, OSString<0x40> const*, OSDictionary const*)", "D414": "bool IOMFB::ServiceRelay::setProperty(unsigned int, OSString<0x40> const*, OSNumber const*)", "D415": "bool IOMFB::ServiceRelay::setProperty(unsigned int, OSString<0x40> const*, OSBoolean const*)", "D416": "bool IOMFB::ServiceRelay::setProperty(unsigned int, OSString<0x40> const*, OSString const*)", "D417": "bool IOMFB::ServiceRelay::setProperty(unsigned int, char const[0x40], OSArray const*)", "D418": "bool IOMFB::ServiceRelay::setProperty(unsigned int, char const[0x40], OSDictionary const*)", "D419": "bool IOMFB::ServiceRelay::setProperty(unsigned int, char const[0x40], OSNumber const*)", "D420": "bool IOMFB::ServiceRelay::setProperty(unsigned int, char const[0x40], OSBoolean const*)", "D421": "bool IOMFB::ServiceRelay::setProperty(unsigned int, char const[0x40], OSString const*)", "D422": "bool IOMFB::ServiceRelay::setProperty(unsigned int, char const[0x40], bool)", "D423": "void IOMFB::ServiceRelay::removeProperty(unsigned int, char const[0x40])", "D424": "void IOMFB::ServiceRelay::removeProperty(unsigned int, OSString<0x40> const*)", "D450": "bool IOMFB::MemDescRelay::from_id(unsigned int, unsigned long*, unsigned long*, unsigned long long*)", "D451": "unsigned int IOMFB::MemDescRelay::allocate_buffer(unsigned int, unsigned long long, unsigned int, unsigned long*, unsigned long*, unsigned long long*)", "D452": "unsigned int IOMFB::MemDescRelay::map_physical(unsigned long long, unsigned long long, unsigned int, unsigned long*, unsigned long long*)", "D453": "unsigned int IOMFB::MemDescRelay::withAddressRange(unsigned long long, unsigned long long, unsigned int, task*, unsigned long*, unsigned long long*)", "D454": "IOMFBStatus IOMFB::MemDescRelay::prepare(unsigned int, unsigned int)", "D455": "IOMFBStatus IOMFB::MemDescRelay::complete(unsigned int, unsigned int)", "D456": "bool IOMFB::MemDescRelay::release_descriptor(unsigned int)", "D500": "IOMFBStatus IOMFB::PlatformFunctionRelay::allocate_record(unsigned int, char const*, unsigned int, bool)", "D501": "IOMFBStatus IOMFB::PlatformFunctionRelay::release_record(unsigned int)", "D502": "IOMFBStatus IOMFB::PlatformFunctionRelay::callFunctionLink(unsigned int, unsigned long, unsigned long, unsigned long)", "D550": "bool IORegistryEntry::setProperty(OSString *, OSArray *)", "D551": "bool IORegistryEntry::setProperty(OSString *, IOMFB::AFKArray *)", "D552": "bool IORegistryEntry::setProperty(OSString *, OSDictionary *)", "D553": "bool IORegistryEntry::setProperty(OSString *, IOMFB::AFKDictionary *)", "D554": "bool IORegistryEntry::setProperty(OSString *, OSNumber *)", "D555": "bool IORegistryEntry::setProperty(OSString *, IOMFB::AFKNumber *)", "D556": "bool IORegistryEntry::setProperty(OSString *, OSBoolean *)", "D557": "bool IORegistryEntry::setProperty(OSString *, OSString *)", "D558": "bool IORegistryEntry::setProperty(OSString *, IOMFB::AFKString *)", "D559": "bool IORegistryEntry::setProperty(char const*, OSArray *)", "D560": "bool IORegistryEntry::setProperty(char const*, IOMFB::AFKArray *)", "D561": "bool IORegistryEntry::setProperty(char const*, OSDictionary *)", "D562": "bool IORegistryEntry::setProperty(char const*, IOMFB::AFKDictionary *)", "D563": "bool IORegistryEntry::setProperty(char const*, OSNumber *)", "D564": "bool IORegistryEntry::setProperty(char const*, IOMFB::AFKNumber *)", "D565": "bool IORegistryEntry::setProperty(char const*, OSBoolean *)", "D566": "bool IORegistryEntry::setProperty(char const*, OSString *)", "D567": "bool IORegistryEntry::setProperty(char const*, IOMFB::AFKString *)", "D568": "bool IORegistryEntry::setProperty(char const*, char const*)", "D569": "bool IORegistryEntry::setProperty(char const*, bool)", "D570": "IOMFBStatus IOMobileFramebufferAP::setProperties(OSDictionary*)", "D571": "void IOMobileFramebufferAP::swapping_client_did_start(IOMobileFramebufferUserClient*)", "D572": "void IOMobileFramebufferAP::swapping_client_will_stop(IOMobileFramebufferUserClient*)", "D573": "IOMFBStatus IOMobileFramebufferAP::set_canvas_size(unsigned int, unsigned int)", "D574": "IOMFBStatus IOMobileFramebufferAP::powerUpDART(bool)", "D575": "IOMFBStatus IOMobileFramebufferAP::get_dot_pitch(unsigned int*)", "D576": "void IOMobileFramebufferAP::hotPlug_notify_gated(unsigned long long)", "D577": "void IOMobileFramebufferAP::powerstate_notify(bool, bool)", "D578": "bool IOMobileFramebufferAP::idle_fence_create(IdleCachingState)", "D579": "void IOMobileFramebufferAP::idle_fence_complete()", "D580": "void IOMobileFramebufferAP::idle_surface_release_ap()", "D581": "void IOMobileFramebufferAP::swap_complete_head_of_line(unsigned int, bool, unsigned int, bool)", "D582": "bool IOMobileFramebufferAP::create_default_fb_surface(unsigned int, unsigned int)", "D583": "bool IOMobileFramebufferAP::serializeDebugInfoCb(unsigned long, IOMFB::BufferDescriptor const*, unsigned int)", "D584": "void IOMobileFramebufferAP::clear_default_surface()", "D585": "void IOMobileFramebufferAP::swap_notify_gated(unsigned long long, unsigned long long, unsigned long long)", "D586": "void IOMobileFramebufferAP::swap_info_notify_dispatch(SwapInfoBlob const*)", "D587": "void IOMFBStatus IOMobileFramebufferAP::updateBufferMappingCount_gated(bool)", "D588": "void IOMobileFramebufferAP::resize_default_fb_surface_gated()", "D589": "void IOMobileFramebufferAP::swap_complete_ap_gated(unsigned int, bool, SwapCompleteData const*, SwapInfoBlob const*, unsigned int)", "D590": "void IOMobileFramebufferAP::batched_swap_complete_ap_gated(unsigned int*, unsigned int, bool, bool const*, SwapCompleteData const*)", "D591": "void IOMobileFramebufferAP::swap_complete_intent_gated(unsigned int, bool, IOMFBSwapRequestIntent, unsigned int, unsigned int)", "D592": "void IOMobileFramebufferAP::abort_swap_ap_gated(unsigned int)", "D593": "void IOMobileFramebufferAP::enable_backlight_message_ap_gated(bool)", "D594": "void IOMobileFramebufferAP::setSystemConsoleMode(bool)", "D595": "void IOMobileFramebufferAP::setSystemConsoleMode_gated(bool)", "D596": "bool IOMobileFramebufferAP::isDFBAllocated()", "D597": "bool IOMobileFramebufferAP::preserveContents()", "D598": "void IOMobileFramebufferAP::find_swap_function_gated()", "D700": "int IOMFB::DCPPowerManager::set_kernel_power_assert(bool, bool)", } # iboot interface """ 0: setResource 1: setSurface 2: setPower 3: getHpdStatus 4: getTimingModes 5: getColorModes 6: setMode 7: setBrightness 8: rwBCONRegsRequest 9: setParameter 10: setMatrix 11: setProperty 12: getProperty 13: setBlock 14: getBlock 15: swapBegin 16: setSwapLayer 17: setSwapTimestamp 18: setSwapEnd 19: setSwapWait 20: setBrightnessCfg 21: setNamedProperty 22: getNamedProperty """ class DCPMessage(Register64): TYPE = 3, 0 class DCPEp_SetShmem(DCPMessage): IOVA = 47, 16 class CallContext(IntEnum): CALLBACK = 0 CMD = 2 ASYNC = 3 OOB = 6 class DCPEp_Msg(DCPMessage): LEN = 63, 32 OFF = 31, 16 CTX = 11, 8, CallContext ACK = 6 class DCPCallState: pass class DCPCallChannel(Reloadable): def __init__(self, dcpep, name, buf, bufsize): self.dcpep = dcpep self.name = name self.buf = buf self.bufsize = bufsize self.log = self.dcpep.log self.state = self.dcpep.state def call(self, msg, dir): ident = f"{self.name}.{msg.OFF:x}" if any(msg.OFF == s.off for s in self.state.ch.get(self.name, [])): self.log(f"{dir}{self.name}.{msg.OFF:x} !!! Overlapping call ({msg})") assert False state = DCPCallState() data = self.dcpep.dart.ioread(0, self.state.shmem_iova + self.buf + msg.OFF, msg.LEN) tag = data[:4][::-1].decode("ascii") in_len, out_len = struct.unpack("= 1: self.log(f"{dir}{self.name}.{msg.OFF:x} {tag}:{KNOWN_MSGS.get(tag, 'unk')} ({msg})") if verb >= 2: print(f"Message: {tag} ({KNOWN_MSGS.get(tag, 'unk')}): (in {in_len:#x}, out {out_len:#x})") if data_in: print(f"{dir} Input ({len(data_in):#x} bytes):") chexdump(data_in[:self.state.max_len]) #if tag not in KNOWN_MSGS: #hv.run_shell() if self.state.dumpfile: dump = f"CALL {dir} {msg.value:#018x} {self.name} {state.off:#x} {state.tag} {in_len:#x} {out_len:#x} {data_in.hex()}\n" self.state.dumpfile.write(dump) self.state.dumpfile.flush() self.state.ch.setdefault(self.name, []).append(state) def ack(self, msg, dir): assert msg.LEN == 0 states = self.state.ch.get(self.name, None) if not states: self.log(f"{dir}ACK {self.name}.{msg.OFF:x} !!! ACK without call ({msg})") return state = states[-1] if self.state.show_acks: self.log(f"{dir}ACK {self.name}.{msg.OFF:x} ({msg})") data_out = self.dcpep.dart.ioread(0, self.state.shmem_iova + state.out_addr, state.out_len) verb = self.dcpep.get_verbosity(state.tag) if verb >= 3 and state.out_len > 0: print(f"{dir}{self.name}.{msg.OFF:x} Output buffer ({len(data_out):#x} bytes):") chexdump(data_out[:self.state.max_len]) if self.state.dumpfile: dump = f"ACK {dir} {msg.value:#018x} {self.name} {state.off:#x} {data_out.hex()}\n" self.state.dumpfile.write(dump) self.state.dumpfile.flush() states.pop() class DCPEp(EP): BASE_MESSAGE = DCPMessage def __init__(self, tracer, epid): super().__init__(tracer, epid) self.state.shmem_iova = None self.state.show_globals = True self.state.show_acks = True self.state.max_len = 1024 * 1024 self.state.verbosity = 3 self.state.op_verb = {} self.state.ch = {} self.state.dumpfile = None self.ch_cb = DCPCallChannel(self, "CB", 0x60000, 0x20000) self.ch_cmd = DCPCallChannel(self, "CMD", 0, 0x8000) self.ch_async = DCPCallChannel(self, "ASYNC", 0x40000, 0x20000) self.ch_oob = DCPCallChannel(self, "OOB", 0x8000, 0x8000) self.cmd_ch = { CallContext.CALLBACK: self.ch_cmd, CallContext.CMD: self.ch_cmd, CallContext.ASYNC: self.ch_oob, # guess? CallContext.OOB: self.ch_oob, } self.cb_ch = { CallContext.CALLBACK: self.ch_cb, CallContext.CMD: None, CallContext.ASYNC: self.ch_async, CallContext.OOB: None, } def start(self): self.add_mon() def add_mon(self): if self.state.shmem_iova and self.state.show_globals: addr = self.state.shmem_iova + 0x80000 iomon.add(addr, 128, name=f"{self.name}.shmem@{addr:08x}", offset=addr) #addr = self.state.shmem_iova #iomon.add(addr, 0x80080, #name=f"{self.name}.shmem@{addr:08x}", offset=addr) InitComplete = msg_log(1, DIR.RX) @msg(0, DIR.TX, DCPEp_SetShmem) def SetShmem(self, msg): self.log(f"Shared memory IOVA: {msg.IOVA:#x}") self.state.shmem_iova = msg.IOVA self.add_mon() @msg(2, DIR.TX, DCPEp_Msg) def Tx(self, msg): if msg.ACK: self.cb_ch[msg.CTX].ack(msg, ">") else: self.cmd_ch[msg.CTX].call(msg, ">") if self.state.show_globals: iomon.poll() return True @msg(2, DIR.RX, DCPEp_Msg) def Rx(self, msg): if msg.ACK: self.cmd_ch[msg.CTX].ack(msg, "<") else: self.cb_ch[msg.CTX].call(msg, "<") if self.state.show_globals: iomon.poll() return True def get_verbosity(self, tag): return self.state.op_verb.get(tag, self.state.verbosity) def set_verb_known(self, verb): for i in KNOWN_MSGS: if verb is None: self.state.op_verb.pop(i, None) else: self.state.op_verb[i] = verb class SystemService(IOEp): NAME = "system" class TestService(IOEp): NAME = "test" class DCPExpertService(IOEp): NAME = "dcpexpert" class Disp0Service(IOEp): NAME = "disp0" class DPTXService(IOEp): NAME = "dptx" class DPSACService(IOEp): NAME = "dpsac" class DPDevService(IOEp): NAME = "dpdev" class MDCP29XXService(IOEp): NAME = "mcdp29xx" class AVService(IOEp): NAME = "av" class HDCPService(IOEp): NAME = "hdcp" class RemoteAllocService(IOEp): NAME = "remotealloc" class DCPTracer(ASCTracer): ENDPOINTS = { 0x20: SystemService, 0x21: TestService, 0x22: DCPExpertService, 0x23: Disp0Service, 0x24: DPTXService, 0x25: IOEp, # dcpav-power-ep 0x26: DPSACService, 0x27: DPDevService, 0x28: MDCP29XXService, 0x29: AVService, 0x2a: IOEp, # dcpdptx-port-ep 0x2b: HDCPService, 0x2c: IOEp, # cb-ap-to-dcp-service-ep 0x2d: RemoteAllocService, 0x37: DCPEp, # iomfb-link } def handle_msg(self, direction, r0, r1): super().handle_msg(direction, r0, r1) #iomon.poll() dart_dcp_tracer = DARTTracer(hv, "/arm-io/dart-dcp") dart_dcp_tracer.start() def readmem_iova(addr, size): try: return dart_dcp_tracer.dart.ioread(0, addr, size) except Exception as e: print(e) return None iomon.readmem = readmem_iova dcp_tracer = DCPTracer(hv, "/arm-io/dcp", verbose=1) dcp_tracer.start(dart_dcp_tracer.dart) #dcp_tracer.ep.dcpep.state.dumpfile = open("dcp.log", "a")