experiments/aop.py: Extend AOP experiment, add tracer

The AOP uses an 'EPIC' protocol similar to the one other coprocessor
firmware is using but not in the exact same version. Add code for
tracing the AOP calls and extend the aop.py experiment with the client
side of it. Include description of audio calls and some other calls
related to sensor discovery.

Furthermore, in experiments/aop.py, do some AOP audio setup. Once that
is done we can start streaming samples from the internal microphones by
making what AOP considers power state adjustment calls. That is, we
adjust the power state of a 'hpai' device, first to a 'pw1 ' stage,
then to 'pwrd' stage.

So, to see microphone samples, enter the AOP experiment shell first:

  $ M1N1DEVICE=/dev/ttyACM0 experiments/aop.py

Within the shell, adjust the power state of 'hpai':

  >>> aop_set_audio_pstate('hpai', 'pw1 ')

At that point /arm-io/admac-aop-audio powers up. In parallel to the AOP
shell, we can start tools/admac_stream.py on the just powered-up ADMAC
instance:

  $ M1N1HEAP=0x10010000000 M1N1DEVICE=/dev/ttyACM1 tools/admac_stream.py \
        --node admac-aop-audio --channel 1 -v | xxd -g 4 -c 12 -e

Returning back to the AOP shell, we can then set 'hpai' to 'pwrd' state
to kick off the streaming:

  >>> aop_set_audio_pstate('hpai', 'pwrd')

By that point, we should see samples coming out on the ADMAC end. The
samples are 32-bit floats packed in groups of three in a frame, e.g.

00000000: ba7ac6a7 ba32d3c3 baa17ae2  ..z...2..z..
0000000c: 38ccea5f b99c1a37 ba0c4bb1  _..87....K..
00000018: 39d2354f 3964b5ff 39b209fb  O5.9..d9...9
00000024: b96a1d1f 39c8503f 3958fc4f  ..j.?P.9O.X9
00000030: b6b1f5ff 39c72b8f 39bbe017  .....+.9...9
0000003c: 3a912de5 36dd4f7f 37f1147f  .-.:.O.6...7

This has been tested and will to some degree be specific to 2021 Macbook
Pro (t6000). Differences on other models TBD (at the very least the
number of microphones can be presumed different).

Signed-off-by: Martin Povišer <povik@protonmail.com>
This commit is contained in:
Martin Povišer 2022-10-20 23:43:55 +02:00 committed by Hector Martin
parent a8ef753520
commit fc66046d76
3 changed files with 953 additions and 14 deletions

View file

@ -4,6 +4,7 @@ import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[1]))
import struct
import traceback
from construct import *
from m1n1.setup import *
@ -12,6 +13,186 @@ from m1n1.hw.dart import DART, DARTRegs
from m1n1.fw.asc import StandardASC, ASCDummyEndpoint
from m1n1.fw.asc.base import *
from m1n1.fw.aop import *
from m1n1.fw.aop.ipc import *
from m1n1.fw.afk.rbep import *
from m1n1.fw.afk.epic import *
# Set up a secondary proxy channel so that we can stream
# the microphone samples
p.usb_iodev_vuart_setup(p.iodev_whoami())
p.iodev_set_usage(IODEV.USB_VUART, USAGE.UARTPROXY)
p.pmgr_adt_clocks_enable("/arm-io/dart-aop")
adt_dc = u.adt["/arm-io/aop/iop-aop-nub/aop-audio/dc-2400000"]
pdm_config = Container(
unk1=2,
clockSource=u'pll ',
pdmFrequency=2400000,
unk3_clk=24000000,
unk4_clk=24000000,
unk5_clk=24000000,
channelPolaritySelect=256,
unk7=99,
unk8=1013248,
unk9=0,
ratios=Container(
r1=15,
r2=5,
r3=2,
),
filterLengths=0x542c47,
coeff_bulk=120,
coefficients=GreedyRange(Int32sl).parse(adt_dc.coefficients),
unk10=1,
micTurnOnTimeMs=20,
unk11=1,
micSettleTimeMs=50,
)
decimator_config = Container(
latency=15,
ratios=Container(
r1=15,
r2=5,
r3=2,
),
filterLengths=0x542c47,
coeff_bulk=120,
coefficients=GreedyRange(Int32sl).parse(adt_dc.coefficients),
)
class AFKEP_Hello(AFKEPMessage):
TYPE = 63, 48, Constant(0x80)
UNK = 7, 0
class AFKEP_Hello_Ack(AFKEPMessage):
TYPE = 63, 48, Constant(0xa0)
class EPICEndpoint(AFKRingBufEndpoint):
BUFSIZE = 0x1000
def __init__(self, *args, **kwargs):
self.seq = 0x0
self.wait_reply = False
self.ready = False
super().__init__(*args, **kwargs)
@msg_handler(0x80, AFKEP_Hello)
def Hello(self, msg):
self.rxbuf, self.rxbuf_dva = self.asc.ioalloc(self.BUFSIZE)
self.txbuf, self.txbuf_dva = self.asc.ioalloc(self.BUFSIZE)
self.send(AFKEP_Hello_Ack())
def handle_hello(self, hdr, sub, fd):
if sub.type != 0xc0:
return False
payload = fd.read()
name = payload.split(b"\0")[0].decode("ascii")
self.log(f"Hello! (endpoint {name})")
self.ready = True
return True
def handle_reply(self, hdr, sub, fd):
if self.wait_reply:
self.pending_call.read_resp(fd)
self.wait_reply = False
return True
return False
def handle_ipc(self, data):
fd = BytesIO(data)
hdr = EPICHeader.parse_stream(fd)
sub = EPICSubHeaderVer2.parse_stream(fd)
handled = False
if sub.category == EPICCategory.REPORT:
handled = self.handle_hello(hdr, sub, fd)
if sub.category == EPICCategory.REPLY:
handled = self.handle_reply(hdr, sub, fd)
if not handled and getattr(self, 'VERBOSE', False):
self.log(f"< 0x{hdr.channel:x} Type {hdr.type} Ver {hdr.version} Tag {hdr.seq}")
self.log(f" Len {sub.length} Ver {sub.version} Cat {sub.category} Type {sub.type:#x} Ts {sub.timestamp:#x}")
self.log(f" Unk1 {sub.unk1:#x} Unk2 {sub.unk2:#x}")
chexdump(fd.read())
def indirect(self, call, chan=0x1000000d, timeout=0.1):
tx = call.ARGS.build(call.args)
self.asc.iface.writemem(self.txbuf, tx[4:])
cmd = self.roundtrip(IndirectCall(
txbuf=self.txbuf_dva, txlen=len(tx) - 4,
rxbuf=self.rxbuf_dva, rxlen=self.BUFSIZE,
retcode=0,
), category=EPICCategory.COMMAND, typ=call.TYPE)
fd = BytesIO()
fd.write(struct.pack("<I", cmd.rets.retcode))
fd.write(self.asc.iface.readmem(self.rxbuf, cmd.rets.rxlen))
fd.seek(0)
call.read_resp(fd)
return call
def roundtrip(self, call, chan=0x1000000d, timeout=0.3,
category=EPICCategory.NOTIFY, typ=None):
tx = call.ARGS.build(call.args)
fd = BytesIO()
fd.write(EPICHeader.build(Container(
channel=chan,
type=EPICType.NOTIFY,
version=2,
seq=self.seq,
)))
self.seq += 1
fd.write(EPICSubHeaderVer2.build(Container(
length=len(tx),
category=category,
type=typ or call.TYPE,
)))
fd.write(tx)
self.pending_call = call
self.wait_reply = True
self.send_ipc(fd.getvalue())
deadline = time.time() + timeout
while time.time() < deadline and self.wait_reply:
self.asc.work()
if self.wait_reply:
self.wait_reply = False
raise ASCTimeout("ASC reply timed out")
return call
class SPUAppEndpoint(EPICEndpoint):
SHORT = "SPUAppep"
class AccelEndpoint(EPICEndpoint):
SHORT = "accelep"
class GyroEndpoint(EPICEndpoint):
SHORT = "gyroep"
class UNK23Endpoint(EPICEndpoint):
SHORT = "unk23ep"
class LASEndpoint(EPICEndpoint):
SHORT = "lasep"
#VERBOSE = True # <--- uncomment to see lid angle measurements
class WakehintEndpoint(EPICEndpoint):
SHORT = "wakehintep"
class UNK26Endpoint(EPICEndpoint):
SHORT = "unk26ep"
class AudioEndpoint(EPICEndpoint):
SHORT = "audioep"
class OSLogMessage(Register64):
TYPE = 63, 56
@ -41,15 +222,18 @@ class AOPClient(StandardASC, AOPBase):
ENDPOINTS = {
8: AOPOSLogEndpoint,
0x20: ASCDummyEndpoint,
0x21: ASCDummyEndpoint,
0x22: ASCDummyEndpoint,
0x23: ASCDummyEndpoint,
0x24: ASCDummyEndpoint,
0x25: ASCDummyEndpoint,
0x26: ASCDummyEndpoint,
0x27: ASCDummyEndpoint,
0x28: ASCDummyEndpoint
0x20: SPUAppEndpoint,
0x21: AccelEndpoint,
0x22: GyroEndpoint,
0x23: UNK23Endpoint,
0x24: LASEndpoint,
0x25: WakehintEndpoint,
0x26: UNK26Endpoint,
0x27: AudioEndpoint,
0x28: EPICEndpoint,
0x29: EPICEndpoint,
0x2a: EPICEndpoint,
0x2b: EPICEndpoint
}
def __init__(self, u, adtpath, dart=None):
@ -61,11 +245,11 @@ class AOPClient(StandardASC, AOPBase):
p.dapf_init_all()
dart = DART.from_adt(u, "/arm-io/dart-aop")
dart = DART.from_adt(u, "/arm-io/dart-aop", iova_range=(0x2c000, 0x10_000_000))
dart.initialize()
dart.regs.TCR[0].set( BYPASS_DAPF=0, BYPASS_DART=0, TRANSLATE_ENABLE=1)
dart.regs.TCR[15].set(BYPASS_DAPF=0, BYPASS_DART=1, TRANSLATE_ENABLE=0)
dart.regs.TCR[0].set(BYPASS_DAPF=0, BYPASS_DART=0, TRANSLATE_ENABLE=1)
dart.regs.TCR[7].set(BYPASS_DAPF=0, BYPASS_DART=0, TRANSLATE_ENABLE=1)
dart.regs.TCR[15].val = 0x20100
aop = AOPClient(u, "/arm-io/aop", dart)
@ -77,9 +261,50 @@ aop.update_bootargs({
aop.verbose = 4
def set_aop_audio_pstate(devid, pstate):
audep.roundtrip(SetDeviceProp(
devid=devid,
modifier=202,
data=Container(
devid=devid,
cookie=1,
target_pstate=pstate,
unk2=1,
)
)).check_retcode()
try:
aop.boot()
for epno in range(0x20, 0x2c):
aop.start_ep(epno)
timeout = 10
while (not aop.audioep.ready) and timeout:
aop.work_for(0.1)
timeout -= 1
if not timeout:
raise Exception("Timed out waiting on audio endpoint")
print("Finished boot")
audep = aop.audioep
audep.roundtrip(AttachDevice(devid='pdm0')).check_retcode()
audep.indirect(SetDeviceProp(
devid='pdm0', modifier=200, data=pdm_config)
).check_retcode()
audep.indirect(SetDeviceProp(
devid='pdm0', modifier=210, data=decimator_config)
).check_retcode()
audep.roundtrip(AttachDevice(devid='hpai')).check_retcode()
audep.roundtrip(AttachDevice(devid='lpai')).check_retcode()
audep.roundtrip(SetDeviceProp(
devid='lpai', modifier=301, data=Container(unk1=7, unk2=7, unk3=1, unk4=7))
).check_retcode()
except KeyboardInterrupt:
pass
except Exception:
print(traceback.format_exc())
run_shell(locals())
run_shell(locals(), poll_func=aop.work)

349
proxyclient/hv/trace_aop.py Normal file
View file

@ -0,0 +1,349 @@
# SPDX-License-Identifier: MIT
from m1n1.trace import Tracer
from m1n1.trace.dart import DARTTracer
from m1n1.trace.asc import ASCTracer, EP, EPState, msg, msg_log, DIR, EPContainer
from m1n1.utils import *
from m1n1.constructutils import *
from m1n1.fw.afk.rbep import *
from m1n1.fw.afk.epic import *
from m1n1.fw.aop import *
from m1n1.fw.aop.ipc import *
import sys
class AFKRingBufSniffer(AFKRingBuf):
def __init__(self, ep, state, base, size):
super().__init__(ep, base, size)
self.state = state
self.rptr = getattr(state, "rptr", 0)
def update_rptr(self, rptr):
self.state.rptr = rptr
def update_wptr(self):
raise NotImplementedError()
def get_wptr(self):
return struct.unpack("<I", self.read_buf(2 * self.BLOCK_SIZE, 4))[0]
def read_buf(self, off, size):
return self.ep.dart.ioread(0, self.base + off, size)
class AFKEp(EP):
BASE_MESSAGE = AFKEPMessage
def __init__(self, tracer, epid):
super().__init__(tracer, epid)
self.txbuf = None
self.rxbuf = None
self.state.txbuf = EPState()
self.state.rxbuf = EPState()
self.state.shmem_iova = None
self.state.txbuf_info = None
self.state.rxbuf_info = None
self.state.verbose = 1
def start(self):
self.create_bufs()
def create_bufs(self):
if not self.state.shmem_iova:
return
if not self.txbuf and self.state.txbuf_info:
off, size = self.state.txbuf_info
self.txbuf = AFKRingBufSniffer(self, self.state.txbuf,
self.state.shmem_iova + off, size)
if not self.rxbuf and self.state.rxbuf_info:
off, size = self.state.rxbuf_info
self.rxbuf = AFKRingBufSniffer(self, self.state.rxbuf,
self.state.shmem_iova + off, size)
Init = msg_log(0x80, DIR.TX)
Init_Ack = msg_log(0xa0, DIR.RX)
GetBuf = msg_log(0x89, DIR.RX)
Shutdown = msg_log(0xc0, DIR.TX)
Shutdown_Ack = msg_log(0xc1, DIR.RX)
@msg(0xa1, DIR.TX, AFKEP_GetBuf_Ack)
def GetBuf_Ack(self, msg):
self.state.shmem_iova = msg.DVA
self.txbuf = None
self.rxbuf = None
self.state.txbuf = EPState()
self.state.rxbuf = EPState()
self.state.txbuf_info = None
self.state.rxbuf_info = None
@msg(0xa2, DIR.TX, AFKEP_Send)
def Send(self, msg):
for data in self.txbuf.read():
#if self.state.verbose >= 3:
if True:
self.log(f"===TX DATA=== epid={self.epid} rptr={self.txbuf.state.rptr:#x}")
chexdump(data)
self.log(f"===END DATA===")
self.log("Backtrace on TX data:")
self.hv.bt()
self.handle_ipc(data, dir=">")
return True
Hello = msg_log(0xa3, DIR.TX)
@msg(0x85, DIR.RX, AFKEPMessage)
def Recv(self, msg):
for data in self.rxbuf.read():
#if self.state.verbose >= 3:
if True:
self.log(f"===RX DATA=== epid={self.epid} rptr={self.rxbuf.state.rptr:#x}")
chexdump(data)
self.log(f"===END DATA===")
self.handle_ipc(data, dir="<")
return True
def handle_ipc(self, data, dir=None):
pass
@msg(0x8a, DIR.RX, AFKEP_InitRB)
def InitTX(self, msg):
off = msg.OFFSET * AFKRingBuf.BLOCK_SIZE
size = msg.SIZE * AFKRingBuf.BLOCK_SIZE
self.state.txbuf_info = (off, size)
self.create_bufs()
@msg(0x8b, DIR.RX, AFKEP_InitRB)
def InitRX(self, msg):
off = msg.OFFSET * AFKRingBuf.BLOCK_SIZE
size = msg.SIZE * AFKRingBuf.BLOCK_SIZE
self.state.rxbuf_info = (off, size)
self.create_bufs()
class DummyAFKEp(AFKEp):
def handle_ipc(self, data, dir=None):
pass
class EPICEp(AFKEp):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pending_call = None
self.pending_cmd = None
def handle_hello(self, hdr, sub, fd):
if sub.type != 0xc0:
return False
payload = fd.read()
name = payload.split(b"\0")[0].decode("ascii")
self.log(f"Hello! (endpoint {name})")
return True
def handle_notify(self, hdr, sub, fd):
for calltype in CALLTYPES:
if calltype.matches(hdr, sub):
call = calltype.from_stream(fd)
self.trace_call_early(call)
self.pending_call = call
return True
return False
def handle_reply(self, hdr, sub, fd):
if self.pending_call is None:
return False
call = self.pending_call
call.read_resp(fd)
self.trace_call(call)
self.pending_call = None
return True
def dispatch_ipc(self, dir, hdr, sub, fd):
if sub.category == EPICCategory.COMMAND:
return self.handle_notify(hdr, sub, fd)
if dir == "<" and sub.category == EPICCategory.REPORT:
return self.handle_hello(hdr, sub, fd)
if dir == ">" and sub.category == EPICCategory.NOTIFY:
return self.handle_notify(hdr, sub, fd)
if dir == "<" and sub.category == EPICCategory.REPLY:
return self.handle_reply(hdr, sub, fd)
def handle_ipc(self, data, dir=None):
fd = BytesIO(data)
hdr = EPICHeader.parse_stream(fd)
sub = EPICSubHeaderVer2.parse_stream(fd)
if not getattr(self, 'VERBOSE', False):
return
self.log(f"{dir} 0x{hdr.channel:x} Type {hdr.type} Ver {hdr.version} Tag {hdr.seq}")
self.log(f" Len {sub.length} Ver {sub.version} Cat {sub.category} Type {sub.type:#x} Ts {sub.timestamp:#x}")
self.log(f" Unk1 {sub.unk1:#x} Unk2 {sub.unk2:#x}")
if self.dispatch_ipc(dir, hdr, sub, fd):
return
def trace_call_early(self, call):
# called at TX time
if isinstance(call, IndirectCall):
call.read_txbuf(self)
def trace_call(self, call):
if isinstance(call, IndirectCall):
call.read_rxbuf(self)
call = call.unwrap()
call.dump(self.log)
class SPUAppEp(EPICEp):
SHORT = "SPUApp"
class AccelEp(EPICEp):
SHORT = "accel"
class GyroEp(EPICEp):
SHORT = "gyro"
class LASEp(EPICEp):
SHORT = "las"
class WakeHintEp(EPICEp):
SHORT = "wakehint"
class UNK26Ep(EPICEp):
SHORT = "unk26"
class AudioEp(EPICEp):
SHORT = "aop-audio"
VERBOSE = True
class VoiceTriggerEp(EPICEp):
SHORT = "aop-voicetrigger"
VERBOSE = True
class AOPTracer(ASCTracer, AOPBase):
ENDPOINTS = {
0x20: SPUAppEp,
0x21: AccelEp,
0x22: GyroEp,
0x24: LASEp,
0x25: WakeHintEp,
0x26: UNK26Ep,
0x27: AudioEp,
0x28: VoiceTriggerEp,
}
def __init__(self, hv, devpath, verbose=False):
self.default_bootargs = None
super().__init__(hv, devpath, verbose)
self.u = hv.u
AOPBase.__init__(self, hv.u, self.dev)
def start(self, *args):
self.default_bootargs = self.read_bootargs()
super().start(*args)
def w_CPU_CONTROL(self, val):
if val.RUN:
self.bootargs = self.read_bootargs()
self.log("Bootargs patched by AP:")
self.default_bootargs.dump_diff(self.bootargs, self.log)
self.log("(End of list)")
super().w_CPU_CONTROL(val)
@classmethod
def replay(cls, f, passthru=False):
epmap = dict()
epcont = EPContainer()
class FakeASCTracer:
def __init__(self):
self.hv = None
def log(self, str):
print(str)
asc_tracer = FakeASCTracer()
for cls in cls.mro():
eps = getattr(cls, "ENDPOINTS", None)
if eps is None:
break
for k, v in eps.items():
if k in epmap:
continue
ep = v(asc_tracer, k)
epmap[k] = ep
if getattr(epcont, ep.name, None):
ep.name = f"{ep.name}{k:02x}"
setattr(epcont, ep.name, ep)
ep.start()
def readdump(firstline, hdr, f):
l = firstline
assert hdr in l
postscribe = l[l.index(hdr) + len(hdr):]
annotation = dict([s.split("=") for s \
in postscribe.strip().split(" ")])
dump = []
for l in f:
if "===END DATA===" in l:
break
dump.append(l)
return chexundump("".join(dump)), annotation
def read_txbuf(icall, ep):
hdr = "===COMMAND TX DATA==="
for l in f:
if hdr in l:
break
data, annot = readdump(l, hdr, f)
assert int(annot["addr"], 16) == icall.args.txbuf
icall.txbuf = data
def read_rxbuf(icall, ep):
hdr = "===COMMAND RX DATA==="
for l in f:
if hdr in l:
break
data, annot = readdump(l, hdr, f)
assert int(annot["addr"], 16) == icall.rets.rxbuf
icall.rxbuf = data
IndirectCall.read_rxbuf = read_rxbuf
IndirectCall.read_txbuf = read_txbuf
for l in f:
if (rxhdr := "===RX DATA===") in l:
dir = "<"
hdr = rxhdr
elif (txhdr := "===TX DATA===") in l:
dir = ">"
hdr = txhdr
else:
if passthru:
print(l, end="")
continue
data, annot = readdump(l, hdr, f)
epid = int(annot["epid"])
epmap[epid].handle_ipc(data, dir)
if __name__ == "__main__":
# We can replay traces by saving the textual output of live tracing
# and then passing it to this script.
with open(sys.argv[1]) as f:
AOPTracer.replay(f)
sys.exit(0)
dart_aop_tracer = DARTTracer(hv, "/arm-io/dart-aop", verbose=4)
dart_aop_tracer.start()
dart_aop_base = u.adt["/arm-io/dart-aop"].get_reg(0)[0]
#hv.trace_range(irange(*u.adt["/arm-io/dart-aop"].get_reg(1)))
#hv.trace_range(irange(*u.adt["/arm-io/aop"].get_reg(1)))
#hv.trace_range(irange(*u.adt["/arm-io/aop"].get_reg(3)))
#hv.trace_range(irange(*u.adt["/arm-io/admac-aop-audio"].get_reg(0)))
aop_tracer = AOPTracer(hv, "/arm-io/aop", verbose=1)
aop_tracer.start(dart_aop_tracer.dart)

View file

@ -0,0 +1,365 @@
from enum import IntEnum
from construct import *
from io import BytesIO
from m1n1.utils import FourCC, chexdump
from m1n1.constructutils import ZPadding
from m1n1.fw.afk.epic import EPICCmd, EPICCategory
EPICSubHeaderVer2 = Struct(
"length" / Int32ul,
"version" / Default(Int8ul, 2),
"category" / EPICCategory,
"type" / Hex(Int16ul),
"timestamp" / Default(Int64ul, 0),
"unk1" / Default(Hex(Int32ul), 0),
"unk2" / Default(Hex(Int32ul), 0),
)
class AOPAudioPropKey(IntEnum):
IS_READY = 0x01
UNK_11 = 0x11
PLACEMENT = 0x1e
UNK_21 = 0x21
ORIENTATION = 0x2e
LOCATION_ID = 0x30
SERIAL_NO = 0x3e
VENDOR_ID = 0x5a
PRODUCT_ID = 0x5b
SERVICE_CONTROLLER = 0x64
DEVICE_COUNT = 0x65
VERSION = 0x67
class EPICCall:
@classmethod
def matches(cls, hdr, sub):
return int(sub.type) == cls.TYPE
def _args_fixup(self):
pass
def __init__(self, *args, **kwargs):
if args:
self.args = args[0]
else:
self.args = Container(**kwargs)
self._args_fixup()
self.rets = None
@classmethod
def from_stream(cls, f):
return cls(cls.ARGS.parse_stream(f))
def dump(self, logger=None):
if logger is None:
logger = print
args_fmt = [f"{k}={v}" for (k, v) in self.args.items() if k != "_io"]
rets_fmt = [f"{k}={v}" for (k, v) in self.rets.items() if k != "_io"]
logger(f"{type(self).__name__}({', '.join(args_fmt)}) -> ({', '.join(rets_fmt)})")
def read_resp(self, f):
self.rets = self.RETS.parse_stream(f)
CALLTYPES = []
def reg_calltype(calltype):
CALLTYPES.append(calltype)
return calltype
@reg_calltype
class GetHIDDescriptor(EPICCall):
TYPE = 0x1
ARGS = Struct(
"blank" / Const(0x0, Int32ul),
)
RETS = Struct(
"retcode" / Default(Hex(Int32ul), 0),
"descriptor" / HexDump(GreedyBytes),
)
@reg_calltype
class GetProperty(EPICCall):
TYPE = 0xa
ARGS = Struct(
"blank" / Const(0x0, Int32ul),
"key" / Enum(Int32ul, AOPAudioPropKey),
)
RETS = Struct(
#"blank" / Const(0x0, Int32ul),
"value" / GreedyBytes,
)
@reg_calltype
class WrappedCall(EPICCall):
SUBCLASSES = {}
TYPE = 0x20
HDR = Struct(
"blank" / Const(0x0, Int32ul),
"unk1" / Hex(Const(0xffffffff, Int32ul)),
"calltype" / Hex(Int32ul),
"blank2" / ZPadding(16),
"pad" / Hex(Int32ul),
"len" / Hex(Int64ul),
"residue" / HexDump(GreedyBytes),
)
@classmethod
def from_stream(cls, f):
payload = f.read()
subsub = cls.HDR.parse(payload)
calltype = int(subsub.calltype)
subcls = cls.SUBCLASSES.get(calltype, None)
if subcls is None:
raise ValueError(f"unknown calltype {calltype:#x}")
return subcls(subcls.ARGS.parse(payload))
@classmethod
def reg_subclass(cls, cls2):
cls.SUBCLASSES[int(cls2.CALLTYPE)] = cls2
return cls2
@classmethod
def matches(cls, hdr, sub):
return sub.category == EPICCategory.NOTIFY and sub.type == cls.TYPE
def check_retcode(self):
if self.rets.retcode:
self.dump()
raise ValueError(f"retcode {self.rets.retcode} in {str(type(self))} (call dumped, see above)")
@WrappedCall.reg_subclass
class AttachDevice(WrappedCall):
CALLTYPE = 0xc3_00_00_02
ARGS = Struct(
"blank" / Const(0x0, Int32ul),
"unk1" / Hex(Const(0xffffffff, Int32ul)),
"calltype" / Hex(Const(0xc3000002, Int32ul)),
"blank2" / ZPadding(16),
"pad" / Padding(4),
"len" / Hex(Const(0x2c, Int64ul)),
"devid" / FourCC,
"pad" / Padding(4),
)
RETS = Struct(
"retcode" / Default(Hex(Int32ul), 0),
"unk" / HexDump(GreedyBytes),
)
@WrappedCall.reg_subclass
class ProbeDevice(WrappedCall):
CALLTYPE = 0xc3_00_00_01
ARGS = Struct(
"blank" / Const(0x0, Int32ul),
"unk1" / Hex(Const(0xffffffff, Int32ul)),
"calltype" / Hex(Const(0xc3000001, Int32ul)),
"blank2" / ZPadding(16),
"pad" / Padding(4),
"len" / Hex(Const(0x28, Int64ul)),
"devno" / Int32ul,
)
RETS = Struct(
"retcode" / Default(Hex(Int32ul), 0),
"devid" / FourCC,
"blank2" / Const(0x0, Int32ul),
"unk1" / Const(8, Int32ul),
"blank3" / Const(0x0, Int32ul),
"unk2" / Hex(Const(0x01_0d_1c_20, Int32ul)),
"blank4" / Const(0x0, Int32ul),
"remainder" / HexDump(GreedyBytes),
)
PDMConfig = Struct(
"unk1" / Int32ul,
"clockSource" / FourCC,
"pdmFrequency" / Int32ul,
"unk3_clk" / Int32ul,
"unk4_clk" / Int32ul,
"unk5_clk" / Int32ul,
"channelPolaritySelect" / Hex(Int32ul),
"unk7" / Hex(Int32ul),
"unk8" / Hex(Int32ul),
"unk9" / Hex(Int16ul),
"ratios" / Struct(
"r1" / Int8ul,
"r2" / Int8ul,
"r3" / Int8ul,
"pad" / Default(Int8ul, 0),
),
"filterLengths" / Hex(Int32ul),
"coeff_bulk" / Int32ul,
#"coefficients" / Struct(
# "c1" / Int32sl[this._.ratios.r3 * 4 + 4],
# "c2" / Int32sl[this._.ratios.r2 * 4 + 4],
# "c3" / Int32sl[this._.ratios.r1 * 4 + 4],
#),
#"junk" / Padding(
# this.coeff_bulk * 4 - 48 \
# - (this.ratios.r1 + this.ratios.r2 + this.ratios.r3) * 16
#),
"coefficients" / Int32sl[
(this.ratios.r1 + this.ratios.r2 + this.ratios.r3) * 4 + 12
],
"junk" / Padding(
lambda this: max(0,
this.coeff_bulk * 4 - 48 \
- (this.ratios.r1 + this.ratios.r2 + this.ratios.r3) * 16
)
),
"unk10" / Int32ul, # maybe
"micTurnOnTimeMs" / Int32ul,
"blank" / ZPadding(16),
"unk11" / Int32ul,
"micSettleTimeMs" / Int32ul,
"blank2" / ZPadding(69),
)
DecimatorConfig = Struct(
"latency" / Int32ul,
"ratios" / Struct(
"r1" / Int8ul,
"r2" / Int8ul,
"r3" / Int8ul,
"pad" / Default(Int8ul, 0),
),
"filterLengths" / Hex(Int32ul),
"coeff_bulk" / Int32ul,
"coefficients" / Int32sl[
(this.ratios.r1 + this.ratios.r2 + this.ratios.r3) * 4 + 12
],
"junk" / Padding(
lambda this: max(0,
this.coeff_bulk * 4 - 48 \
- (this.ratios.r1 + this.ratios.r2 + this.ratios.r3) * 16
)
),
)
PowerSetting = Struct(
"devid" / FourCC,
"cookie" / Int32ul,
"pad" / Padding(4),
"blank" / ZPadding(8),
"target_pstate" / FourCC,
"unk2" / Int32ul,
"blank2" / ZPadding(20),
)
DEVPROPS = {
('hpai', 202): PowerSetting,
('lpai', 202): PowerSetting,
('hpai', 200): FourCC,
('lpai', 200): FourCC,
('pdm0', 200): PDMConfig,
('pdm0', 210): DecimatorConfig,
('lpai', 301): Struct(
"unk1" / Int32ul,
"unk2" / Int32ul,
"unk3" / Int32ul,
"unk4" / Int32ul,
),
}
@WrappedCall.reg_subclass
class GetDeviceProp(WrappedCall):
CALLTYPE = 0xc3_00_00_04
ARGS = Struct(
"blank" / Const(0x0, Int32ul),
"unk1" / Hex(Const(0xffffffff, Int32ul)),
"calltype" / Hex(Const(0xc3000004, Int32ul)),
"blank2" / ZPadding(16),
"pad" / Padding(4),
"len" / Hex(Const(0x30, Int64ul)),
"devid" / FourCC,
"modifier" / Int32ul,
"unk6" / Hex(Const(0x01, Int32ul)),
)
RETS = Struct(
"retcode" / Default(Hex(Int32ul), 0),
"len" / Optional(Int32ul),
"data" / Switch(lambda s: (s._params.devid, s._params.modifier),
DEVPROPS,
default=HexDump(GreedyBytes))
)
def read_resp(self, f):
self.rets = self.RETS.parse_stream(f,
devid=self.args.devid, modifier=self.args.modifier
)
@WrappedCall.reg_subclass
class SetDeviceProp(WrappedCall):
CALLTYPE = 0xc3_00_00_05
ARGS = Struct(
"blank" / Const(0x0, Int32ul),
"unk1" / Hex(Const(0xffffffff, Int32ul)),
"calltype" / Hex(Const(0xc3000005, Int32ul)),
"blank2" / ZPadding(16),
"pad" / Padding(4),
"len" / Hex(Int64ul), # len(this.data) + 0x30
"devid" / FourCC,
"modifier" / Int32ul,
"len2" / Hex(Int32ul), # len(this.data)
"data" / Switch(lambda s: (s.devid, s.modifier),
DEVPROPS,
default=HexDump(GreedyBytes))
)
RETS = Struct(
"retcode" / Default(Hex(Int32ul), 0),
"unk" / HexDump(GreedyBytes),
)
def _args_fixup(self):
data_len = len(self.ARGS.build(Container(len=0, len2=0, **self.args))) - 52
if 'len' not in self.args:
self.args.len = data_len + 0x30
if 'len2' not in self.args:
self.args.len2 = data_len
@reg_calltype
class IndirectCall(EPICCall):
ARGS = EPICCmd
RETS = EPICCmd
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.txbuf = None
self.rxbuf = None
@classmethod
def matches(cls, hdr, sub):
return sub.category == EPICCategory.COMMAND
def read_txbuf(self, ep):
cmd = self.args
ep.dart.invalidate_cache()
self.txbuf = ep.dart.ioread(0, cmd.txbuf, cmd.txlen)
# dump the command data for offline replays of traces
ep.log(f"===COMMAND TX DATA=== addr={cmd.txbuf:#x}")
chexdump(self.txbuf)
ep.log(f"===END DATA===")
def read_rxbuf(self, ep):
cmd = self.rets
ep.dart.invalidate_cache()
self.rxbuf = ep.dart.ioread(0, cmd.rxbuf, cmd.rxlen)
ep.log(f"===COMMAND RX DATA=== addr={cmd.rxbuf:#x}")
chexdump(self.rxbuf)
ep.log(f"===END DATA===")
def unwrap(self):
fd = BytesIO()
fd.write(b"\x00\x00\x00\x00")
fd.write(self.txbuf)
fd.seek(0)
wrapped = WrappedCall.from_stream(fd)
fd = BytesIO()
fd.write(b"\x00\x00\x00\x00")
fd.write(self.rxbuf)
fd.seek(0)
wrapped.read_resp(fd)
return wrapped