#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
import sys
import traceback
import struct
import array
import bisect
import os
import signal

from construct import *

from asm import ARMAsm
from tgtypes import *
from proxy import IODEV, START, EVENT, EXC, EXC_RET, ExcInfo
from utils import *
from sysreg import *
from macho import MachO
from adt import load_adt
import xnutools
import shell

PAC_MASK = 0xfffff00000000000


class MMIOTraceFlags(Register32):
    # Bit-field layout of the "flags" word shared by EvtMMIOTrace and
    # VMProxyHookData.
    WIDTH = 4, 0
    WRITE = 5
    MULTI = 6


# Layout of one MMIO trace event as parsed by HV.handle_mmiotrace().
EvtMMIOTrace = Struct(
    "flags" / RegAdapter(MMIOTraceFlags),
    "reserved" / Int32ul,
    "pc" / Hex(Int64ul),
    "addr" / Hex(Int64ul),
    "data" / Hex(Int64ul),
)


class HV_EVENT(IntEnum):
    HOOK_VM = 1
    VTIMER = 2
    USER_INTERRUPT = 3
    WDT_BARK = 4


VMProxyHookData = Struct(
    "flags" / RegAdapter(MMIOTraceFlags),
    "id" / Int32ul,
    "addr" / Hex(Int64ul),
    "data" / Array(2, Hex(Int64ul)),
)


class HV:
    """Host-side driver that prepares and starts a guest through the proxy.

    NOTE(review): the copy of this file under review is truncated in the
    middle of the class (see the NOTE(review) markers below).  Attributes
    such as ``adt``, ``iodev``, ``tba``, ``macho`` and ``device_addr_tbl``
    and the method ``hvc`` are used here but are not assigned/defined in the
    visible text.
    """

    PTE_VALID = 1 << 0
    PTE_MEMATTR_UNCHANGED = 0b1111 << 2
    PTE_S2AP_RW = 0b11 << 6
    PTE_SH_NS = 0b11 << 8
    PTE_ACCESS = 1 << 10
    PTE_ATTRIBUTES = PTE_ACCESS | PTE_SH_NS | PTE_S2AP_RW | PTE_MEMATTR_UNCHANGED

    SPTE_TRACE_READ = 1 << 63
    SPTE_TRACE_WRITE = 1 << 62
    SPTE_SYNC_TRACE = 1 << 61
    SPTE_MAP = 0 << 50
    SPTE_HOOK = 1 << 50
    SPTE_PROXY_HOOK_R = 2 << 50
    SPTE_PROXY_HOOK_W = 3 << 50
    SPTE_PROXY_HOOK_RW = 4 << 50

    # EL1 system register -> its EL12 / EL02 counterpart.
    MSR_REDIRECTS = {
        SCTLR_EL1: SCTLR_EL12,
        TTBR0_EL1: TTBR0_EL12,
        TTBR1_EL1: TTBR1_EL12,
        TCR_EL1: TCR_EL12,
        ESR_EL1: ESR_EL12,
        FAR_EL1: FAR_EL12,
        AFSR0_EL1: AFSR0_EL12,
        AFSR1_EL1: AFSR1_EL12,
        MAIR_EL1: MAIR_EL12,
        AMAIR_EL1: AMAIR_EL12,
        CONTEXTIDR_EL1: CONTEXTIDR_EL12,
        ACTLR_EL1: ACTLR_EL12,
        AMX_CTL_EL1: AMX_CTL_EL12,
        SPRR_CONFIG_EL1: SPRR_CONFIG_EL12,
        SPRR_PERM_EL1: SPRR_PERM_EL12,
        SPRR_PERM_EL0: SPRR_PERM_EL02,
        SPRR_UNK1_EL1: SPRR_UNK1_EL12,
        SPRR_UMASK0_EL1: SPRR_UMASK0_EL12,
        APCTL_EL1: APCTL_EL12,
        APSTS_EL1: APSTS_EL12,
        KERNELKEYLO_EL1: KERNELKEYLO_EL12,
        KERNELKEYHI_EL1: KERNELKEYHI_EL12,
        GXF_CONFIG_EL1: GXF_CONFIG_EL12,
        GXF_ABORT_EL1: GXF_ABORT_EL12,
        GXF_ENTER_EL1: GXF_ENTER_EL12,
    }

    def __init__(self, iface, proxy, utils):
        """Store the interface/proxy/utils handles and reset all state."""
        self.iface = iface
        self.p = proxy
        self.u = utils
        self.vbar_el1 = None
        self.want_vbar = None
        self.vectors = [None]
        self._stepping = False
        self.sym_offset = 0
        self.symbols = []
        self.sysreg = {}
        self.novm = False
        self._in_handler = False
        self._sigint_pending = False
        self.vm_hooks = []

    def unmap(self, ipa, size):
        """Call hv_map for ``size`` bytes at ``ipa`` with a zero target."""
        # The call is made outside the assert so that it still happens when
        # Python runs with -O (which strips assert statements).
        ret = self.p.hv_map(ipa, 0, size, 0)
        assert ret >= 0, f"hv_map failed: {ret!r}"

    def map_hw(self, ipa, pa, size):
        """Map ``ipa`` to ``pa`` with PTE_ATTRIBUTES | PTE_VALID set."""
        ret = self.p.hv_map(ipa, pa | self.PTE_ATTRIBUTES | self.PTE_VALID, size, 1)
        assert ret >= 0, f"hv_map failed: {ret!r}"

    def map_sw(self, ipa, pa, size):
        """Map ``ipa`` to ``pa`` using the SPTE_MAP entry type."""
        ret = self.p.hv_map(ipa, pa | self.SPTE_MAP, size, 1)
        assert ret >= 0, f"hv_map failed: {ret!r}"

    def map_hook(self, ipa, size, read=None, write=None, **kwargs):
        """Register a proxy hook for ``size`` bytes at ``ipa``.

        At least one of ``read`` / ``write`` must be given; the pair selects
        the SPTE_PROXY_HOOK_* entry type.  The hook tuple is appended to
        ``self.vm_hooks`` and its index is encoded into the mapping.

        Raises:
            ValueError: if neither ``read`` nor ``write`` is supplied.
        """
        if read is None and write is None:
            raise ValueError("map_hook requires a read and/or write callback")

        if read is not None and write is not None:
            t = self.SPTE_PROXY_HOOK_RW
        elif read is not None:
            t = self.SPTE_PROXY_HOOK_R
        else:
            t = self.SPTE_PROXY_HOOK_W

        index = len(self.vm_hooks)
        self.vm_hooks.append((read, write, ipa, kwargs))
        ret = self.p.hv_map(ipa, (index << 2) | t, size, 1)
        assert ret >= 0, f"hv_map failed: {ret!r}"

    def addr(self, addr):
        """Format ``addr`` as hex, annotated with symbol+offset if known."""
        unslid_addr = addr + self.sym_offset
        if addr < self.tba.virt_base or unslid_addr < self.macho.vmin:
            return f"0x{addr:x}"

        saddr, name = self.sym(addr)
        if name is None:
            return f"0x{addr:x} (0x{unslid_addr:x})"

        return f"0x{addr:x} ({name}+0x{unslid_addr - saddr:x})"

    def sym(self, addr):
        """Return the ``(address, name)`` symbol entry at or below ``addr``.

        Returns ``(None, None)`` when the address is outside the image or no
        symbol precedes it.
        """
        unslid_addr = addr + self.sym_offset
        if addr < self.tba.virt_base or unslid_addr < self.macho.vmin:
            return None, None

        idx = bisect.bisect_left(self.symbols, (unslid_addr + 1, "")) - 1
        if idx < 0 or idx >= len(self.symbols):
            # Callers unpack the result as a pair, so the "not found" case
            # must be a pair too (this used to return a formatted string).
            return None, None

        return self.symbols[idx]

    def handle_mmiotrace(self, data):
        """Parse an EvtMMIOTrace record from ``data`` and print it."""
        evt = EvtMMIOTrace.parse(data)
        t = "W" if evt.flags.WRITE else "R"
        dev, zone = self.device_addr_tbl.lookup(evt.addr)
        m = "+" if evt.flags.MULTI else " "

        # NOTE(review): the original format string is cut off after
        # '[0x{evt.pc:016x}] MMIO: {t}.{1<' in the copy under review.  The
        # line below is a stand-in built only from values computed above
        # (presumably the width was printed as 1 << WIDTH); restore the real
        # message from version control.
        print(f"[0x{evt.pc:016x}] MMIO: {t}.{1 << evt.flags.WIDTH}{m} "
              f"0x{evt.addr:x} ({dev}) = 0x{evt.data:x}")

    # NOTE(review): SOURCE IS TRUNCATED HERE.  Everything between the print
    # above and the tail of a nested read hook (only ' 0x{ret:x}")' and
    # 'return ret' survive) is missing from the reviewed copy: an unknown
    # number of methods, the `def` line of the method below, and the nested
    # `wh` / `rh` hook functions it uses.  Do not merge before restoring it.

    def init(self):
        """Install hooks, program EL2 trap/control registers, fix up the ADT.

        NOTE(review): the name ``init`` and the start of this body are
        placeholders for text lost in the reviewed copy - confirm against
        version control.
        """
        # NOTE(review): `wh` and `rh` were defined in the lost text; until it
        # is restored this loop raises NameError.
        for addr in (0x23b700420, 0x23d280098, 0x23d280088, 0x23d280090):
            self.map_hook(addr, 4, write=wh, read=rh)

        # NOTE(review): block structure below is inferred from a
        # whitespace-collapsed source; confirm which assignments belong to
        # the if/else arms.
        hcr = HCR(self.u.mrs(HCR_EL2))
        if self.novm:
            hcr.VM = 0
            hcr.AMO = 0
        else:
            hcr.TACR = 1
        hcr.TIDCP = 0
        hcr.TVM = 0
        hcr.FMO = 1
        hcr.IMO = 0
        self.u.msr(HCR_EL2, hcr.value)

        # Trap dangerous things
        hacr = HACR(0)
        if not self.novm:
            #hacr.TRAP_CPU_EXT = 1
            #hacr.TRAP_SPRR = 1
            #hacr.TRAP_GXF = 1
            hacr.TRAP_CTRR = 1
            hacr.TRAP_EHID = 1
            hacr.TRAP_HID = 1
            hacr.TRAP_ACC = 1
            hacr.TRAP_IPI = 1
        self.u.msr(HACR_EL2, hacr.value)

        # Enable AMX
        amx_ctl = AMX_CTL(self.u.mrs(AMX_CTL_EL1))
        amx_ctl.EN_EL1 = 1
        self.u.msr(AMX_CTL_EL1, amx_ctl.value)

        # Set guest AP keys
        self.u.msr(APVMKEYLO_EL2, 0x4E7672476F6E6147)
        self.u.msr(APVMKEYHI_EL2, 0x697665596F755570)
        self.u.msr(APSTS_EL12, 1)

        #self.p.hv_map_vuart(0x2_35200000, getattr(IODEV, self.iodev.name + "_SEC"))

        actlr = ACTLR(self.u.mrs(ACTLR_EL12))
        actlr.EnMDSB = 1
        self.u.msr(ACTLR_EL12, actlr.value)

        self.setup_adt()

    def setup_adt(self):
        """Delete USB/Thunderbolt-related and secondary-CPU nodes from the ADT.

        Nodes that do not exist are skipped silently.
        """
        # NOTE(review): nesting inferred from a whitespace-collapsed source;
        # confirm that the per-port removal is conditional on a USB iodev.
        if self.iodev in (IODEV.USB0, IODEV.USB1):
            for idx in (0, 1):
                for prefix in (
                    "/arm-io/dart-usb%d",
                    "/arm-io/atc-phy%d",
                    "/arm-io/usb-drd%d",
                    "/arm-io/acio%d",
                    "/arm-io/acio-cpu%d",
                    "/arm-io/dart-acio%d",
                    "/arm-io/apciec%d",
                    "/arm-io/dart-apciec%d",
                    "/arm-io/apciec%d-piodma",
                    "/arm-io/i2c0/hpmBusManager/hpm%d",
                    "/arm-io/atc%d-dpxbar",
                    "/arm-io/atc%d-dpphy",
                    "/arm-io/atc%d-dpin0",
                    "/arm-io/atc%d-dpin1",
                ):
                    name = prefix % idx
                    print(f"Removing ADT node {name}")
                    try:
                        del self.adt[name]
                    except KeyError:
                        pass

        for name in (
            "/cpus/cpu1",
            "/cpus/cpu2",
            "/cpus/cpu3",
            "/cpus/cpu4",
            "/cpus/cpu5",
            "/cpus/cpu6",
            "/cpus/cpu7",
        ):
            print(f"Removing ADT node {name}")
            try:
                del self.adt[name]
            except KeyError:
                pass

        #for cpu in list(self.adt["cpus"]):
            #if cpu.name != "cpu0":
                #print(f"Removing ADT node {cpu._path}")
                #try:
                    #del self.adt["cpus"][cpu.name]
                #except KeyError:
                    #pass

    def set_bootargs(self, boot_args):
        """Set the guest command line; ``-v`` selects video.display = 0."""
        self.tba.video.display = 0 if "-v" in boot_args.split() else 1
        print(f"Setting boot arguments to {boot_args!r}")
        self.tba.cmdline = boot_args

    def load_macho(self, data, symfile=None):
        """Load a Mach-O image into guest memory and build its boot args.

        Args:
            data: path or binary file object of the image.
            symfile: optional path or binary file object of a second Mach-O
                whose symbols are added under "com.apple.kernel".
        """
        # NOTE(review): files opened here are never closed; left as is
        # because MachO may keep reading from them - confirm.
        if isinstance(data, str):
            data = open(data, "rb")

        self.macho = macho = MachO(data)
        if symfile is not None:
            if isinstance(symfile, str):
                symfile = open(symfile, "rb")
            syms = MachO(symfile)
            macho.add_symbols("com.apple.kernel", syms)

        # Sorted (address, name) pairs, as required by the bisect in sym().
        self.symbols = sorted((v, k) for k, v in macho.symbols.items())

        def load_hook(data, segname, size, fileoff, dest):
            # Currently unused: the prepare_image(load_hook) call below is
            # commented out.  Rewrites matching words in __TEXT_EXEC using
            # self.hvc().
            if segname != "__TEXT_EXEC":
                return data

            print(f"Patching segment {segname}...")
            a = array.array("I", data)

            p = 0
            while (p := data.find(b"\x20\x00", p)) != -1:
                if (p & 3) != 2:
                    p += 1
                    continue

                opcode = a[p // 4]
                inst = self.hvc((opcode & 0xffff))
                off = fileoff + (p & ~3)
                if off >= 0xbfcfc0:
                    print(f" 0x{off:x}: 0x{opcode:04x} -> hvc 0x{opcode:x} (0x{inst:x})")
                    a[p // 4] = inst
                p += 4

            print("Done.")
            return a.tobytes()

        #image = macho.prepare_image(load_hook)
        image = macho.prepare_image()

        sepfw_start, sepfw_length = self.u.adt["chosen"]["memory-map"].SEPFW
        tc_start, tc_size = self.u.adt["chosen"]["memory-map"].TrustCache

        # Region layout after guest_base: image | SEPFW | boot args.
        image_size = align(len(image))
        sepfw_off = image_size
        image_size += align(sepfw_length)
        self.bootargs_off = image_size
        bootargs_size = 0x4000
        image_size += bootargs_size

        print(f"Total region size: 0x{image_size:x} bytes")

        # Layout from heap_top: 16MB gap | ADT | TrustCache | guest region.
        self.phys_base = phys_base = guest_base = self.u.heap_top
        guest_base += 16 << 20  # ensure guest starts within a 16MB aligned region of mapped RAM
        adt_base = guest_base
        guest_base += align(self.u.ba.devtree_size)
        tc_base = guest_base
        guest_base += align(tc_size)
        self.guest_base = guest_base

        mem_top = self.u.ba.phys_base + self.u.ba.mem_size
        mem_size = mem_top - phys_base

        print(f"Physical memory: 0x{phys_base:x} .. 0x{mem_top:x}")
        print(f"Guest region start: 0x{guest_base:x}")

        self.entry = macho.entry - macho.vmin + guest_base

        print("Mapping guest physical memory...")
        self.map_hw(0x800000000, 0x800000000, self.u.ba.phys_base - 0x800000000)
        self.map_hw(phys_base, phys_base,
                    self.u.ba.mem_size_actual - phys_base + 0x800000000)

        print(f"Loading kernel image (0x{len(image):x} bytes)...")
        self.u.compressed_writemem(guest_base, image, True)
        self.p.dc_cvau(guest_base, len(image))
        self.p.ic_ivau(guest_base, len(image))

        print(f"Copying SEPFW (0x{sepfw_length:x} bytes)...")
        self.p.memcpy8(guest_base + sepfw_off, sepfw_start, sepfw_length)

        print(f"Copying TrustCache (0x{tc_size:x} bytes)...")
        self.p.memcpy8(tc_base, tc_start, tc_size)

        print("Adjusting addresses in ADT...")
        memory_map = self.adt["chosen"]["memory-map"]
        memory_map.SEPFW = (guest_base + sepfw_off, sepfw_length)
        memory_map.TrustCache = (tc_base, tc_size)
        memory_map.DeviceTree = (adt_base, align(self.u.ba.devtree_size))
        memory_map.BootArgs = (guest_base + self.bootargs_off, bootargs_size)

        adt_blob = self.adt.build()
        print(f"Uploading ADT (0x{len(adt_blob):x} bytes)...")
        self.iface.writemem(adt_base, adt_blob)

        print(f"Setting up bootargs at 0x{guest_base + self.bootargs_off:x}...")
        self.tba.mem_size = mem_size
        self.tba.phys_base = phys_base
        self.tba.virt_base = 0xfffffe0010000000 + (phys_base & (32 * 1024 * 1024 - 1))
        self.tba.devtree = adt_base - phys_base + self.tba.virt_base
        self.tba.top_of_kernel_data = guest_base + image_size

        # Offset added to a guest virtual address to get the Mach-O address.
        self.sym_offset = macho.vmin - guest_base + self.tba.phys_base - self.tba.virt_base

        self.iface.writemem(guest_base + self.bootargs_off, BootArgs.build(self.tba))

    def _handle_sigint(self, signal=None, stack=None):
        """SIGINT handler: record the request and, if idle, poke the proxy."""
        self._sigint_pending = True
        if self._in_handler:
            return

        # Kick the proxy to break out of the hypervisor
        self.iface.dev.write(b"!")

    def start(self):
        """Quiesce the other iodevs and the framebuffer, then enter the guest."""
        print("Disabling other iodevs...")
        for iodev in IODEV:
            if iodev != self.iodev:
                print(f" - {iodev!s}")
                self.p.iodev_set_usage(iodev, 0)

        print("Improving logo...")
        self.p.fb_improve_logo()

        print("Shutting down framebuffer...")
        self.p.fb_shutdown()

        print("Enabling SPRR...")
        self.u.msr(SPRR_CONFIG_EL1, 1)

        print("Enabling GXF...")
        self.u.msr(GXF_CONFIG_EL1, 1)

        print(f"Jumping to entrypoint at 0x{self.entry:x}")

        self.iface.dev.timeout = None
        signal.signal(signal.SIGINT, self._handle_sigint)

        # Does not return
        self.p.hv_start(self.entry, self.guest_base + self.bootargs_off)