experiments/speaker_amp.py: Drop 'polling console'

Drop the 'polling console' code as there are now better ways to achieve
the same. (One can use USB_VUART for second proxy session, see 85ee2f24)

Signed-off-by: Martin Povišer <povik@protonmail.com>
This commit is contained in:
Martin Povišer 2022-02-07 17:22:09 +01:00 committed by Hector Martin
parent 532f260fae
commit 9bd84dfbb9

View file

@ -13,90 +13,13 @@ sys.path.append(str(pathlib.Path(__file__).resolve().parents[1]))
# (expects mono, 24-bit signed samples padded to 32 bits on the msb side)
import argparse
import os.path
import code
import sys
from m1n1.setup import *
from m1n1.hw.dart import DART, DARTRegs
from m1n1.hw.admac import *
from m1n1.hw.i2c import I2C
# this here is an embedded console so that one can poke while
# the descriptors keep being filled in
class PollingConsole(code.InteractiveConsole):
def __init__(self, locals=None, filename="<console>"):
global patch_stdout, PromptSession, FileHistory
global Thread, Queue, Empty
from prompt_toolkit import PromptSession
from prompt_toolkit.history import FileHistory
from prompt_toolkit.patch_stdout import patch_stdout
from threading import Thread
from queue import Queue, Empty
super().__init__(locals, filename)
self._qu_input = Queue()
self._qu_result = Queue()
self._should_exit = False
self.session = PromptSession(history=FileHistory(os.path.expanduser("~/.m1n1-history")))
self._other_thread = Thread(target=self._other_thread_main, daemon=False)
self._other_thread.start()
def __enter__(self):
self._patch = patch_stdout()
self._patch.__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._patch.__exit__(exc_type, exc_val, exc_tb)
def _other_thread_main(self):
first = True
while True:
if first:
more_input = False
first = False
else:
more_input = self._qu_result.get()
try:
self._qu_input.put(self.session.prompt("(♫♫) " if not more_input else "... "))
except EOFError:
self._qu_input.put(None)
return
def poll(self):
if self._should_exit:
return False
try:
line = self._qu_input.get(timeout=0.01)
except Empty:
return True
if line is None:
self._should_exit = True
return False
self._qu_result.put(self.push(line))
return True
class NoConsole:
def poll(self):
time.sleep(0.01)
return True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
argparser = argparse.ArgumentParser()
argparser.add_argument("--console", action='store_true')
argparser.add_argument("-f", "--file", "--input", "--samples",
type=str, default=None,
help='input filename to take samples from ' \
@ -105,11 +28,8 @@ argparser.add_argument("-b", "--bufsize", type=int, default=1024*32,
help='size of buffers to keep submitting to DMA')
args = argparser.parse_args()
if args.console and args.file is None:
print("Specify file with samples (option -f) if using console")
sys.exit(1)
inputf = open(args.file, "rb") if args.file is not None else sys.stdin.buffer
inputf = open(args.file, "rb") if args.file is not None \
else sys.stdin.buffer
p.pmgr_adt_clocks_enable("/arm-io/i2c1")
p.pmgr_adt_clocks_enable("/arm-io/admac-sio")
@ -162,7 +82,8 @@ p.write32(0x23c1002d4, 0x76a03) # take out of reset
tx_chan.submit(inputf.read(args.bufsize))
tx_chan.enable()
while tx_chan.can_submit():
tx_chan.submit(inputf.read(args.bufsize))
# accesses to 0x100-sized blocks in the +0x4000 region require
# the associated enable bit cleared, or they cause SErrors
@ -201,20 +122,10 @@ i2c1.write_reg(0x31, 0x03, [0x0])
# take the IC out of software shutdown
i2c1.write_reg(0x31, 0x02, [0x0c])
with (PollingConsole(locals()) if args.console else NoConsole()) as cons:
try:
while cons.poll():
while (not tx_chan.can_submit()) and cons.poll():
tx_chan.poll()
if not cons.poll():
break
tx_chan.submit(inputf.read(args.bufsize))
except KeyboardInterrupt:
pass
while True:
while tx_chan.can_submit():
tx_chan.submit(inputf.read(args.bufsize))
tx_chan.poll()
# mute
i2c1.write_reg(0x31, 0x02, [0x0d])