xxh/xxh
2020-03-04 12:42:43 +03:00

494 lines
No EOL
24 KiB
Text
Executable file

#!/usr/bin/env xonsh
import os, sys, argparse, datetime, re, getpass, pexpect
from shutil import which
from sys import exit
from argparse import RawTextHelpFormatter
from urllib.parse import urlparse
from random import randint
sys.path.append(str(pf"{__file__}".absolute().parent))
import xonssh_xxh
from xonssh_xxh.settings import global_settings
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def eeprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
exit(1)
class Xxh:
def __init__(self):
self.package_dir_path = pf"{xonssh_xxh.__file__}".parent
self.url_xxh_github = 'https://github.com/xxh/xxh'
self.url_xxh_plugins_search = 'https://github.com/search?q=xxh-plugin'
self.url_appimage = 'https://github.com/niess/linuxdeploy-plugin-python/releases/download/continuous/xonsh-x86_64.AppImage'
self.local_xxh_version = global_settings['XXH_VERSION']
self.local_xxh_home = '~/.xxh'
self.host_xxh_home = '~/.xxh'
self.url = None
self.portable_methods = ['appimage']
self.portable_methods_str = ', '.join(self.portable_methods)
self.xonsh_bin_name = 'xonsh'
self.ssh_arguments = []
self.ssh_arg_v = []
self.sshpass = []
self.use_pexpect = True
self._password = None
self._verbose = False
self._vverbose = False
self._method = 'appimage'
def snail(self):
try:
terminal = os.get_terminal_size()
terminal_cols = terminal.columns
except:
terminal_cols=70
if terminal_cols < 70:
return f"\n\nContribution: {self.url_xxh_github}\n\nPlugins: {self.url_xxh_plugins_search}"
l,r,s,t = (['@','-','_'][randint(0,2)], ['@','-','_'][randint(0,2)], ['_',' '][randint(0,1)], ['_',''][randint(0,1)])
return f"\n" \
+f" {s}___ __________ {l} {r}\n" \
+f" {s}_____ / \\ \\__/\n" \
+f" {s}___ / ______ \\ / \\ contribution\n" \
+f" {s}____ / / __ \\ \\ / _/ {self.url_xxh_github}\n" \
+f" {s}__ ( / / / \\ \\ /\n" \
+f" \\ \\___/ / / / plugins\n" \
+f"{' ' if not t else ''} _{t}__\\ /__/ / {self.url_xxh_plugins_search}\n" \
+f"{' ' if not t else ''} / {'' if not t else ' '} \\________/ /\n" \
+f"{' ' if not t else ''} /_{t}__________________/\n" \
+f"" # d2F0Y2ggLW4uMiB4eGggLWg
def pssh(self, cmd, accept_host=None, host_password=None, key_password=None):
if self.password:
host_password = self.password
if self.vverbose:
eprint('Try pexpect command: '+cmd)
sess = pexpect.spawn(cmd)
user_host_accept = None
user_host_password = None
user_key_password = None
patterns = ['Are you sure you want to continue connecting.*', "Please type 'yes' or 'no':",
'Enter passphrase for key.*', 'password:', pexpect.EOF, '[$#~]', 'Last login.*']
while True:
try:
i = sess.expect(patterns, timeout=3)
except:
if self.vverbose:
print('Unknown answer details:')
print(sess)
print('Unknown answer from host')
return {}
if self.vverbose:
eprint(f'Pexpect caught pattern: {patterns[i]}')
if i in [0,1]:
# Expected:
# The authenticity of host '<...>' can't be established.
# ECDSA key fingerprint is <...>
# Are you sure you want to continue connecting (yes/no)?
print((sess.before + sess.after).decode("utf-8"), end='')
if accept_host is None:
user_host_accept = input()
sess.sendline(user_host_accept)
if user_host_accept == 'yes':
user_host_accept = True
elif user_host_accept == 'no':
user_host_accept = False
else:
user_host_accept = None
elif accept_host:
sess.sendline('yes')
else:
sess.sendline('no')
if i == 2:
# Expected:
# Enter passphrase for key '<keyfile>':
if key_password is None:
user_key_password = getpass.getpass(prompt=(sess.before + sess.after).decode("utf-8")+' ')
sess.sendline(user_key_password)
else:
sess.sendline(key_password)
if i == 3:
# Expected:
# <host>`s password:
if host_password is None:
user_host_password = getpass.getpass(prompt=(sess.before + sess.after).decode("utf-8")+' ')
sess.sendline(user_host_password)
else:
sess.sendline(host_password)
if i == 4:
# Getting result
output = sess.before.decode("utf-8")
output = re.sub('\r\nConnection to (.*) closed.\r\r\n', '', output)
output = output[:-3] if output.endswith('\r\r\n') else output
output = output[3:] if output.startswith(' \r\n') else output
result = {
'user_host_accept': user_host_accept,
'user_host_password':user_host_password,
'user_key_password':user_key_password,
'output':output
}
return result
if i == [5,6]:
# Prompt
print(sess.before.decode("utf-8"))
sess.interact()
result = {
'user_host_accept': user_host_accept,
'user_host_password':user_host_password,
'user_key_password':user_key_password
}
return result
return {}
@property
def password(self):
return self._password
@password.setter
def password(self, password):
self._password = password
if password:
if not which('sshpass'):
eeprint('Install sshpass to using password: https://duckduckgo.com/?q=install+sshpass\n'
+ 'Note! There are a lot of security reasons to stop using password auth.')
verbose = '-v' if '-v' in self.sshpass else []
self.sshpass = ['sshpass', '-p', password] + verbose
else:
self.sshpass = []
@property
def method(self):
return self._method
@method.setter
def method(self, value):
if value not in self.portable_methods:
eeprint(f'Currently supported methods: {self.portable_methods_str}')
self._method = value
@property
def verbose(self):
return self._verbose
@verbose.setter
def verbose(self, value):
self._verbose = value
if not self._verbose:
self.vverbose=False
@property
def vverbose(self):
return self._vverbose
@vverbose.setter
def vverbose(self, value):
self._vverbose = value
if self._vverbose:
self.verbose = True
self.ssh_arg_v = ['-v']
if self.sshpass and ['-v'] not in self.sshpass:
self.sshpass += ['-v']
else:
self.ssh_arg_v = []
if '-v' in self.sshpass:
self.sshpass.remove('-v')
def parse_destination(self, destination):
destination = f'ssh://{destination}' if 'ssh://' not in destination else destination
url = urlparse(destination)
return url
def get_host_info(self):
host = self.url.hostname
host_info_sh = self.package_dir_path / 'host_info.sh'
if self.use_pexpect:
cmd = "bash -c 'cat {host_info_sh} | sed \"s|_xxh_home_|{host_xxh_home}|\" | ssh {ssh_v} {ssh_arguments} {host} -T \"bash -s\"'".format(
host_info_sh=host_info_sh, host_xxh_home=self.host_xxh_home, ssh_v=('' if not self.ssh_arg_v else '-v'), ssh_arguments=' '.join(self.ssh_arguments), host=host)
pr = self.pssh(cmd)
if pr == {}:
eeprint('Unexpected result. Try again with +v or +vv or try ssh before xxh')
if self.verbose:
eprint('Pexpect result:')
eprint(pr)
if pr['user_host_password'] is not None:
self.password = pr['user_host_password']
r = pr['output']
else:
r = $(cat @(host_info_sh) | sed @(f's|_xxh_home_|{self.host_xxh_home}|') | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s" ).strip()
if self.verbose:
eprint(f'Host info:\n{r}')
if r == '':
eeprint('Empty answer from host when getting first info. Often this is a connection error.\n'
+ 'Check your connection parameters using the same command but with ssh.')
r = dict([l.split('=') for l in r.replace('\r','').split('\n') if l.strip() != '' and '=' in l])
return r
def main(self):
argp = argparse.ArgumentParser(description=f"The xxh is for using the xonsh shell wherever you go through the ssh. {self.snail()}", formatter_class=RawTextHelpFormatter, prefix_chars='-+')
argp.add_argument('--version', '-V', action='version', version=f"xonssh-xxh/{self.local_xxh_version}")
argp.add_argument('-p', dest='ssh_port', help="Port to connect to on the remote host.")
argp.add_argument('-l', dest='ssh_login', help="Specifies the user to log in as on the remote machine.")
argp.add_argument('-i', dest='ssh_private_key', help="File from which the identity (private key) for public key authentication is read.")
argp.add_argument('-o', dest='ssh_options', metavar='SSH_OPTION -o ...', action='append', help="SSH options are described in ssh man page. Example: -o Port=22 -o User=snail")
argp.add_argument('destination', metavar='[user@]host[:port]', help="Destination may be specified as [user@]host[:port] or host from ~/.ssh/config")
argp.add_argument('+i','++install', default=False, action='store_true', help="Install xxh to destination host.")
argp.add_argument('+if','++install-force', default=False, action='store_true', help="Removing the host xxh home and install xxh again.")
argp.add_argument('+P','++password', help="Password for ssh auth.")
argp.add_argument('+PP','++password-prompt', default=False, action='store_true', help="Enter password manually using prompt.")
argp.add_argument('+lh','++local-xxh-home', default=self.local_xxh_home, help=f"Local xxh home path. Default: {self.local_xxh_home}")
argp.add_argument('+hh','++host-xxh-home', default=self.host_xxh_home, help=f"Host xxh home path. Default: {self.host_xxh_home}")
argp.add_argument('+he','++host-execute-file', help=f"Execute script file placed on host and exit.")
argp.add_argument('+m','++method', default='appimage', help=f"Portable method: {self.portable_methods_str}")
argp.add_argument('+v','++verbose', default=False, action='store_true', help="Verbose mode.")
argp.add_argument('+vv','++vverbose', default=False, action='store_true', help="Super verbose mode.")
argp.usage = """xxh <host from ~/.ssh/config>
usage: xxh [ssh arguments] [user@]host[:port] [xxh arguments]
usage: xxh [-h] [-V] [-p SSH_PORT] [-l SSH_LOGIN] [-i SSH_PRIVATE_KEY] [-o SSH_OPTION -o ...]
[user@]host[:port]
[+i] [+if] [+P PASSWORD] [+PP]
[+lxh LOCAL_XXH_HOME] [+hxh HOST_XXH_HOME] [+he HOST_EXECUTE_FILE]
[+m METHOD] [+v] [+vv]
"""
help = argp.format_help().replace('\n +','\n\nxxh arguments:\n +',1).replace('optional ', 'common ')\
.replace('number and exit', 'number and exit\n\nssh arguments:').replace('positional ', 'required ')
argp.format_help = lambda: help
opt = argp.parse_args()
self.verbose = opt.verbose
self.vverbose = opt.vverbose
self.method = opt.method
self.url = url = self.parse_destination(opt.destination)
username = getpass.getuser()
host = url.hostname
if not host:
eeprint(f"Wrong distination '{host}'")
if url.port:
opt.ssh_port = url.port
if url.username:
opt.ssh_login = url.username
if opt.ssh_login:
username = opt.ssh_login
self.ssh_arguments = ['-o', 'StrictHostKeyChecking=accept-new']
if not self.verbose:
self.ssh_arguments += ['-o', 'LogLevel=QUIET']
if opt.ssh_port:
self.ssh_arguments += ['-o', f'Port={opt.ssh_port}']
if opt.ssh_private_key:
self.ssh_arguments += ['-o', f'IdentityFile={opt.ssh_private_key}']
if opt.ssh_login:
self.ssh_arguments += ['-o', f'User={opt.ssh_login}']
if opt.ssh_options:
for ssh_option in opt.ssh_options:
self.ssh_arguments += ['-o', ssh_option]
if self.verbose:
eprint(f'ssh arguments: {self.ssh_arguments}')
if opt.password is not None:
self.password = opt.password
elif opt.password_prompt:
password = ''
while not password:
password = getpass.getpass(f"Enter {username}@{host}'s password: ")
self.password = password
opt.install = True if opt.install_force else opt.install
self.local_xxh_home = pf"{opt.local_xxh_home}"
local_xxh_home_parent = self.local_xxh_home.parent
if self.local_xxh_home.exists():
if not os.access(self.local_xxh_home, os.W_OK):
eeprint(f"The local xxh home path isn't writable: {self.local_xxh_home}" )
elif local_xxh_home_parent.exists():
if os.access(local_xxh_home_parent, os.W_OK):
eprint(f'Create local xxh home path: {self.local_xxh_home}')
mkdir @(self.ssh_arg_v) -p @(self.local_xxh_home) @(self.local_xxh_home / 'plugins')
else:
eeprint(f"Parent for local xxh home path isn't writable: {local_xxh_home_parent}")
else:
eeprint(f"Paths aren't writable:\n {local_xxh_home_parent}\n {self.local_xxh_home}")
# Fix env to avoid ssh warnings
for lc in ['LC_TIME','LC_MONETARY','LC_ADDRESS','LC_IDENTIFICATION','LC_MEASUREMENT','LC_NAME','LC_NUMERIC','LC_PAPER','LC_TELEPHONE']:
${...}[lc] = "POSIX"
if pf'{opt.host_xxh_home}' == pf'/':
eeprint("Host xxh home path {host_xxh_home} looks like /. Please check twice!")
host_info = self.get_host_info()
if not host_info:
eeprint(f'Unknown answer from host when getting info')
if 'xxh_home_realpath' not in host_info or host_info['xxh_home_realpath'] == '':
eeprint(f'Unknown answer from host when getting realpath for directory {host_xxh_home}')
if 'xxh_version' not in host_info or host_info['xxh_version'] == '':
eeprint(f'Unknown answer from host when getting version for directory {host_xxh_home}')
host_xxh_home = host_info['xxh_home_realpath']
host_xxh_home = pf"{host_xxh_home}"
host_xxh_version = host_info['xxh_version']
if host_info['xxh_home_writable'] == '0' and host_info['xxh_parent_home_writable'] == '0':
yn = input(f"{host}:{host_xxh_home} is not writable. Continue? [y/n] ").strip().lower()
if yn != 'y':
eeprint('Stopped')
if host_info['scp'] == '' and host_info['rsync'] == '':
eeprint(f"There are no rsync or scp on target host. Sad but files can't be uploaded.")
host_xonsh_bin = host_xxh_home / self.xonsh_bin_name
host_xonshrc = host_xxh_home / 'xonshrc.xsh'
if opt.install_force == False:
# Check version
ask = False
if host_xxh_version == 'version_not_found':
ask = f'Host xxh home is not empty but something went wrong while getting host xxh version.'
elif host_xxh_version not in ['dir_not_found','dir_empty'] and host_xxh_version != self.local_xxh_version:
ask = f"Local xxh version '{self.local_xxh_version}' is not equal host xxh version '{host_xxh_version}'."
if ask:
choice = input(f"{ask} What's next? \n"
+ " s - [default] Stop here. You'll try to connect using ordinary ssh for backup current xxh home.\n"
+ " u - Safe update. Host xxh home will be renamed and local xxh version will be installed.\n"
+ " f - Force install local xxh version on host. Host xxh installation will be lost.\n"
+ " i - Ignore, cross fingers and continue the connection.\n"
+ "S/u/f/i? ").lower()
if choice == 's' or choice.strip() == '':
print('Stopped')
exit(0)
elif choice == 'u':
local_time = datetime.datetime.now().isoformat()[:19]
eprint(f"Move {host}:{host_xxh_home} to {host}:{host_xxh_home}-{local_time}")
echo @(f"mv {host_xxh_home} {host_xxh_home}-{local_time}") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
opt.install = True
elif choice == 'f':
opt.install = True
opt.install_force = True
elif choice == 'i':
pass
else:
eeprint('Unknown answer')
if host_xxh_version in ['dir_not_found','dir_empty'] and opt.install_force == False:
yn = input(f"{host}:{host_xxh_home} not found. Install xxh? [Y/n] ").strip().lower()
if yn == 'y' or yn == '':
opt.install = True
else:
eeprint('Unknown answer')
if opt.install:
eprint("\033[0;33m", end='')
if opt.method == 'appimage':
local_xonsh_appimage_fullpath = self.local_xxh_home / self.xonsh_bin_name
if not local_xonsh_appimage_fullpath.is_file():
eprint(f'First time download and save xonsh AppImage from {self.url_appimage}')
if which('wget'):
r=![wget -q --show-progress @(self.url_appimage) -O @(local_xonsh_appimage_fullpath)]
if r.returncode != 0:
eeprint(f'Error while download appimage using wget: {r}')
elif which('curl'):
r=![curl @(self.url_appimage) -o @(local_xonsh_appimage_fullpath)]
if r.returncode != 0:
eeprint(f'Error while download appimage using curl: {r}')
else:
eeprint('Please install wget or curl and try again. Howto: https://duckduckgo.com/?q=how+to+install+wget+in+linux')
chmod +x @(local_xonsh_appimage_fullpath)
else:
eprint(f'Method "{opt.method}" is not supported now')
if opt.install_force:
eprint(f'Remove host xxh home {host}:{host_xxh_home}')
echo @(f"rm -rf {host_xxh_home}/*") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
eprint(f"Install xxh to {host}:{host_xxh_home}" )
if host_xxh_version in ['dir_not_found']:
eprint(f'Create xxh home {host_xxh_home}')
echo @(f"mkdir -p {host_xxh_home}") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
if which('rsync') and host_info['rsync']:
eprint('Upload using rsync')
rsync @(self.ssh_arg_v) -e @(f"{''.join(self.sshpass)} ssh {'' if self.ssh_arg_v == [] else '-v'} {' '.join(self.ssh_arguments)}") -az --info=progress2 --include ".*" --exclude='*.pyc' @(self.local_xxh_home)/ @(host):@(host_xxh_home)/ 1>&2
rsync @(self.ssh_arg_v) -e @(f"{''.join(self.sshpass)} ssh {'' if self.ssh_arg_v == [] else '-v'} {' '.join(self.ssh_arguments)}") -az --info=progress2 --include ".*" --exclude='*.pyc' @(self.package_dir_path)/ @(host):@(host_xxh_home)/ 1>&2
elif which('scp') and host_info['scp']:
eprint("Upload using scp. Note: install rsync on local and remote host to increase speed.")
scp_host = f"{host}:{host_xxh_home}/"
@(self.sshpass) scp @(self.ssh_arg_v) @(self.ssh_arguments) -r -C @([] if self.vverbose else ['-q']) @(self.local_xxh_home)/* @(scp_host) 1>&2
@(self.sshpass) scp @(self.ssh_arg_v) @(self.ssh_arguments) -r -C @([] if self.vverbose else ['-q']) @(self.package_dir_path)/* @(scp_host) 1>&2
else:
eprint('Please install rsync or scp!')
plugins_fullpath = self.local_xxh_home / 'plugins'
if plugins_fullpath.exists():
plugin_post_installs = sorted(plugins_fullpath.glob('*/post_install.xsh'))
if len(plugin_post_installs) > 0:
eprint(f'Run plugins post install on {host}')
scripts=''
for script in plugin_post_installs:
scripts += " && %s -i --rc %s -- %s" % (host_xonsh_bin, host_xonshrc, str(script).replace(str(self.local_xxh_home)+'/', ''))
eprint(f' * {script}')
if scripts:
echo @(f"cd {host_xxh_home} {scripts}" ) | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s" 1>&2
eprint(f'Check {opt.method}')
host_settings_file = host_xxh_home / 'settings.py'
check = $(@(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -t @(host_xonsh_bin) --no-script-cache -i --rc @(host_xonshrc) -- @(host_settings_file) )
if self.vverbose:
eprint(f'Check xonsh result:\n{check}')
if check == '' or 'AppImages require FUSE to run' in check:
eprint('AppImage is not supported by host. Trying to unpack and run...')
host_xonsh_bin_new = host_xxh_home / 'xonsh-squashfs/usr/bin/python3'
@(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -t @(f"cd {host_xxh_home} && ./{self.xonsh_bin_name} --appimage-extract | grep -E 'usr/python/bin/xonsh$' && mv squashfs-root xonsh-squashfs && mv {host_xonsh_bin} {host_xonsh_bin}-disabled && ln -s {host_xonsh_bin_new} xonsh") 1>&2
host_xonsh_bin = host_xonsh_bin_new
eprint(f'First run xonsh on {host}\033[0m')
host_execute_file = ['--', opt.host_execute_file] if opt.host_execute_file else []
@(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -t @(host_xonsh_bin) --no-script-cache -i --rc @(host_xonshrc) @(host_execute_file)
if __name__ == '__main__':
if os.name == 'nt':
eeprint(f"Windows is not supported. WSL1 is not recommended also. WSL2 is not tested yet.\nContribution: {self.url_xxh_github}")
if not which('ssh'):
eeprint('Install OpenSSH client before using xxh: https://duckduckgo.com/?q=how+to+install+openssh+client+in+linux')
xxh = Xxh()
xxh.main()