fish-shell/build_tools/pexpect_helper.py
Johannes Altmanninger 965bc78d33 Work around weird CI failures due to missing pre-execute \r\n
I forgot that 610338cc70 (On undo after execute, restore the cursor
position, 2024-12-21) would cause a fallout to tests:

It makes us reuse in another place our usual cursor-movement sequences.

This causes failures like this (linebreaks added for readability):

	Testing file pexpects/bind.py:Failed to match pattern: (?:\r\n|\x1b\[2 q)[^\n]*def abc\r\n
	bind.py:45: timeout from expect_prompt(TO_END + "def abc\r\n")  # emacs transpose words, default timeout: no delay

	Escaped buffer:
	\x1b[?2004h\x1b[>4;1m\x1b[=5u\x1b=\rprompt 2>echo \rprompt 2>echo abc \rprompt 2>echo def abc\r
	prompt 2>echo def abc\x1b[?2004l\x1b[>4;0m\x1b[=0u\x1b>\x1b]133;C\x07def abc\r\n\x1b]133;D;0\x07\x1b[?25h⏎
	\r⏎ \r\rprompt 3>\x1b[?2004h\x1b[>4;1m\x1b[=5u\x1b=

It seems that we don't print anything where we should print something
like "\r\n" or "\e[2 q" to move the cursor below the command line.

I haven't gotten to the bottom of this but it might be related to
terminfo. Once we get rid of that, we can unconditionally print
our canonical movement sequences.

This issue seems to only affect tests, since fish operates fine in
a sourcehut CI system. Let's ignore it for now.
2024-12-21 14:37:57 +01:00

386 lines
13 KiB
Python

"""pexpect_helper provides a wrapper around the pexpect module.
This module exposes a single class SpawnedProc, which wraps pexpect.spawn().
This exposes a pseudo-tty, which fish or another process may talk to.
The send() function may be used to send data to fish, and the expect_* family
of functions may be used to match what is output to the tty.
Example usage:
sp = SpawnedProc() # this launches fish
sp.expect_prompt() # wait for a prompt
sp.sendline("echo hello world")
sp.expect_prompt("hello world")
"""
from __future__ import print_function
import inspect
import os
import os.path
import re
import sys
import time
import pexpect
from signal import Signals
# Default timeout for failing to match.
TIMEOUT_SECS = 5
UNEXPECTED_SUCCESS = object()
# When rendering fish's output, remove the control sequences that modify terminal state,
# to avoid confusing the calling terminal. No need to replace things like colors and cursor
# movement that are harmless and/or will not leak anyway.
SANITIZE_FOR_PRINTING_RE = re.compile(
r"""
\x1b\[\?1004[hl]
| \x1b\[\?2004[hl]
| \x1b\[>4;[10]m
| \x1b\[>5u
| \x1b\[<1u
| \x1b=
| \x1b>
| \x1b\].*?\x07
""",
re.VERBOSE)
def get_prompt_re(counter):
"""Return a regular expression for matching a with a given prompt counter."""
return re.compile("prompt %d>" % counter)
def get_callsite():
"""Return a triple (filename, line_number, line_text) of the call site location."""
callstack = inspect.getouterframes(inspect.currentframe())
for f in callstack:
# Skip call sites from this file.
if inspect.getmodule(f.frame) is Message.MODULE:
continue
# Skip functions which have a truthy callsite_skip attribute.
if getattr(f.function, "callsite_skip", False):
continue
return (os.path.basename(f.filename), f.lineno, f.code_context)
return ("Unknown", -1, "")
def escape(s):
"""Escape the string 's' to make it human-understandable."""
res = []
for c in s:
if c == "\n":
res.append("\\n")
elif c == "\r":
res.append("\\r")
elif c == "\t":
res.append("\\t")
elif c.isprintable():
res.append(c)
else:
res.append("\\x{:02x}".format(ord(c)))
return "".join(res)
def pexpect_error_type(err):
"""Return a human-readable description of a pexpect error type."""
if isinstance(err, pexpect.EOF):
return "EOF"
elif isinstance(err, pexpect.TIMEOUT):
return "timeout"
elif err is UNEXPECTED_SUCCESS:
return "unexpected success"
else:
return "unknown error"
class Message(object):
"""Some text either sent-to or received-from the spawned proc.
Attributes:
dir: the message direction, either DIR_INPUT or DIR_OUTPUT
filename: the name of the file from which the message was sent
text: the text of the messages
when: a timestamp of when the message was sent
"""
# Input is input into fish shell ("sent data").
DIR_INPUT = " INPUT"
# Output means output from fish shell ("received data").
DIR_OUTPUT = "OUTPUT"
MODULE = sys.modules[__name__]
def __init__(self, dir, text, when):
"""Construct from a direction, message text and timestamp."""
self.dir = dir
self.filename, self.lineno, _ = get_callsite()
self.text = text
self.when = when
@staticmethod
def sent_input(text, when):
"""Return an input message with the given text."""
return Message(Message.DIR_INPUT, text, when)
@staticmethod
def received_output(text, when):
"""Return a output message with the given text."""
return Message(Message.DIR_OUTPUT, text, when)
# Sequences for moving the cursor below the commandline. This happens before executing.
MOVE_TO_END: str = r"(?:\r\n|\x1b\[2 q|)"
TO_END: str = MOVE_TO_END + r"[^\n]*"
TO_END_SUFFIX: str = r"[^\n]*" + MOVE_TO_END
class SpawnedProc(object):
"""A process, talking to our ptty. This wraps pexpect.spawn.
Attributes:
colorize: whether error messages should have ANSI color escapes
messages: list of Message sent and received, in-order
start_time: the timestamp of the first message, or None if none yet
spawn: the pexpect.spawn value
prompt_counter: the index of the prompt. This cooperates with the fish_prompt
function to ensure that each printed prompt is distinct.
"""
def __init__(
self, name="fish", timeout=TIMEOUT_SECS, env=os.environ.copy(), **kwargs
):
"""Construct from a name, timeout, and environment.
Args:
name: the name of the executable to launch, as a key into the
environment dictionary. By default this is 'fish' but may be
other executables.
timeout: A timeout to pass to pexpect. This indicates how long to wait
before giving up on some expected output.
env: a string->string dictionary, describing the environment variables.
"""
if name not in env:
raise ValueError("'%s' variable not found in environment" % name)
exe_path = env.get(name)
self.colorize = sys.stdout.isatty() or env.get("FISH_FORCE_COLOR", "0") == "1"
self.messages = []
self.start_time = None
self.spawn = pexpect.spawn(
exe_path, env=env, encoding="utf-8", timeout=timeout, **kwargs
)
self.spawn.delaybeforesend = None
self.prompt_counter = 0
def time_since_first_message(self):
"""Return a delta in seconds since the first message, or 0 if this is the first."""
now = time.monotonic()
if not self.start_time:
self.start_time = now
return now - self.start_time
def send(self, s):
"""Cover over pexpect.spawn.send().
Send the given string to the tty, returning the number of bytes written.
"""
res = self.spawn.send(s)
when = self.time_since_first_message()
self.messages.append(Message.sent_input(s, when))
return res
def sendline(self, s):
"""Cover over pexpect.spawn.sendline().
Send the given string + linesep to the tty, returning the number of bytes written.
"""
return self.send(s + os.linesep)
def expect_re(self, pat, pat_desc=None, unmatched=None, shouldfail=False, **kwargs):
"""Cover over pexpect.spawn.expect().
Consume all "new" output of self.spawn until the given pattern is matched, or
the timeout is reached.
Note that output between the current position and the location of the match is
consumed as well.
The pattern is typically a regular expression in string form, but may also be
any of the types accepted by pexpect.spawn.expect().
If the 'unmatched' parameter is given, it is printed as part of the error message
of any failure.
On failure, this prints an error and exits.
"""
try:
self.spawn.expect(pat, **kwargs)
when = self.time_since_first_message()
self.messages.append(
Message.received_output(self.spawn.match.group(), when)
)
# When a match is found,
# spawn.match is the MatchObject that produced it.
# This can be used to check what exactly was matched.
if shouldfail:
err = UNEXPECTED_SUCCESS
if not pat_desc:
pat_desc = str(pat)
self.report_exception_and_exit(pat_desc, unmatched, err)
return self.spawn.match
except pexpect.ExceptionPexpect as err:
if shouldfail:
return True
if not pat_desc:
pat_desc = str(pat)
self.report_exception_and_exit(pat_desc, unmatched, err)
def expect_str(self, s, **kwargs):
"""Cover over expect_re() which accepts a literal string."""
return self.expect_re(re.escape(s), **kwargs)
def expect_prompt(self, *args, increment=True, **kwargs):
"""Convenience function which matches some text and then a prompt.
Match the given positional arguments as expect_re, and then look
for a prompt.
If increment is set, then this should be a new prompt and the prompt counter
should be bumped; otherwise this is not a new prompt.
Returns None on success, and exits on failure.
Example:
sp.sendline("echo hello world")
sp.expect_prompt("hello world")
"""
if args:
self.expect_re(*args, **kwargs)
if increment:
self.prompt_counter += 1
self.expect_re(
get_prompt_re(self.prompt_counter),
pat_desc="prompt %d" % self.prompt_counter,
)
def report_exception_and_exit(self, pat, unmatched, err):
"""Things have gone badly.
We have an exception 'err', some pexpect.ExceptionPexpect.
Report it to stdout, along with the offending call site.
If 'unmatched' is set, print it to stdout.
"""
# Close the process so we can get the status
self.spawn.close()
colors = self.colors()
failtype = pexpect_error_type(err)
# If we get an EOF, we check if the process exited with a signal.
# This shows us e.g. if it crashed
if failtype == 'EOF' and self.spawn.signalstatus is not None and self.spawn.signalstatus != 0:
failtype = "SIGNAL " + Signals(self.spawn.signalstatus).name
fmtkeys = {"failtype": failtype, "pat": escape(pat)}
fmtkeys.update(**colors)
filename, lineno, code_context = get_callsite()
fmtkeys["filename"] = filename
fmtkeys["lineno"] = lineno
fmtkeys["code"] = "\n".join([n.strip() for n in code_context if n])
if unmatched:
print(
"{RED}Error: {NORMAL}{BOLD}{unmatched}{RESET}".format(
unmatched=unmatched, **fmtkeys
)
)
print(
"{RED}Failed to match pattern:{NORMAL} {BOLD}{pat}{RESET}".format(**fmtkeys)
)
print(
"{filename}:{lineno}: {BOLD}{failtype}{RESET} from {code}".format(**fmtkeys)
)
print("")
print("{CYAN}Escaped buffer:{RESET}".format(**colors))
print(escape(self.spawn.before))
print("")
print("{CYAN}When written to the tty, this looks like:{RESET}".format(**colors))
print("{CYAN}<-------{RESET}".format(**colors))
sys.stdout.write(SANITIZE_FOR_PRINTING_RE.sub('', self.spawn.before))
sys.stdout.flush()
maybe_nl=""
if not self.spawn.before.endswith("\n"):
maybe_nl="\n{CYAN}(no trailing newline)".format(**colors)
print("{RESET}{maybe_nl}{CYAN}------->{RESET}".format(maybe_nl=maybe_nl, **colors))
print("")
# Show the last 10 messages.
print("Last 10 messages:")
delta = None
for m in self.messages[-10:]:
etext = escape(m.text)
timestamp = m.when * 1000.0
# Use relative timestamps and add a sign.
# This assumes a max length of 10^10 milliseconds (115 days) for the initial timestamp,
# and 11.5 days for the delta.
if delta:
timestamp -= delta
timestampstr = "{timestamp:+10.2f} ms".format(timestamp=timestamp)
else:
timestampstr = "{timestamp:10.2f} ms".format(timestamp=timestamp)
delta = m.when * 1000.0
print(
"{dir} {timestampstr} (Line {lineno}): {BOLD}{etext}{RESET}".format(
dir=m.dir,
timestampstr=timestampstr,
filename=m.filename,
lineno=m.lineno,
etext=etext,
**colors
)
)
print("")
sys.exit(1)
def sleep(self, secs):
"""Cover over time.sleep()."""
time.sleep(secs)
def colors(self):
"""Return a dictionary mapping color names to ANSI escapes"""
def ansic(n):
"""Return either an ANSI escape sequence for a color, or empty string."""
return "\033[%dm" % n if self.colorize else ""
return {
"RESET": ansic(0),
"BOLD": ansic(1),
"NORMAL": ansic(39),
"BLACK": ansic(30),
"RED": ansic(31),
"GREEN": ansic(32),
"YELLOW": ansic(33),
"BLUE": ansic(34),
"MAGENTA": ansic(35),
"CYAN": ansic(36),
"LIGHTGRAY": ansic(37),
"DARKGRAY": ansic(90),
"LIGHTRED": ansic(91),
"LIGHTGREEN": ansic(92),
"LIGHTYELLOW": ansic(93),
"LIGHTBLUE": ansic(94),
"LIGHTMAGENTA": ansic(95),
"LIGHTCYAN": ansic(96),
"WHITE": ansic(97),
}
def control(char: str) -> str:
""" Returns the char sent when control is pressed along the given key. """
assert len(char) == 1
char = char.lower()
if ord("a") <= ord(char) <= ord("z"):
return chr(ord(char) - ord("a") + 1)
return chr({
"@": 0,
"`": 0,
"[": 27,
"{": 27,
"\\": 28,
"|": 28,
"]": 29,
"}": 29,
"^": 30,
"~": 30,
"_": 31,
"?": 127,
}[char])