m1n1.fw.agx: WIP generate initdata

Signed-off-by: Scott Mansell <phiren@gmail.com>
This commit is contained in:
Scott Mansell 2022-03-30 22:51:36 +13:00 committed by Asahi Lina
parent 79b01a7bcb
commit a17e9e4f5e
8 changed files with 773 additions and 95 deletions

View file

@ -15,6 +15,8 @@ p.pmgr_adt_clocks_enable("/arm-io/sgx")
agx = Agx(u)
agx.verbose = 10
agx.boot()
#agx.uat.dump(0)
agx.build_initdata()
run_shell(globals(), msg="Have fun!")

View file

@ -2,7 +2,7 @@ from construct import *
from construct.core import evaluate
from .utils import Reloadable, ReloadableMeta
import inspect
import textwrap
def recusive_reload(obj):
if isinstance(obj, Construct) and hasattr(obj, 'subcons'):
@ -26,20 +26,32 @@ def recusive_reload(obj):
def repr_value(value):
if isinstance(value, int):
return f"0x{value:x}"
return f"{value:#x}"
else:
return repr(value)
class ConstructClassException(Exception):
pass
# We need to inherrit Construct as a metaclass so things like If and Select will work
class ReloadableConstructMeta(ReloadableMeta, Construct):
def __new__(cls, name, bases, attrs):
cls = super().__new__(cls, name, bases, attrs)
cls.name = name
try:
cls.flagbuildnone = cls.subcon.flagbuildnone
except AttributeError:
cls.flagbuildnone = False
cls.docs = None
return cls
class ConstructClass(Reloadable, metaclass=ReloadableConstructMeta):
class ConstructClassBase(Reloadable, metaclass=ReloadableConstructMeta):
""" Offers two benifits over regular construct
1. It's reloadable, and can recusrivly reload other refrenced ConstructClasses
@ -60,9 +72,52 @@ class ConstructClass(Reloadable, metaclass=ReloadableConstructMeta):
"""
flagbuildnone = True
parsed = None
def Apply(self, dict=None, **kwargs):
if dict is None:
dict = kwargs
for key in dict:
if not key.startswith('_'):
setattr(self, key, dict[key])
self._keys += [key]
@classmethod
def _build(cls, obj, stream, context, path):
addr = stream.tell()
try:
new_obj = cls.subcon._build(obj, stream, context, f"{path} -> {cls.name}")
except ConstructClassException:
raise
except ConstructError:
raise
except Exception as e:
raise ConstructClassException(f"at {path} -> {cls.name}") from e
# if obj is a raw value or Container, instance a proper object for it
if not isinstance(obj, ConstructClassBase):
obj = cls.__new__(cls)
# update the object with anything that build updated (such as defaults)
obj._apply(new_obj)
obj._addr = addr
return obj
@classmethod
def _sizeof(cls, context, path):
return cls.subcon._sizeof(context, f"{path} -> {cls.name}")
@classmethod
def _reloadcls(cls):
newcls = super()._reloadcls()
recusive_reload(newcls.subcon)
return newcls
def _apply(self, obj):
raise NotImplementedError()
@classmethod
def _parse(cls, stream, context, path):
addr = stream.tell()
@ -80,35 +135,82 @@ class ConstructClass(Reloadable, metaclass=ReloadableConstructMeta):
self._stream = stream
self._addr = addr
# Copy everything across from the constructed container/value
if isinstance(obj, Container):
for key in obj:
setattr(self, key, obj[key])
self._keys = [k for k in obj.keys() if k != "_io"]
else:
self.value = obj
self._apply(obj)
return self
@classmethod
def _reloadcls(cls):
newcls = super()._reloadcls()
recusive_reload(newcls.subcon)
return newcls
def build_stream(self, obj=None, stream=None, **contextkw):
assert stream != None
if obj is None:
obj = self
return Construct.build_stream(self, obj, stream, **contextkw)
def build(self, obj=None, **contextkw):
if obj is None:
obj = self
return Construct.build(self, obj, **contextkw)
class ConstructClass(ConstructClassBase, Container):
""" Offers two benifits over regular construct
1. It's reloadable, and can recusrivly reload other refrenced ConstructClasses
2. It's a class, so you can define methods
Currently only supports parsing, but could be extended to support building
Example:
Instead of:
MyStruct = Struct(
"field1" / Int32ul
)
class MyClass(ConstructClass):
subcon = Struct(
"field1" / Int32ul
)
"""
def __repr__(self, ignore=[]) -> str:
str = f"{self.__class__.__name__} @ 0x{self._addr:x}:"
str = f"{self.__class__.__name__} @ 0x{self._addr:x}:\n"
for key in self:
if key in ignore or key.startswith('_'):
continue
value = getattr(self, key)
val_repr = repr_value(value)
if '\n' in val_repr:
val_repr = textwrap.indent(val_repr, ' ' * 6)
if not val_repr.endswith('\n'):
val_repr += '\n'
str += f" {key} =\n{val_repr}"
else:
str += f" {key} = {val_repr}\n"
if hasattr(self, '_keys'):
str += "\n"
for key in self._keys:
if key in ignore:
continue
value = getattr(self, key)
str += f"\t{key} = {repr_value(value)}\n"
else:
str += f"\t{repr_value(self.value)}"
return str
__all__ = ["ConstructClass"]
def _apply(self, obj):
self.update(obj)
class ConstructValueClass(ConstructClassBase):
""" Same as Construct, but for subcons that are single values, rather than containers
the value is stored as .value
"""
def __repr__(self) -> str:
str = f"{self.__class__.__name__} @ 0x{self._addr:x}:"
str += f"\t{repr_value(self.value)}"
return str
@classmethod
def _build(cls, obj, stream, context, path):
return super()._build(cls, obj.value, stream, context, path)
def _apply(self, obj):
self.value = obj
__all__ = ["ConstructClass", "ConstructValueClass"]

View file

@ -1,11 +1,14 @@
# SPDX-License-Identifier: MIT
from ...utils import *
from ...malloc import Heap
from ..asc import StandardASC
from ..asc.base import ASCBaseEndpoint, msg_handler
from ...hw.uat import UAT
from .initdata import InitData, IOMapping
class PongMsg(Register64):
TYPE = 59, 52
@ -59,6 +62,21 @@ class Agx(StandardASC):
ttbr1_addr = getattr(self.sgx_dev, 'gfx-shared-region-base')
self.uat.initilize(ttbr_base, ttbr1_addr, kernel_base_va)
# create some heaps
shared_size = 2 * 1024 * 1024
shared_paddr, shared_dva = self.ioalloc(shared_size, ctx=0)
self.sharedHeap = Heap(shared_dva, shared_dva + shared_size)
print("shared heap:", hex(shared_dva), hex(shared_size))
normal_size = 2 * 1024 * 1024
normal_paddr, normal_dva = self.ioalloc(shared_size, ctx=0)
self.normalHeap = Heap(normal_dva, normal_dva + normal_size)
print("normal heap:", hex(normal_dva), hex(normal_size))
self.initdata = None
def iomap(self, addr, size, ctx=0):
dva = self.uat.iomap(ctx, addr, size)
@ -76,3 +94,21 @@ class Agx(StandardASC):
def iowrite(self, dva, data, ctx=0):
return self.uat.iowrite(ctx, dva & 0xFFFFFFFFFF, data)
def build_initdata(self):
if self.initdata is None:
self.initdata_addr = self.normalHeap.malloc(InitData.sizeof())
io_mappings = [IOMapping() for _ in range(0x14)]
self.initdata = InitData(self.normalHeap, self.sharedHeap, {"io_mappings": io_mappings})
self.initdata.build_stream(stream = self.uat.iostream(0, self.initdata_addr))
return self.initdata_addr
def boot(self):
# boot asc
super().boot()
initdata_addr = self.build_initdata()
self.agx.send_initdata(self.initdata_addr)

View file

@ -0,0 +1,203 @@
from m1n1.utils import *
from m1n1.constructutils import ConstructClass
from construct import *
ChannelState = Struct (
"head" / Default(Int32ul, 0),
Padding(0x1c),
"tail" / Default(Int32ul, 0),
Padding(0x1c),
"gpu_head" / Default(Int32ul, 0),
Padding(0x1c),
"gpu_tail" / Default(Int32ul, 0),
Padding(0x1c),
)
def ChannelInfo(msg_cls):
return Struct (
"state_addr" / Int64ul,
"state" / Pointer(this.state_addr, ChannelState),
"ringbuffer_addr" / Int64ul,
"ringbuffer" / Pointer(this.ringbuffer_addr, Array(256, msg_cls)),
)
class NotifyCmdQueueWork(ConstructClass):
subcon = Struct (
"queue_type" / Default(Int32ul, 0),
"cmdqueue_addr" / Default(Int64ul, 0),
#"cmdqueue" / Pointer(this.cmdqueue_addr, CommandQueueInfo),
"head" / Default(Int32ul, 0),
"unk_10" / Default(Int32ul, 0),
"unk_14" / Default(Int32ul, 0),
Padding(0x18),
)
TYPES = {
0: "SubmitTA",
1: "Submit3D",
2: "SubmitCompute",
}
def get_workitems(self):
try:
return self.workItems
except AttributeError:
self.workItems = self.cmdqueue.getSubmittedWork(self.head)
return self.workItems
def __repr__(self):
if (self.cmdqueue_addr == 0):
return "<Empty NotifyCmdQueueWork>"
str = f"{self.TYPES[self.queue_type]}(0x{self.cmdqueue_addr & 0xfff_ffffffff:x}, {self.head}, {self.unk_10}, {self.unk_14})"
# str += "\n WorkItems:"
# for work in self.get_workitems():
# str += f"\n\t{work}"
return str
class DeviceControl_17(ConstructClass):
subcon = Struct (
"msg_type" / Const(0x17, Int32ul),
"unk_4" / Int32ul,
"unk_8" / Int32ul,
"unk_c" / Int32ul,
"unk_10" / Int32ul,
"unk_14" / Int32ul,
"unk_18" / Int32ul,
"unkptr_1c" / Int64ul,
Padding(0xc)
)
# def __repr__(self):
# return f"DeviceControl_17()"
class DeviceControl_19(ConstructClass):
subcon = Struct (
"msg_type" / Const(0x19, Int32ul),
Padding(0x2c)
)
def __repr__(self):
return f"DeviceControl_19()"
class DeviceControl_1e(ConstructClass):
subcon = Struct (
"msg_type" / Const(0x1e, Int32ul),
# todo: more fields
Padding(0x2c)
)
def __repr__(self):
return f"DeviceControl_17()"
class DeviceControl_23(ConstructClass):
subcon = Struct (
"msg_type" / Const(0x23, Int32ul),
)
def __repr__(self):
return f"DeviceControl_23()"
# class DeviceControl_dummy(ConstructClass):
# subcon = Struct (
# "msg_type" / Const(0xcc, Int32ul),
# Padding(0x2c)
# )
# def __init__(self):
# self.msg_type = 0xcc
class UnknownMsg(ConstructClass):
subcon = Struct (
"msg_type" / Int32ul,
"data" / Bytes(0x2c),
)
def __init__(self):
self.msg_type = 0xcc
self.data = b"\0"*0x2c
def __repr__(self):
return f"Unknown(type={self.msg_type:x}, data={hexdump32(self.data)})"
DeviceControlMsg = Select (
DeviceControl_17,
DeviceControl_19,
DeviceControl_23,
UnknownMsg,
)
channelNames = [
"TA_0", "3D_0", "CL_0",
"TA_1", "3D_1", "CL_1",
"TA_2", "3D_2", "CL_2",
"TA_3", "3D_3", "CL_3",
"DevCtrl",
"Return0", "Return1", "Return2", "Return3"
]
class Channels(ConstructClass):
subcon = Struct(
# 12 channels for notifiy command queues of work submission
*[ channelNames[i] / ChannelInfo(NotifyCmdQueueWork) for i in range(12)],
# Device Control Channel
"DevCtrl" / ChannelInfo(DeviceControlMsg),
# Return Channels
"Return0" / ChannelInfo(Bytes(0x38)),
"Return1" / ChannelInfo(UnknownMsg),
"Return2" / ChannelInfo(UnknownMsg),
"Return3" / ChannelInfo(UnknownMsg),
)
def __init__(self, heap, shared_heap):
for i in range(12):
setattr(self, channelNames[i], Container(
state_addr = shared_heap.malloc(ChannelState.sizeof()),
ringbuffer_addr = heap.malloc(256 * NotifyCmdQueueWork.sizeof()),
ringbuffer = [Container()] * 256,
))
self.DevCtrl = Container(
state_addr = shared_heap.malloc(ChannelState.sizeof()),
ringbuffer_addr = heap.malloc(256 * UnknownMsg.sizeof()),
ringbuffer = [UnknownMsg()] * 256,
)
self.Return0 = Container(
state_addr = heap.malloc(ChannelState.sizeof()),
ringbuffer_addr = heap.malloc(256 * 0x38),
ringbuffer = [b"\0" * 0x38] * 256
)
self.Return1 = Container(
state_addr = heap.malloc(ChannelState.sizeof()),
ringbuffer_addr = heap.malloc(256 * UnknownMsg.sizeof()),
ringbuffer = [UnknownMsg()] * 256,
)
self.Return2 = Container(
state_addr = heap.malloc(ChannelState.sizeof()),
ringbuffer_addr = heap.malloc(256 * UnknownMsg.sizeof()),
ringbuffer = [UnknownMsg()] * 256,
)
self.Return3 = Container(
state_addr = heap.malloc(ChannelState.sizeof()),
ringbuffer_addr = heap.malloc(256 * UnknownMsg.sizeof()),
ringbuffer = [UnknownMsg()] * 256,
)
def __repr__(self):
str = "Channels:\n"
for name in channelNames:
channel = getattr(self, name)
str += f" {name}: head:{channel.state.head} tail:{channel.state.tail} "
str += f"ringbuffer: {channel.ringbuffer_addr:#x}\n"
return str

View file

@ -0,0 +1,278 @@
from m1n1.utils import *
from m1n1.constructutils import ConstructClass
from construct import *
from .channels import Channels
class InitData_unkptr20(ConstructClass):
subcon = Struct(
"unkptr_0" / Int64ul,
"unkptr_8" / Int64ul,
Padding(0x70)
)
def __init__(self, heap, shared_heap):
self.unkptr_0 = heap.malloc(0x40)
self.unkptr_8 = heap.malloc(0x40) # totally guessing the size on this one
class RegionB_unkprt_188(ConstructClass):
subcon = Struct(
"unk_0" / Int32ul,
"unk_4" / Int32ul,
"unk_8" / Int32ul,
"unk_c" / Int32ul,
"unk_10" / Float32l,
"unk_14" / Int32ul,
"unk_18" / Int32ul,
"unk_1c" / Int32ul,
"unk_20" / Int32ul,
"unk_24" / Int32ul,
"unk_28" / Int32ul,
"unk_2c" / Int32ul,
"unk_30" / Int32ul,
"unk_34" / Int32ul,
"unk_38" / Int32ul,
"unk_3c" / Int32ul,
"unk_40" / Int32ul,
"unk_44" / Int32ul,
"unk_48" / Int32ul,
"unk_4c" / Int32ul,
"padding" / Bytes(0x24),
"unk_84" / Float32l,
"unk_88" / Float32l,
"unk_8c" / Float32l,
"unk_90" / Float32l,
"unk_94" / Float32l,
"unk_98" / Float32l,
"unk_9c" / Float32l,
)
def __init__(self):
self.unk_0 = 0
self.unk_4 = 192000
self.unk_8 = 0
self.unk_c = 4
self.unk_10 = 1.0
self.unk_14 = 0
self.unk_18 = 0
self.unk_1c = 0
self.unk_20 = 0
self.unk_24 = 0
self.unk_28 = 1
self.unk_2c = 1
self.unk_30 = 0
self.unk_34 = 0
self.unk_38 = 0
self.unk_3c = 100
self.unk_40 = 1
self.unk_44 = 600
self.unk_48 = 0
self.unk_4c = 100
self.padding = b"\x00" * 0x24
self.unk_84 = 1.02
self.unk_88 = 1.02
self.unk_8c = 1.02
self.unk_90 = 1.02
self.unk_94 = 1.02
self.unk_98 = 1.02
self.unk_9c = 1.02
class IOMapping(ConstructClass):
subcon = Struct(
"phys_addr" / Int64ul,
"virt_addr" / Int64ul,
"size" / Int32ul,
"range_size" / Int32ul, # Useally the same as size, but for MCC, this is the size of a single MMC register range.
"readwrite" / Int64ul
)
def __init__(self, phys=0, addr=0, size=0, range_size=0, readwrite=0):
self.phys_addr = phys
self.virt_addr = addr
self.size = size
self.range_size = range_size
self.readwrite = readwrite
def __repr__(self):
if self.virt_addr == 0:
return "\n<IOMapping: Invalid>"
hv = self._stream.uat.hv
dev, range = hv.device_addr_tbl.lookup(self.phys_addr)
offset = self.phys_addr - range.start
return f"\nIO Mapping: {['RO', 'RW'][self.readwrite]} {self.virt_addr:#x} -> " \
f"{dev}+{offset:#x} ({self.size:#x} / {self.range_size:#x})"
class RegionB_unkprt_1a0(ConstructClass):
subcon = Struct(
"unk_0" / Int32ul,
"unk_4" / Int32ul,
"unk_8" / Int32ul, # Number of IO mappings?
"unk_c" / Int32ul,
"unk_10" / Int32ul,
"unk_14" / Int32ul,
"unk_18" / Int64ul,
"unk_20" / Int32ul,
"unk_24" / Int32ul,
"unk_28" / Int32ul,
"unk_2c" / Int32ul,
"unk_30" / Int64ul, # This might be another IO mapping? But it's weird
"unk_38" / Int64ul,
Padding(0x20),
"unk_data" / Bytes(0x170), # Doesn't seem to be necessary
"io_mappings" / Array(0x14, IOMapping),
)
def __init__(self, heap, shared_heap, info):
self.unk_0 = 0
self.unk_4 = 0x13
self.unk_8 = 0
self.unk_c = 0x14
self.unk_10 = 0
self.unk_14 = 1
self.unk_18 = 0xffc00000
self.unk_20 = 0
self.unk_24 = 0x11
self.unk_28 = 0
self.unk_2c = 0x11
self.unk_30 = 0x6f_ffff8000
self.unk_38 = 0xffffffa0_11800000
self.unk_data = b"\0"*0x170
self.io_mappings = info['io_mappings']
class InitData_RegionB(ConstructClass):
subcon = Struct(
"channels" / Channels,
Padding(0x60),
"unkptr_170" / Int64ul, # size 0xc0, Empty
"unkptr_178" / Int64ul, # size: 0x1c0, has random negative 1s, Needed for login screen
"unkptr_180" / Int64ul, # size: 0x140, Empty
"unkptr_188_addr" / Int64ul, # size: 0x3b80, few floats, few ints, needed for init
"unkptr_188" / Pointer(this.unkptr_188_addr, RegionB_unkprt_188)
"unkptr_190" / Int64ul, # size: 0x80, empty
"unkptr_198_addr" / Int64ul, # size: 0xc0, fw writes timestamps into this
"unkptr_198" / Pointer(this.unkptr_198_addr, Bytes(0xc0)),
"unkptr_1a0_addr" / Int64ul, # size: 0xb80, io stuff
"unkptr_1a0" / Pointer(this.unkptr_1a0_addr, RegionB_unkprt_1a0),
"unkptr_1a8" / Int64ul, # repeat of 1a0
"unkptr_1b0" / Int64ul, # Points into RegionC, Empty
"unkptr_1b8" / Int64ul, # Unallocated, Size 0x1000
"unkptr_1c0" / Int64ul, # Unallocated, size 0x300
"unkptr_1c8" / Int64ul, # Unallocated, unknown size
Padding(0x44),
"unkptr_214" / Int64ul, # Size: 0x4000
"unkptr_21c" / Int64ul, # Size: 0x4000
)
def __init__(self, heap, shared_heap, info):
self.channels = Channels(heap, shared_heap)
self.unkptr_170 = heap.malloc(0xc0)
self.unkptr_178 = heap.malloc(0x1c0)
self.unkptr_180 = heap.malloc(0x140)
self.unkptr_188_addr = heap.malloc(0x3b80)
self.unkptr_188 = RegionB_unkprt_188()
self.unkptr_190 = heap.malloc(0x80)
self.unkptr_198_addr = heap.malloc(0xc0)
self.unkptr_198 = b"\x25" + b"\x00"*0xbf
self.unkptr_1a0_addr = self.unkptr_1a8 = heap.malloc(0xb80)
self.unkptr_1a0 = RegionB_unkprt_1a0(heap, shared_heap, info)
self.unkptr_1b0 = shared_heap.malloc(0x1000)
self.unkptr_1b8 = heap.malloc(0x1000)
self.unkptr_1c0 = heap.malloc(0x300)
self.unkptr_1c8 = heap.malloc(0x1000)
self.unkptr_214 = self.unkptr_21c = shared_heap.malloc(0x4000)
def mon(self, add_fn):
add_fn(self.unkptr_170, 0x140, "unkptr_170")
add_fn(self.unkptr_178, 0x1c0, "unkptr_178")
add_fn(self.unkptr_180, 0x140, "unkptr_180")
add_fn(self.unkptr_188_addr, 0x3b80, "unkptr_188")
add_fn(self.unkptr_190, 0x80, "unkptr_190")
add_fn(self.unkptr_198_addr, 0xc0, "unkptr_198")
add_fn(self.unkptr_1a0_addr, 0xb80, "unkptr_1a0")
add_fn(self.unkptr_1b0, 0x1000, "unkptr_1b0")
add_fn(self.unkptr_214, 0x4000, "unkptr_214")
# Unallocated during init
add_fn(self.unkptr_1b8, 0x1000, "unkptr_1b8")
add_fn(self.unkptr_1c0, 0x300, "unkptr_1c0")
add_fn(self.unkptr_1c8, 0x1000, "unkptr_1c8")
class UatLevelInfo(ConstructClass):
subcon = Struct(
"index_shift" / Int8ul,
"unk_1" / Int8ul, # always 14, page bits?
"unk_2" / Int8ul, # always 14, also page bits?
"unk_3" / Int8ul, # always 8
"unk_4" / Int16ul, # 0x4000, Table size?
"num_entries" / Int16ul,
"unk_8" / Int64ul, # always 1
"unk_10" / Int64ul, # Full address mask? the same for all levels. Always 0x3ffffffc000
"index_mask" / Int64ul,
)
def __init__(self, index_shift, num_entries):
self.index_shift = index_shift
self.unk_1 = 14
self.unk_2 = 14
self.unk_3 = 8
self.unk_4 = 0x4000 # I doubt anything other than 16k pages is supported
self.num_entries = num_entries
self.unk_8 = 1
self.unk_10 = 0x3ffffffc000
self.index_mask = ((num_entries * 8) - 1) << index_shift
class InitData(ConstructClass):
subcon = Struct(
"unkptr_0" / Int64ul, # allocation size: 0x4000
"unk_8" / Default(Int32ul, 0),
"unk_c"/ Default(Int32ul, 0),
"regionB_addr" / Int64ul, # 0xfa00c338000 allocation size: 0x34000
"regionB" / Pointer(this.regionB_addr, InitData_RegionB),
"regionC_addr" / Int64ul, # 0xfa000200000 allocation size: 0x88000, heap?
"unkptr_20_addr" / Int64ul, # allocation size: 0x4000, but probably only 0x80 bytes long
"unkptr_20" / Pointer(this.unkptr_20_addr, InitData_unkptr20),
"uat_num_levels" / Int8ul,
"uat_page_bits" / Int8ul,
"uat_page_size" / Int16ul,
"uat_level_info" / Array(3, UatLevelInfo),
Padding(0x18),
"host_mapped_fw_allocations" / Int32ul, # must be 1
Padding(0x1000) # For safety
)
def __init__(self, heap, shared_heap, info):
self.unkptr_0 = shared_heap.memalign(0x4000, 0x4000)
self.regionB_addr = heap.malloc(InitData_RegionB.sizeof())
self.regionB = InitData_RegionB(heap, shared_heap, info)
self.regionC_addr = shared_heap.malloc(0x88000)
self.unkptr_20_addr = shared_heap.malloc(InitData_unkptr20.sizeof())
self.unkptr_20 = InitData_unkptr20(heap, shared_heap)
# This section seems to be data that would be used by firmware side page allocation
# But the current firmware doesn't have this functionality enabled, so it's not used?
self.uat_num_levels = 3
self.uat_page_bits = 14
self.uat_page_size = 0x4000
self.uat_level_info = [
UatLevelInfo(36, 8),
UatLevelInfo(25, 2048),
UatLevelInfo(14, 2048)
]
# Since the current firmware doesn't have this functionality enabled, we must enabled host
# mapped firmware allocations
self.host_mapped_fw_allocations = 1

View file

@ -135,6 +135,7 @@ class UatStream(Reloadable):
self.uat.iowrite(self.ctx, self.pos, bytes)
self.pos += len(bytes)
self.cache = None
return len(bytes)
def writable(self):
return True
@ -389,7 +390,7 @@ class UAT(Reloadable):
def invalidate_cache(self):
self.pt_cache = {}
def dump_level(self, level, base, table):
def recurse_level(self, level, base, table, page_fn=None, table_fn=None):
def extend(addr):
if addr >= 0x80_00000000:
addr |= 0xf00_00000000
@ -398,30 +399,40 @@ class UAT(Reloadable):
offset, size, ptecls = self.LEVELS[level]
cached, tbl = self.get_pt(table, size)
unmapped = False
sparse = False
for i, pte in enumerate(tbl):
pte = ptecls(pte)
if not pte.valid():
if not unmapped:
print(" ...")
unmapped = True
sparse = True
continue
unmapped = False
range_size = 1 << offset
start = extend(base + i * range_size)
end = start + range_size - 1
addr = pte.offset()
type = " page" if level + 1 == len(self.LEVELS) else "table"
print(f"{' ' * level}{type} ({i:03}): {start:011x} ... {end:011x}"
f" -> {pte.describe()}")
if level + 1 != len(self.LEVELS):
self.dump_level(level + 1, start, addr)
if level + 1 == len(self.LEVELS):
if page_fn:
page_fn(start, end, i, pte, level, sparse=sparse)
else:
if table_fn:
table_fn(start, end, i, pte, level, sparse=sparse)
self.recurse_level(level + 1, start, pte.offset(), page_fn, table_fn)
sparse = False
def foreach_page(self, ctx, page_fn):
self.recurse_level(0, 0, self.ttbr + ctx * 16, page_fn)
def dump(self, ctx):
if not self.ttbr:
print("No translation table")
return
self.dump_level(0, 0, self.ttbr + ctx * 16)
def print_fn(start, end, i, pte, level, sparse):
type = "page" if level+1 == len(self.LEVELS) else "table"
if sparse:
print(f"{' ' * level}...")
print(f"{' ' * level}{type}({i:03}): {start:011x} ... {end:011x}"
f" -> {pte.describe()}")
self.recurse_level(0, 0, self.ttbr + ctx * 16, print_fn, print_fn)

View file

@ -1,7 +1,10 @@
import textwrap
from .asc import *
from .agx_control import ControlList
from ..hw.uat import UAT
from ..hw.uat import UAT, MemoryAttr
from ..fw.agx.initdata import InitData as NewInitData
from m1n1.proxyutils import RegMonitor
from m1n1.constructutils import *
@ -23,7 +26,7 @@ class WorkCommand_4(ConstructClass):
subcon = Struct(
"magic" / Const(0x4, Int32ul),
"ptr" / Int64ul, # These appare to be shared over multiple contexes
"unk_c" / Int32ul, # Counts up by 0x100 each frame
"unk_c" / Int32ul, # Counts up by 0x100 each frame, gets written to ptr? (on completion?)
"flag" / Int32ul, # 2, 4 or 6
"unk_14" / Int32ul, # Counts up by 0x100 each frame? starts at diffrent point?
"uuid" / Int32ul,
@ -86,15 +89,13 @@ class WorkCommand_3(ConstructClass):
# offset 000001e8
"controllist_ptr" / Int64ul,
"controllist_size" / Int32ul,
"controllist_data" / Pointer(this.controllist_ptr, Bytes(this.controllist_size)),
"controllist" / Pointer(this.controllist_ptr, ControlList),
)
def parsed(self, ctx):
self.controllist = ControlList.parse(self.controllist_data)
def __repr__(self) -> str:
str = super().__repr__(ignore=['magic', 'controllist_data'])
str += f"\nControl List:\n{repr(self.controllist)}"
str = super().__repr__(ignore=['magic', 'controllist_ptr', 'controllist_size'])
# str += f" Control List - {self.controllist_size:#x} bytes @ {self.controllist_ptr:#x}:\n"
# str += textwrap.indent(repr(self.controllist), ' ' * 3)
return str
@ -130,7 +131,7 @@ class WorkCommand_1(ConstructClass):
"unk_8" / Int32ul,
"controllist_ptr" / Int64ul, # Command list
"controllist_size" / Int32ul,
"controllist_data" / Pointer(this.controllist_ptr, Bytes(this.controllist_size)),
"controllist" / Pointer(this.controllist_ptr, ControlList),
"unkptr_18" / Int64ul,
"unkptr_20" / Int64ul, # Size: 0x100
"unkptr_28" / Int64ul, # Size: 0x8c0
@ -147,12 +148,10 @@ class WorkCommand_1(ConstructClass):
"unk_70" / Int64ul,
)
def parsed(self, ctx):
self.controllist = ControlList.parse(self.controllist_data)
def __repr__(self) -> str:
str = super().__repr__(ignore=['magic', 'controllist_data'])
str += f"\nControl List:\n{repr(self.controllist)}"
str = super().__repr__(ignore=['magic', 'controllist_ptr', 'controllist_size'])
# str += f" Control List - {self.controllist_size:#x} bytes @ {self.controllist_ptr:#x}:\n"
# str += textwrap.indent(repr(self.controllist), ' ' * 3)
return str
@ -252,7 +251,7 @@ class CommandQueueInfo(ConstructClass):
Padding(0x20),
"unk_84" / Int32ul, # Set to 1 by gpu after work complete. Reset to zero by cpu
Padding(0x18),
"unkptr_a0" / Int64ul, # Size 0x40
"unkptr_a0" / Int64ul, # Size 0x40 ; Also seen in DeviceControl_17
# End of struct
)
@ -292,12 +291,12 @@ class CommandQueueInfo(ConstructClass):
class NotifyCmdQueueWork(ConstructClass):
subcon = Struct (
"queue_type" / Select(Const(0, Int32ul), Const(1, Int32ul), Const(2, Int32ul)),
"cmdqueue_addr" / Int64ul,
"queue_type" / Default(Select(Const(0, Int32ul), Const(1, Int32ul), Const(2, Int32ul)), 0),
"cmdqueue_addr" / Default(Int64ul, 0),
"cmdqueue" / Pointer(this.cmdqueue_addr, CommandQueueInfo),
"head" / Int32ul,
"unk_10" / Int32ul,
"unk_14" / Int32ul,
"head" / Default(Int32ul, 0),
"unk_10" / Default(Int32ul, 0),
"unk_14" / Default(Int32ul, 0),
"padding" / Bytes(0x18),
)
@ -316,9 +315,9 @@ class NotifyCmdQueueWork(ConstructClass):
def __repr__(self):
str = f"{self.TYPES[self.queue_type]}(0x{self.cmdqueue_addr & 0xfff_ffffffff:x}, {self.head}, {self.unk_10}, {self.unk_14})"
str += "\n WorkItems:"
str += "\n WorkItems:"
for work in self.get_workitems():
str += f"\n\t{work}"
str += f"\n{textwrap.indent(repr(work), ' ' * 6)}"
return str
@ -420,8 +419,6 @@ InitData = Struct(
"unk_30" / Int32ul
)
class InitMsg(Register64):
TYPE = 59, 52
@ -448,9 +445,6 @@ class InitEp(EP):
def init_resp(self, msg):
print(f" CPU Sent Init Response {msg.UNK1:x}, ADDR: {msg.ADDR:x}")
# monitor whatever is at this address
self.tracer.mon_addva(0, msg.ADDR, 0x4000, "init_region")
self.tracer.mon.poll()
return True
class GpuMsg(Register64):
@ -506,7 +500,7 @@ class AGXTracer(ASCTracer):
def __init__(self, hv, devpath, verbose=False):
super().__init__(hv, devpath, verbose)
self.uat = UAT(hv.iface, hv.u)
self.uat = UAT(hv.iface, hv.u, hv)
self.mon = RegMonitor(hv.u, ascii=True)
self.dev_sgx = hv.u.adt["/arm-io/sgx"]
self.gpu_region = getattr(self.dev_sgx, "gpu-region-base")
@ -544,13 +538,17 @@ class AGXTracer(ASCTracer):
self.channels_read_ptr[channel] = (self.channels_read_ptr[channel] + 1) % 256
msg = ChannelMessage.parse_stream(self.uat.iostream(0, addr))
return
if isinstance(msg, NotifyCmdQueueWork) and (msg.cmdqueue_addr & 0xfff_ffffffff) in self.ignorelist:
return
print(f"Channel[{self.channelNames[channel]}]: {msg}")
self.last_msg = msg
def ignore(self, addr):
def ignore(self, addr=None):
if addr is None:
addr = self.last_msg.cmdqueue_addr
self.ignorelist += [addr & 0xfff_ffffffff]
def kick(self, val):
@ -580,17 +578,23 @@ class AGXTracer(ASCTracer):
self.print_ringmsg(channel)
# if val not in [0x0, 0x1, 0x10, 0x11]:
if self.last_msg and isinstance(self.last_msg, NotifyCmdQueueWork):
if self.last_msg and isinstance(self.last_msg, (NotifyCmdQueueWork, DeviceControl_17)):
self.hv.run_shell()
self.last_msg = None
def pong(self):
self.mon.poll()
#print("pong")
try:
self.mon.poll()
except:
pass
#self.hv.run_shell()
# check the gfx -> cpu channels
for i in range(13, 17):
state = self.initdata.channels[i].state()
#for i in range(13, 17):
# state = self.initdata.channels[i].state()
@ -601,16 +605,42 @@ class AGXTracer(ASCTracer):
if start:
self.hv.trace_range(irange(start, size), mode=TraceMode.SYNC)
def dump_va(self, ctx):
data = b''
dataStart = 0
def dump_page(start, end, i, pte, level, sparse):
if i == 0 or sparse:
if len(data):
chexdump32(data, dataStart)
data = b''
dataStart = 0
if MemoryAttr(pte.AttrIndex) != MemoryAttr.Device and pte.OS:
if dataStart == 0:
dataStart = start
data += self.uat.ioread(0, start, 0x4000)
self.uat.foreach_page(0, dump_page)
if len(data):
chexdump32(data, dataStart)
def pong_init(self, addr):
self.initdata_addr = addr
self.initdata = InitData.parse_stream(self.uat.iostream(0, addr))
self.initdata2 = NewInitData.parse_stream(self.uat.iostream(0, addr))
#self.initdata2.regionB.mon(lambda addr, size, name: self.mon_addva(0, addr, size, name))
# for i, chan in list(enumerate(self.initdata.channels))[13:16]:
# self.mon_addva(0, chan.state_addr, 0x40, f"chan[{i}]->state")
# self.mon_addva(0, chan.ringbuffer_addr, 256 * 0x30, f"chan[{i}]->ringbuffer")
self.mon.poll()
#self.mon.poll()
self.hv.run_shell()

View file

@ -11,27 +11,29 @@ It seems like a common pattern is:
5. Finish (3D or Compute)
6. End
Error messages call these as SKU commands
"""
from re import A
from telnetlib import SE
from m1n1.constructutils import *
from construct import *
from construct.core import Int64ul, Int32ul
from construct.core import Int64ul, Int32ul, Int32sl
import textwrap
class Start3DCmd(ConstructClass):
subcon = Struct( # 0x194 bytes''''
"magic" / Const(0x24, Int32ul),
"unkptr_4" / Int64ul, # empty before run. Output? WorkCommand_1 + 0x3c0
"unkptr_c" / Int64ul, # ?? WorkCommand_1 + 0x78
"unkptr_14" / Int64ul, # same as workcommand_1.unkptr_28
"unkptr_1c" / Int64ul,
"unkptr_14" / Int64ul, # same as workcommand_1.unkptr_28.
"unkptr_1c" / Int64ul, # constant 0xffffffa00c33ec88, AKA initdata->unkptr_178+8
"unkptr_24" / Int64ul, # 4 bytes
"unkptr_2c" / Int64ul, # 0x3c bytes
"unkptr_34" / Int64ul, # 0x34 bytes
"cmdqueue_ptr" / Int64ul, # points back to the CommandQueueInfo that this command came from
"workitem_ptr" / Int64ul, # points back at the WorkItem that this command came from
"unk_4c" / Int32ul, # 4
"context_id" / Int32ul, # 4
"unk_50" / Int32ul, # 1
"unk_54" / Int32ul, # 1
"unk_58" / Int32ul, # 2
@ -50,6 +52,9 @@ class Start3DCmd(ConstructClass):
Padding(0x194 - 0x9c),
)
def __repr__(self):
return super().__repr__(ignore=["cmdqueue_ptr", "workitem_ptr"])
class Finalize3DCmd(ConstructClass):
subcon = Struct( # 0x9c bytes
@ -57,7 +62,7 @@ class Finalize3DCmd(ConstructClass):
"uuid" / Int32ul, # uuid for tracking
"unk_8" / Int32ul, # 0
"unkptr_c" / Int64ul,
"unk_14" / Int64ul, # 200
"unk_14" / Int64ul, # multiple of 0x100, gets written to unkptr_c
"unkptr_1c" / Int64ul, # Same as Start3DCmd.unkptr_14
"unkptr_24" / Int64ul,
"unk_2c" / Int64ul, # 1
@ -73,25 +78,29 @@ class Finalize3DCmd(ConstructClass):
"unk_7c" / Int64ul, # 0
"unk_84" / Int64ul, # 0
"unk_8c" / Int64ul, # 0
"unk_94" / Int32ul, # fffffe00
"startcmd_offset" / Int32sl, # realative offset from start of Finalize to StartComputeCmd
"unk_98" / Int32ul, # 1
)
def __repr__(self):
return super().__repr__(ignore=["cmdqueue_ptr", "workitem_ptr", "startcmd_offset"])
class ComputeInfo(ConstructClass):
# Only the cmdlist and pipelinebase and cmdlist fields are strictly needed to launch a basic
# compute shader.
subcon = Struct( # 0x1c bytes
"unkptr_0" / Int64ul, # always 0 in my tests. Might be unforms?
"unkptr_8" / Int64ul, # CommandList from userspace
"cmdlist" / Int64ul, # CommandList from userspace
"unkptr_10" / Int64ul, # size 8, null
"unkptr_18" / Int64ul, # size 8, null
"unkptr_20" / Int64ul, # size 8, null
"unkptr_28" / Int64ul,
"pipeline_base" / Int64ul, # 0x11_00000000: Used for certain "short" pointers like pipelines (and shader?)
"unkptr_34" / Int64ul, # always 0x8c60 ?
"unk_3c" / Int32ul, # 0
"unkptr_28" / Int64ul, #
"pipeline_base" / Int64ul, # 0x11_00000000: Used for certain "short" pointers like pipelines (and shaders?)
"unkptr_38" / Int64ul, # always 0x8c60.
"unk_40" / Int32ul, # 0x41
"unk_44" / Int32ul, # 0
"unkptr_48" / Int64ul,
"unkptr_48" / Int64ul, #
"unk_50" / Int32ul, # 0x40 - Size?
"unk_54" / Int32ul, # 0
"unk_58" / Int32ul, # 1
@ -108,7 +117,7 @@ class StartComputeCmd(ConstructClass):
"computeinfo" / Pointer(this.computeinfo_addr, ComputeInfo),
"unkptr_14" / Int64ul, # In gpu-asc's heap? Did this pointer come from the gfx firmware?
"cmdqueue_ptr" / Int64ul, # points back to the submitinfo that this command came from
"unk_24" / Int32ul, # 4
"context_id" / Int32ul, # 4
"unk_28" / Int32ul, # 1
"unk_2c" / Int32ul, # 0
"unk_30" / Int32ul,
@ -129,19 +138,22 @@ class StartComputeCmd(ConstructClass):
except AttributeError:
pass
def __repr__(self):
return super().__repr__(ignore=["cmdqueue_ptr", "workitem_ptr"])
class FinalizeComputeCmd(ConstructClass):
subcon = Struct( # 0x64 bytes''''
"magic" / Const(0x2a, Int32ul),
"unkptr_4" / Int64ul, # same as ComputeStartCmd.unkptr_14
"cmdqueue_ptr" / Int64ul, # points back to the submitinfo
"unk_14" / Int32ul,
"unk_14" / Int32ul, # Context ID?
"unk_18" / Int32ul,
"unkptr_1c" / Int64ul, # same as ComputeStartCmd.unkptr_3c
"unk_24" / Int32ul,
"uuid" / Int32ul, # uuid for tracking?
"unkptr_2c" / Int64ul,
"unk_34" / Int32ul, # Size?
"unk_34" / Int32ul, # Gets written to unkptr_2c (after completion?)
"unk_38" / Int32ul,
"unk_3c" / Int32ul,
"unk_40" / Int32ul,
@ -151,10 +163,13 @@ class FinalizeComputeCmd(ConstructClass):
"unk_50" / Int32ul,
"unk_54" / Int32ul,
"unk_58" / Int32ul,
"unk_5c" / Int32ul,
"startcmd_offset" / Int32sl, # realative offset from start of Finalize to StartComputeCmd
"unk_60" / Int32ul,
)
def __repr__(self):
return super().__repr__(ignore=["cmdqueue_ptr", "workitem_ptr", "startcmd_offset"])
class EndCmd(ConstructClass):
subcon = Struct(
"magic" / Const(0x18, Byte),
@ -195,6 +210,7 @@ class WaitForInterruptCmd(ConstructClass):
return f"WaitForInterrupt({self.unk_1}, {self.unk_2}, {self.unk_3})"
class NopCmd(ConstructClass):
# This doesn't exist
subcon = Struct(
"magic" / Const(0x00, Int32ul),
)
@ -206,7 +222,7 @@ class NopCmd(ConstructClass):
class ControlList(ConstructClass):
subcon = GreedyRange(
Select(
NopCmd,
#NopCmd,
WaitForInterruptCmd,
EndCmd,
@ -222,7 +238,7 @@ class ControlList(ConstructClass):
def __repr__(self):
str = ""
for cmd in self.value:
str += f"\t{cmd}\n"
str += repr(cmd) + '\n'
if isinstance(cmd, EndCmd):
break
return str