= len(self.buffer):
return "\0"
c = self.buffer[self.index]
self.index += 1
return c
def unget_char(self):
""" Goes back by one character for parsing """
self.index -= 1
def end(self):
""" Returns true if reached end of buffer """
return self.index >= len(self.buffer)
def parse_control_sequence(self):
""" Parses terminal specifiec control sequences """
result = ""
c = self.get_char()
# \e0 is used to denote start of control sequence
if c == "O":
c = self.get_char()
# \[1\; is start of control sequence
if c == "1":
b = self.get_char()
c = self.get_char()
if b == "\\" and c == "~":
result += "Home"
elif c == ";":
c = self.get_char()
# 3 is Alt
if c == "3":
result += "ALT - "
c = self.get_char()
# \[4\~ is End
if c == "4":
b = self.get_char()
c = self.get_char()
if b == "\\" and c == "~":
result += "End"
# 5 is Ctrl
if c == "5":
result += "CTRL - "
c = self.get_char()
# 9 is Alt
if c == "9":
result += "ALT - "
c = self.get_char()
if c == "A":
result += "Up Arrow"
elif c == "B":
result += "Down Arrow"
elif c == "C":
result += "Right Arrow"
elif c == "D":
result += "Left Arrow"
elif c == "F":
result += "End"
elif c == "H":
result += "Home"
return result
def get_readable_binding(self):
""" Gets a readable representation of binding """
try:
result = BindingParser.readable_keys[self.buffer.lower()]
except KeyError:
result = self.parse_binding()
return result
def parse_binding(self):
readable_command = ""
result = ""
alt = ctrl = False
while not self.end():
c = self.get_char()
if c == "\\":
c = self.get_char()
if c == "e":
d = self.get_char()
if d == "O":
self.unget_char()
result += self.parse_control_sequence()
elif d == "\\":
if self.get_char() == "[":
result += self.parse_control_sequence()
else:
self.unget_char()
self.unget_char()
alt = True
elif d == "\0":
result += "ESC"
else:
alt = True
self.unget_char()
elif c == "c":
ctrl = True
elif c == "n":
result += "Enter"
elif c == "t":
result += "Tab"
elif c == "b":
result += "Backspace"
elif c.isalpha():
result += "\\" + c
else:
result += c
elif c == "\x7f":
result += "Backspace"
else:
result += c
if ctrl:
readable_command += "CTRL - "
if alt:
readable_command += "ALT - "
if result == "":
return "unknown-control-sequence"
return readable_command + result
class FishConfigTCPServer(SocketServer.TCPServer):
"""TCPServer that only accepts connections from localhost (IPv4/IPv6)."""
WHITELIST = set(["::1", "::ffff:127.0.0.1", "127.0.0.1"])
address_family = socket.AF_INET6 if socket.has_ipv6 else socket.AF_INET
def verify_request(self, request, client_address):
return client_address[0] in FishConfigTCPServer.WHITELIST
class FishConfigHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def write_to_wfile(self, txt):
self.wfile.write(txt.encode("utf-8"))
def do_get_colors(self):
# Looks for fish_color_*.
# Returns an array of lists [color_name, color_description, color_value]
result = []
# Make sure we return at least these
remaining = set(
[
"normal",
"error",
"command",
"end",
"param",
"comment",
"match",
"selection",
"search_match",
"operator",
"escape",
"quote",
"redirection",
"valid_path",
"autosuggestion" "user",
"host",
"cancel",
]
)
# Here are our color descriptions
descriptions = {
"normal": "Default text",
"command": "Ordinary commands",
"quote": "Text within quotes",
"redirection": "Like | and >",
"end": "Like ; and &",
"error": "Potential errors",
"param": "Command parameters",
"comment": "Comments start with #",
"match": "Matching parenthesis",
"selection": "Selected text",
"search_match": "History searching",
"history_current": "Directory history",
"operator": "Like * and ~",
"escape": "Escapes like \\n",
"cwd": "Current directory",
"cwd_root": "cwd for root user",
"valid_path": "Valid paths",
"autosuggestion": "Suggested completion",
"user": "Username in the prompt",
"host": "Hostname in the prompt",
"cancel": "The ^C cancel indicator",
}
out, err = run_fish_cmd("set -L")
for line in out.split("\n"):
for match in re.finditer(r"^fish_color_(\S+) ?(.*)", line):
color_name, color_value = [x.strip() for x in match.group(1, 2)]
color_desc = descriptions.get(color_name, "")
data = {"name": color_name, "description": color_desc}
data.update(parse_color(color_value))
result.append(data)
remaining.discard(color_name)
# Sort our result (by their keys)
result.sort(key=operator.itemgetter("name"))
# Ensure that we have all the color names we know about, so that if the
# user deletes one he can still set it again via the web interface
for color_name in remaining:
color_desc = descriptions.get(color_name, "")
result.append([color_name, color_desc, parse_color("")])
return result
def do_get_functions(self):
out, err = run_fish_cmd("functions")
out = out.strip()
# Not sure why fish sometimes returns this with newlines
if "\n" in out:
return out.split("\n")
else:
return out.strip().split(", ")
def do_get_variable_names(self, cmd):
" Given a command like 'set -U' return all the variable names "
out, err = run_fish_cmd(cmd)
return out.split("\n")
def do_get_variables(self):
out, err = run_fish_cmd("set -L")
# Put all the variables into a dictionary
vars = {}
for line in out.split("\n"):
comps = line.split(" ", 1)
if len(comps) < 2:
continue
fish_var = FishVar(comps[0], comps[1])
vars[fish_var.name] = fish_var
# Mark universal variables. L means don't abbreviate.
for name in self.do_get_variable_names("set -nUL"):
if name in vars:
vars[name].universal = True
# Mark exported variables. L means don't abbreviate.
for name in self.do_get_variable_names("set -nxL"):
if name in vars:
vars[name].exported = True
return [
vars[key].get_json_obj()
for key in sorted(vars.keys(), key=lambda x: x.lower())
]
def do_get_bindings(self):
""" Get key bindings """
# Running __fish_config_interactive print fish greeting and
# loads key bindings
greeting, err = run_fish_cmd(" __fish_config_interactive")
# Load the key bindings and then list them with bind
out, err = run_fish_cmd("__fish_config_interactive; bind")
# Remove fish greeting from output
out = out[len(greeting) :]
# Put all the bindings into a list
bindings = []
command_to_binding = {}
binding_parser = BindingParser()
for line in out.split("\n"):
comps = line.split(" ", 2)
# If we don't have "bind", a sequence and a mapping,
# it's not a valid binding.
if len(comps) < 3:
continue
# Store the "--preset" value for later
if comps[1] == "--preset":
preset = True
# There's possibly a way to do this faster, but it's not important.
comps = line.split(" ", 3)[1:]
elif comps[1] == "--user":
preset = False
comps = line.split(" ", 3)[1:]
# Check again if we removed the level.
if len(comps) < 3:
continue
if comps[1] == "-k":
key_name, command = comps[2].split(" ", 1)
binding_parser.set_buffer(key_name.capitalize())
else:
key_name = None
command = comps[2]
binding_parser.set_buffer(comps[1])
if command in bindings_blacklist:
continue
readable_binding = binding_parser.get_readable_binding()
if command in command_to_binding:
fish_binding = command_to_binding[command]
fish_binding.add_binding(line, readable_binding)
else:
fish_binding = FishBinding(command, line, readable_binding)
bindings.append(fish_binding)
command_to_binding[command] = fish_binding
return [binding.get_json_obj() for binding in bindings]
def do_get_history(self):
# Use NUL to distinguish between history items.
out, err = run_fish_cmd("builtin history -z")
result = out.split("\0")
if result:
result.pop() # trim off the trailing element
return result
def do_get_color_for_variable(self, name):
# Return the color with the given name, or the empty string if there is
# none.
out, err = run_fish_cmd("echo -n $" + name)
return out
def do_set_color_for_variable(
self, name, color, background_color, bold, underline, italics, dim, reverse
):
"Sets a color for a fish color name, like 'autosuggestion'"
if not color:
color = "normal"
varname = "fish_color_" + name
# If the name already starts with "fish_", use it as the varname
# This is needed for 'fish_pager_color' vars.
if name.startswith("fish_"):
varname = name
# TODO: Check if the varname is allowable.
command = "set -U " + varname
if color:
command += " " + color
if background_color:
command += " --background=" + background_color
if bold:
command += " --bold"
if underline:
command += " --underline"
if italics:
command += " --italics"
if dim:
command += " --dim"
if reverse:
command += " --reverse"
out, err = run_fish_cmd(command)
return out
def do_get_function(self, func_name):
out, err = run_fish_cmd("functions " + func_name + " | fish_indent --html")
return out
def do_delete_history_item(self, history_item_text):
# It's really lame that we always return success here
cmd = (
"builtin history delete --case-sensitive --exact -- %s; builtin history save"
% escape_fish_cmd(history_item_text)
)
out, err = run_fish_cmd(cmd)
return True
def do_set_prompt_function(self, prompt_func):
cmd = prompt_func + "\n" + "funcsave fish_prompt"
out, err = run_fish_cmd(cmd)
return len(err) == 0
def do_get_prompt(self, command_to_run, prompt_function_text, extras_dict):
# Return the prompt output by the given command
prompt_demo_ansi, err = run_fish_cmd(command_to_run)
prompt_demo_html = ansi_to_html(prompt_demo_ansi)
prompt_demo_font_size = self.font_size_for_ansi_prompt(prompt_demo_ansi)
result = {
"function": prompt_function_text,
"demo": prompt_demo_html,
"font_size": prompt_demo_font_size,
}
if extras_dict:
result.update(extras_dict)
return result
def do_get_current_prompt(self):
# Return the current prompt. We run 'false' to demonstrate how the
# prompt shows the command status (#1624).
prompt_func, err = run_fish_cmd("functions fish_prompt")
result = self.do_get_prompt(
'builtin cd "' + initial_wd + '" ; false ; fish_prompt',
prompt_func.strip(),
{"name": "Current"},
)
return result
def do_get_sample_prompt(self, text, extras_dict):
# Return the prompt you get from the given text. Extras_dict is a
# dictionary whose values get merged in. We run 'false' to demonstrate
# how the prompt shows the command status (#1624)
cmd = text + '\n builtin cd "' + initial_wd + '" \n false \n fish_prompt\n'
return self.do_get_prompt(cmd, text.strip(), extras_dict)
def parse_one_sample_prompt_hash(self, line, result_dict):
# Allow us to skip whitespace, etc.
if not line:
return True
if line.isspace():
return True
# Parse a comment hash like '# name: Classic'
match = re.match(r"#\s*(\w+?): (.+)", line, re.IGNORECASE)
if match:
key = match.group(1).strip()
value = match.group(2).strip()
result_dict[key] = value
return True
# Skip other hash comments
return line.startswith("#")
def read_one_sample_prompt(self, path):
try:
with open(path, "rb") as fd:
extras_dict = {}
# Read one sample prompt from fd
function_lines = []
parsing_hashes = True
unicode_lines = (line.decode("utf-8") for line in fd)
for line in unicode_lines:
# Parse hashes until parse_one_sample_prompt_hash return
# False.
if parsing_hashes:
parsing_hashes = self.parse_one_sample_prompt_hash(
line, extras_dict
)
# Maybe not we're not parsing hashes, or maybe we already
# were not.
if not parsing_hashes:
function_lines.append(line)
func = "".join(function_lines).strip()
result = self.do_get_sample_prompt(func, extras_dict)
return result
except IOError:
# Ignore unreadable files, etc.
return None
def do_get_sample_prompts_list(self):
paths = glob.iglob("sample_prompts/*.fish")
result = []
try:
pool = multiprocessing.pool.ThreadPool(processes=8)
# Kick off the "Current" meta-sample
current_metasample_async = pool.apply_async(self.do_get_current_prompt)
# Read all of the prompts in sample_prompts
sample_results = pool.map(self.read_one_sample_prompt, paths, 1)
result.append(current_metasample_async.get())
result.extend([r for r in sample_results if r])
except ImportError:
# If the platform doesn't support multiprocessing, we just do it one at a time.
# This happens e.g. on Termux.
print(
"Platform doesn't support multiprocessing, running one at a time. This may take a while."
)
result.append(self.do_get_current_prompt())
result.extend([self.read_one_sample_prompt(path) for path in paths])
return result
def do_get_abbreviations(self):
# Example abbreviation line:
# abbr -a -U -- ls 'ls -a'
result = []
out, err = run_fish_cmd("abbr --show")
for line in out.rstrip().split("\n"):
if not line:
continue
_, abbr = line.split(" -- ", 1)
word, phrase = abbr.split(" ", 1)
result.append({"word": word, "phrase": phrase})
return result
def do_remove_abbreviation(self, abbreviation):
out, err = run_fish_cmd("abbr --erase %s" % abbreviation["word"])
if err:
return err
else:
return None
def do_save_abbreviation(self, abbreviation):
out, err = run_fish_cmd(
# Remove one layer of single-quotes because escape_fish_cmd adds them back.
"abbr --add %s %s"
% (
escape_fish_cmd(abbreviation["word"].strip("'")),
escape_fish_cmd(abbreviation["phrase"].strip("'")),
)
)
if err:
return err
else:
return None
def secure_startswith(self, haystack, needle):
if len(haystack) < len(needle):
return False
bits = 0
for x, y in zip(haystack, needle):
bits |= ord(x) ^ ord(y)
return bits == 0
def font_size_for_ansi_prompt(self, prompt_demo_ansi):
width = ansi_prompt_line_width(prompt_demo_ansi)
# Pick a font size
if width >= 70:
font_size = "8pt"
if width >= 60:
font_size = "10pt"
elif width >= 50:
font_size = "11pt"
elif width >= 40:
font_size = "13pt"
elif width >= 30:
font_size = "15pt"
elif width >= 25:
font_size = "16pt"
elif width >= 20:
font_size = "17pt"
else:
font_size = "18pt"
return font_size
def do_GET(self):
p = self.path
authpath = "/" + authkey
if self.secure_startswith(p, authpath):
p = p[len(authpath) :]
else:
return self.send_error(403)
self.path = p
if p == "/colors/":
output = self.do_get_colors()
elif p == "/functions/":
output = self.do_get_functions()
elif p == "/variables/":
output = self.do_get_variables()
elif p == "/history/":
# start = time.time()
output = self.do_get_history()
# end = time.time()
# print "History: ", end - start
elif p == "/sample_prompts/":
output = self.do_get_sample_prompts_list()
elif re.match(r"/color/(\w+)/", p):
name = re.match(r"/color/(\w+)/", p).group(1)
output = self.do_get_color_for_variable(name)
elif p == "/bindings/":
output = self.do_get_bindings()
elif p == "/abbreviations/":
output = self.do_get_abbreviations()
else:
return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
# Return valid output
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.write_to_wfile("\n")
# Output JSON
self.write_to_wfile(json.dumps(output))
def do_POST(self):
p = self.path
authpath = "/" + authkey
if self.secure_startswith(p, authpath):
p = p[len(authpath) :]
else:
return self.send_error(403)
self.path = p
# This is cheesy, we want just the actual content-type.
# In some cases it'll give us the encoding as well,
# ("application/json;charset=utf-8")
# but we don't currently care.
ctype = self.headers["content-type"].split(";")[0]
if ctype == "application/x-www-form-urlencoded":
length = int(self.headers["content-length"])
url_str = self.rfile.read(length).decode("utf-8")
postvars = parse_qs(url_str, keep_blank_values=1)
elif ctype == "application/json":
length = int(self.headers["content-length"])
url_str = self.rfile.read(length).decode("utf-8")
postvars = json.loads(url_str)
else:
postvars = {}
if p == "/set_color/":
what = postvars.get("what")
color = postvars.get("color")
background_color = postvars.get("background_color")
bold = postvars.get("bold")
italics = postvars.get("italics")
reverse = postvars.get("reverse")
dim = postvars.get("dim")
underline = postvars.get("underline")
if what:
# Not sure why we get lists here?
output = self.do_set_color_for_variable(
what[0],
color[0],
background_color[0],
parse_bool(bold[0]),
parse_bool(underline[0]),
parse_bool(italics[0]),
parse_bool(dim[0]),
parse_bool(reverse[0]),
)
else:
output = "Bad request"
elif p == "/get_function/":
what = postvars.get("what")
output = [self.do_get_function(what[0])]
elif p == "/delete_history_item/":
what = postvars.get("what")
if self.do_delete_history_item(what[0]):
output = ["OK"]
else:
output = ["Unable to delete history item"]
elif p == "/set_prompt/":
what = postvars.get("fish_prompt")
if self.do_set_prompt_function(what):
output = ["OK"]
else:
output = ["Unable to set prompt"]
elif p == "/save_abbreviation/":
errmsg = self.do_save_abbreviation(postvars)
if errmsg:
output = [errmsg]
else:
output = ["OK"]
elif p == "/remove_abbreviation/":
errmsg = self.do_remove_abbreviation(postvars)
if errmsg:
output = [errmsg]
else:
output = ["OK"]
else:
return self.send_error(404)
# Return valid output
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.write_to_wfile("\n")
# Output JSON
self.write_to_wfile(json.dumps(output))
def log_request(self, code="-", size="-"):
""" Disable request logging """
pass
def log_error(self, format, *args):
if format == "code %d, message %s":
# This appears to be a send_error() message
# We want to include the path
(code, msg) = args
format = "code %d, message %s, path %s"
args = (code, msg, self.path)
SimpleHTTPServer.SimpleHTTPRequestHandler.log_error(self, format, *args)
redirect_template_html = """
Start the Fish Web config
"""
# find fish
fish_bin_dir = os.environ.get("__fish_bin_dir")
fish_bin_path = None
if not fish_bin_dir:
print("The $__fish_bin_dir environment variable is not set. " "Looking in $PATH...")
fish_bin_path = find_executable("fish")
if not fish_bin_path:
print("fish could not be found. Is fish installed correctly?")
sys.exit(-1)
else:
print("fish found at '%s'" % fish_bin_path)
else:
fish_bin_path = os.path.join(fish_bin_dir, "fish")
if not os.access(fish_bin_path, os.X_OK):
print(
"fish could not be executed at path '%s'. "
"Is fish installed correctly?" % fish_bin_path
)
sys.exit(-1)
FISH_BIN_PATH = fish_bin_path
# We want to show the demo prompts in the directory from which this was invoked,
# so get the current working directory
initial_wd = os.getcwd()
# Make sure that the working directory is the one that contains the script
# server file, because the document root is the working directory.
where = os.path.dirname(sys.argv[0])
os.chdir(where)
# Generate a 16-byte random key as a hexadecimal string
authkey = binascii.b2a_hex(os.urandom(16)).decode("ascii")
# Try to find a suitable port
PORT = 8000
HOST = "::" if socket.has_ipv6 else "localhost"
while PORT <= 9000:
try:
Handler = FishConfigHTTPRequestHandler
httpd = FishConfigTCPServer((HOST, PORT), Handler)
# Success
break
except socket.error:
err_type, err_value = sys.exc_info()[:2]
# str(err_value) handles Python3 correctly
if "Address already in use" not in str(err_value):
print(str(err_value))
break
PORT += 1
if PORT > 9000:
# Nobody say it
print("Unable to find an open port between 8000 and 9000")
sys.exit(-1)
# Get any initial tab (functions, colors, etc)
# Just look at the first letter
initial_tab = ""
if len(sys.argv) > 1:
for tab in [
"functions",
"prompt",
"colors",
"variables",
"history",
"bindings",
"abbreviations",
]:
if tab.startswith(sys.argv[1]):
initial_tab = "#" + tab
break
url = "http://localhost:%d/%s/%s" % (PORT, authkey, initial_tab)
# Create temporary file to hold redirect to real server. This prevents exposing
# the URL containing the authentication key on the command line (see
# CVE-2014-2914 or https://github.com/fish-shell/fish-shell/issues/1438).
f = tempfile.NamedTemporaryFile(prefix="web_config", suffix=".html", mode="w")
f.write(redirect_template_html % (url, url))
f.flush()
# Open temporary file as URL
# Use open on macOS >= 10.12.5 to work around #4035.
fileurl = "file://" + f.name
esc = get_special_ansi_escapes()
print(
"Web config started at %s%s%s"
% (esc["underline"], fileurl, esc["exit_attribute_mode"])
)
print("%sHit ENTER to stop.%s" % (esc["bold"], esc["exit_attribute_mode"]))
def runThing():
if isMacOS10_12_5_OrLater():
subprocess.check_call(["open", fileurl])
elif is_wsl():
subprocess.call(["cmd.exe", "/c", "start %s" % url])
elif is_termux():
subprocess.call(["termux-open-url", url])
else:
webbrowser.open(fileurl)
# Some browsers still block webbrowser.open if they haven't been opened before,
# so we just spawn it in a thread.
thread = threading.Thread(target=runThing)
thread.start()
# Select on stdin and httpd
stdin_no = sys.stdin.fileno()
try:
while True:
ready_read = select.select([sys.stdin.fileno(), httpd.fileno()], [], [])
if ready_read[0][0] < 1:
print("Shutting down.")
# Consume the newline so it doesn't get printed by the caller
sys.stdin.readline()
break
else:
httpd.handle_request()
except KeyboardInterrupt:
print("\nShutting down.")
# Clean up temporary file
f.close()
thread.join()