mirror of
https://github.com/xxh/xxh
synced 2024-11-30 15:39:16 +00:00
696 lines
No EOL
34 KiB
Text
Executable file
696 lines
No EOL
34 KiB
Text
Executable file
#!/usr/bin/env xonsh
|
|
|
|
import os, sys, argparse, yaml, datetime, re, getpass, pexpect
|
|
from shutil import which
|
|
from sys import exit
|
|
from argparse import RawTextHelpFormatter
|
|
from urllib.parse import urlparse
|
|
from random import randint
|
|
from base64 import b64encode
|
|
|
|
class Xxh:
|
|
def __init__(self, package_dir_path, version='0.0.0'):
|
|
self.package_dir_path = package_dir_path
|
|
self.url_xxh_github = 'https://github.com/xxh/xxh'
|
|
self.url_xxh_plugins_search = 'https://github.com/search?q=xxh-plugin'
|
|
self.local_xxh_version = version
|
|
self.local_xxh_home = '~/.xxh'
|
|
self.config_file = '~/.xxh/.xxhc'
|
|
self.host_xxh_home = '~/.xxh'
|
|
|
|
self.default_shells = {
|
|
'xonsh':{
|
|
'xxh-shell': 'xxh-shell-xonsh-appimage',
|
|
'xxh-shell-source': 'https://github.com/xxh/xxh-shell-xonsh-appimage.git'
|
|
},
|
|
'zsh': {
|
|
'xxh-shell': 'xxh-shell-zsh',
|
|
'xxh-shell-source': 'https://github.com/xxh/xxh-shell-zsh.git'
|
|
},
|
|
'fish': {
|
|
'xxh-shell': 'xxh-shell-fish-appimage',
|
|
'xxh-shell-source': 'https://github.com/xxh/xxh-shell-fish-appimage.git'
|
|
},
|
|
'bash-zero': {
|
|
'xxh-shell': 'xxh-shell-bash-zero',
|
|
'xxh-shell-source': 'https://github.com/xxh/xxh-shell-bash-zero.git'
|
|
},
|
|
'osquery':{
|
|
'xxh-shell': 'xxh-shell-osquery',
|
|
'xxh-shell-source': 'https://github.com/xxh/xxh-shell-osquery.git'
|
|
}
|
|
}
|
|
self._shell = self.default_shells[self.get_current_shell()]['xxh-shell']
|
|
self.shell_source = self.default_shells[self.get_current_shell()]['xxh-shell-source']
|
|
|
|
self.url = None
|
|
self.ssh_arguments = []
|
|
self.ssh_arg_v = []
|
|
self.sshpass = []
|
|
self.use_pexpect = True
|
|
self._password = None
|
|
self._verbose = False
|
|
self._vverbose = False
|
|
self.quiet = False
|
|
|
|
def eprint(self, *args, **kwargs):
|
|
if not self.quiet:
|
|
print(*args, file=sys.stderr, **kwargs)
|
|
|
|
def eeprint(self, *args, **kwargs):
|
|
if not self.quiet:
|
|
print(*args, file=sys.stderr, **kwargs)
|
|
exit(1)
|
|
|
|
def get_current_shell(self):
|
|
if 'SHELL' in ${...}:
|
|
if $SHELL.endswith('zsh'):
|
|
return 'zsh'
|
|
if $SHELL.endswith('fish'):
|
|
return 'fish'
|
|
return 'xonsh'
|
|
|
|
def stripunquote(self, s):
|
|
s = s.strip()
|
|
if s.startswith('"') and s.endswith('"'):
|
|
s = s[1:-1]
|
|
return s
|
|
|
|
def get_xxh_env(self):
|
|
xxh_env = {}
|
|
if 'XXH_SH_ENV' in ${...}:
|
|
xxh_env = $XXH_SH_ENV
|
|
for kw in ['declare', 'typeset', 'export']:
|
|
xxh_env = xxh_env.replace('\n' + kw + ' ', '\n\n' + kw + ' ')
|
|
xxh_env += '\n\n'
|
|
xxh_env = re.findall('^.+ ([a-zA-Z_0-9]+?)=((?:.+\n)+)', xxh_env, re.MULTILINE)
|
|
xxh_env = {v[0]: self.stripunquote(v[1]) for v in xxh_env}
|
|
return xxh_env
|
|
|
|
def pssh(self, cmd, accept_host=None, host_password=None, key_password=None):
|
|
if self.password:
|
|
host_password = self.password
|
|
|
|
if self.vverbose:
|
|
self.eprint('Try pexpect command: '+cmd)
|
|
|
|
sess = pexpect.spawn(cmd)
|
|
user_host_accept = None
|
|
user_host_password = None
|
|
user_key_password = None
|
|
patterns = ['Are you sure you want to continue connecting.*', "Please type 'yes' or 'no':",
|
|
'Enter passphrase for key.*', 'password:', pexpect.EOF, '[$#~]', 'Last login.*']
|
|
while True:
|
|
try:
|
|
i = sess.expect(patterns, timeout=3)
|
|
except:
|
|
if self.vverbose:
|
|
print('Unknown answer details:')
|
|
print(sess)
|
|
print('Unknown answer from host')
|
|
return {}
|
|
|
|
if self.vverbose:
|
|
self.eprint(f'Pexpect caught pattern: {patterns[i]}')
|
|
|
|
if i in [0,1]:
|
|
# Expected:
|
|
# The authenticity of host '<...>' can't be established.
|
|
# ECDSA key fingerprint is <...>
|
|
# Are you sure you want to continue connecting (yes/no)?
|
|
print((sess.before + sess.after).decode("utf-8"), end='')
|
|
if accept_host is None:
|
|
user_host_accept = input()
|
|
sess.sendline(user_host_accept)
|
|
if user_host_accept == 'yes':
|
|
user_host_accept = True
|
|
elif user_host_accept == 'no':
|
|
user_host_accept = False
|
|
else:
|
|
user_host_accept = None
|
|
elif accept_host:
|
|
sess.sendline('yes')
|
|
else:
|
|
sess.sendline('no')
|
|
|
|
if i == 2:
|
|
# Expected:
|
|
# Enter passphrase for key '<keyfile>':
|
|
if key_password is None:
|
|
user_key_password = getpass.getpass(prompt=(sess.before + sess.after).decode("utf-8")+' ')
|
|
sess.sendline(user_key_password)
|
|
else:
|
|
sess.sendline(key_password)
|
|
|
|
if i == 3:
|
|
# Expected:
|
|
# <host>`s password:
|
|
if host_password is None:
|
|
user_host_password = getpass.getpass(prompt=(sess.before + sess.after).decode("utf-8")+' ')
|
|
sess.sendline(user_host_password)
|
|
else:
|
|
sess.sendline(host_password)
|
|
|
|
if i == 4:
|
|
# Getting result
|
|
output = sess.before.decode("utf-8")
|
|
output = re.sub('\r\nConnection to (.*) closed.\r\r\n', '', output)
|
|
output = output[:-3] if output.endswith('\r\r\n') else output
|
|
output = output[3:] if output.startswith(' \r\n') else output
|
|
result = {
|
|
'user_host_accept': user_host_accept,
|
|
'user_host_password':user_host_password,
|
|
'user_key_password':user_key_password,
|
|
'output':output
|
|
}
|
|
|
|
return result
|
|
|
|
if i == [5,6]:
|
|
# Prompt
|
|
print(sess.before.decode("utf-8"))
|
|
sess.interact()
|
|
|
|
result = {
|
|
'user_host_accept': user_host_accept,
|
|
'user_host_password':user_host_password,
|
|
'user_key_password':user_key_password
|
|
}
|
|
return result
|
|
|
|
return {}
|
|
|
|
def shells(self):
|
|
default_shells = [v['xxh-shell'] for k,v in self.default_shells.items()]
|
|
installed_shells = [str(s.name) for s in pf'{self.local_xxh_home}/xxh/shells'.glob('*')]
|
|
available_shells = list(set(default_shells + installed_shells))
|
|
defaults = [k+' (%s)'%v['xxh-shell'] for k,v in self.default_shells.items()]
|
|
list_str = ', '.join(defaults + [s for s in available_shells if s not in default_shells])
|
|
|
|
return {
|
|
'default': default_shells,
|
|
'installed': installed_shells,
|
|
'available': available_shells,
|
|
'available_help': list_str
|
|
}
|
|
|
|
@property
|
|
def password(self):
|
|
return self._password
|
|
|
|
@password.setter
|
|
def password(self, password):
|
|
self._password = password
|
|
if password:
|
|
if not which('sshpass'):
|
|
self.eeprint('Install sshpass to using password: https://duckduckgo.com/?q=install+sshpass\n'
|
|
+ 'Note! There are a lot of security reasons to stop using password auth.')
|
|
verbose = '-v' if '-v' in self.sshpass else []
|
|
self.sshpass = ['sshpass', '-p', password] + verbose
|
|
else:
|
|
self.sshpass = []
|
|
|
|
@property
|
|
def shell(self):
|
|
return self._shell
|
|
|
|
@shell.setter
|
|
def shell(self, value):
|
|
default_shells = [v['xxh-shell'] for k,v in self.default_shells.items()]
|
|
shells = self.shells()
|
|
if value not in shells['available']:
|
|
self.eeprint('Currently supported shells: ' + shells['available_help'])
|
|
self._shell = value
|
|
|
|
@property
|
|
def verbose(self):
|
|
return self._verbose
|
|
|
|
@verbose.setter
|
|
def verbose(self, value):
|
|
self._verbose = value
|
|
if not self._verbose:
|
|
self.vverbose=False
|
|
|
|
@property
|
|
def vverbose(self):
|
|
return self._vverbose
|
|
|
|
@vverbose.setter
|
|
def vverbose(self, value):
|
|
self._vverbose = value
|
|
if self._vverbose:
|
|
self.verbose = True
|
|
self.ssh_arg_v = ['-v']
|
|
if self.sshpass and ['-v'] not in self.sshpass:
|
|
self.sshpass += ['-v']
|
|
else:
|
|
self.ssh_arg_v = []
|
|
if '-v' in self.sshpass:
|
|
self.sshpass.remove('-v')
|
|
|
|
def parse_destination(self, destination):
|
|
destination = f'ssh://{destination}' if 'ssh://' not in destination else destination
|
|
url = urlparse(destination)
|
|
return url
|
|
|
|
def get_host_info(self):
|
|
if '|' in self.host_xxh_home:
|
|
self.eeprint(f'Wrong host xxh home: {self.host_xxh_home}')
|
|
|
|
host = self.url.hostname
|
|
host_info_sh = self.package_dir_path / 'host_info.sh'
|
|
if self.use_pexpect:
|
|
cmd = "bash -c 'cat {host_info_sh} | sed \"s|_xxh_home_|{host_xxh_home}|\" | sed \"s|_xxh_shell_|{shell}|\" | ssh {ssh_v} {ssh_arguments} {host} -T \"bash -s\"'".format(
|
|
host_info_sh=host_info_sh, host_xxh_home=self.host_xxh_home, shell=self.shell, ssh_v=('' if not self.ssh_arg_v else '-v'), ssh_arguments=' '.join(self.ssh_arguments), host=host)
|
|
pr = self.pssh(cmd)
|
|
|
|
if pr == {}:
|
|
self.eeprint('Unexpected result. Try again with +v or +vv or try ssh before xxh')
|
|
|
|
if self.verbose:
|
|
self.eprint('Pexpect result:')
|
|
self.eprint(pr)
|
|
|
|
if pr['user_host_password'] is not None:
|
|
self.password = pr['user_host_password']
|
|
|
|
r = pr['output']
|
|
else:
|
|
r = $(cat @(host_info_sh) | sed @(f's|_xxh_home_|{self.host_xxh_home}|') | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s" ).strip()
|
|
|
|
if self.verbose:
|
|
self.eprint(f'Host info:\n{r}')
|
|
|
|
if r == '':
|
|
self.eeprint('Empty answer from host when getting first info. Often this is a connection error.\n'
|
|
+ 'Check your connection parameters using the same command but with ssh.')
|
|
|
|
r = dict([l.split('=') for l in r.replace('\r','').split('\n') if l.strip() != '' and '=' in l])
|
|
|
|
return r
|
|
|
|
def prepare_env_args(self, envs, to_base64=True):
|
|
env_args=[]
|
|
if envs:
|
|
for e in envs:
|
|
el = e.split('=', 1)
|
|
if len(el) != 2:
|
|
self.eeprint(f'Wrong environment (expected NAME=VAL): {e}')
|
|
if not re.match('^[a-zA-Z_]+$', el[0]):
|
|
self.eeprint(f'Wrong environment NAME (expected [a-zA-Z-]): {el[0]}')
|
|
|
|
val = el[1]
|
|
if (val.startswith("'") and val.endswith("'")) or (val.startswith('"') and val.endswith('"')):
|
|
val=val[1:-1]
|
|
|
|
if to_base64:
|
|
val = self.b64e(val)
|
|
|
|
env_args += ['-e', "%s=%s" % ( el[0], val ) ]
|
|
return env_args
|
|
|
|
def b64e(self, s):
|
|
return b64encode(s.encode()).decode()
|
|
|
|
def create_xxh_env(self):
|
|
home = fp'{self.local_xxh_home}'
|
|
if not home.exists():
|
|
mkdir -p @(home) @(home / 'xxh/shells') @(home / 'xxh/plugins')
|
|
|
|
config_file = fp'{self.config_file}'
|
|
sample_config_file = self.package_dir_path / 'config.xxhc'
|
|
if not config_file.exists() and sample_config_file.exists():
|
|
cp @(sample_config_file) @(config_file)
|
|
|
|
def d2F0Y2ggLW4uMiB4eGggLWg(self):
|
|
try:
|
|
terminal = os.get_terminal_size()
|
|
terminal_cols = terminal.columns
|
|
except:
|
|
terminal_cols=70
|
|
if terminal_cols < 70:
|
|
return f"\n\nContribution: {self.url_xxh_github}\n\nPlugins: {self.url_xxh_plugins_search}"
|
|
l,r,s,t = (['@','-','_'][randint(0,2)], ['@','-','_'][randint(0,2)], ['_',' '][randint(0,1)], ['_',''][randint(0,1)])
|
|
return f"\n" \
|
|
+f" {s}___ __________ {l} {r}\n" \
|
|
+f" {s}_____ / \\ \\__/\n" \
|
|
+f" {s}___ / ______ \\ / \\ contribution\n" \
|
|
+f" {s}____ / / __ \\ \\ / _/ {self.url_xxh_github}\n" \
|
|
+f" {s}__ ( / / / \\ \\ /\n" \
|
|
+f" \\ \\___/ / / / plugins\n" \
|
|
+f"{' ' if not t else ''} _{t}__\\ /__/ / {self.url_xxh_plugins_search}\n" \
|
|
+f"{' ' if not t else ''} / {'' if not t else ' '} \\________/ /\n" \
|
|
+f"{' ' if not t else ''} /_{t}__________________/\n" \
|
|
+f""
|
|
|
|
def main(self):
|
|
self.create_xxh_env()
|
|
|
|
argp = argparse.ArgumentParser(description=f"Your favorite shell wherever you go through the ssh.\n{self.d2F0Y2ggLW4uMiB4eGggLWg()}", formatter_class=RawTextHelpFormatter, prefix_chars='-+')
|
|
argp.add_argument('--version', '-V', action='version', version=f"xonssh-xxh/{self.local_xxh_version}")
|
|
argp.add_argument('-p', dest='ssh_port', help="Port to connect to on the remote host.")
|
|
argp.add_argument('-l', dest='ssh_login', help="Specifies the user to log in as on the remote machine.")
|
|
argp.add_argument('-i', dest='ssh_private_key', help="File from which the identity (private key) for public key authentication is read.")
|
|
argp.add_argument('-o', dest='ssh_options', metavar='SSH_OPTION -o ...', action='append', help="SSH options are described in ssh man page. Example: -o Port=22 -o User=snail")
|
|
argp.add_argument('+P','++password', help="Password for ssh auth.")
|
|
argp.add_argument('+PP','++password-prompt', default=False, action='store_true', help="Enter password manually using prompt.")
|
|
argp.add_argument('destination', metavar='[user@]host[:port]', help="Destination may be specified as [ssh://][user@]host[:port] or host from ~/.ssh/config")
|
|
argp.add_argument('+i','++install', default=False, action='store_true', help="Install xxh to destination host.")
|
|
argp.add_argument('+if','++install-force', default=False, action='store_true', help="Removing the host xxh package and install xxh again.")
|
|
argp.add_argument('+iff','++install-force-full', default=False, action='store_true', help="Removing the host xxh home and install xxh again. All installed packages on the host (e.g. pip packages) will be lost.")
|
|
argp.add_argument('+xc','++xxh-config', default=self.config_file, help=f"Xxh config file in yaml. Default: " + self.config_file)
|
|
argp.add_argument('+e','++env', dest='env', metavar='NAME=VAL ...', action='append', help="Setting environment variables")
|
|
argp.add_argument('+eb','++envb', dest='envb', metavar='NAME=BASE64 ...', action='append', help="Setting environment variables base64 encoded")
|
|
argp.add_argument('+E','++env-mode', action='store_true', help="Environment mode in case `source xxh.sh`")
|
|
argp.add_argument('+lh','++local-xxh-home', default=self.local_xxh_home, help=f"Local xxh home path. Default: {self.local_xxh_home}")
|
|
argp.add_argument('+hh','++host-xxh-home', default=self.host_xxh_home, help=f"Host xxh home path. Default: {self.host_xxh_home}")
|
|
argp.add_argument('+hhr','++host-xxh-home-remove', action='store_true', help=f"Remove xxh home on host after disconnect")
|
|
argp.add_argument('+hf','++host-execute-file', help=f"Execute script file placed on host and exit.")
|
|
argp.add_argument('+hc','++host-execute-command', help=f"Execute command on host and exit.")
|
|
argp.add_argument('+s','++shell', default=self.shell, help="Xxh shell: " + self.shells()['available_help'])
|
|
argp.add_argument('+ss','++shell-source', default=self.shell_source, help=f"(future) Custom source of xxh-shell: git url or local path")
|
|
argp.add_argument('+v','++verbose', default=False, action='store_true', help="Verbose mode.")
|
|
argp.add_argument('+vv','++vverbose', default=False, action='store_true', help="Super verbose mode.")
|
|
argp.add_argument('+q','++quiet', default=False, action='store_true', help="Quiet mode.")
|
|
argp.usage = "xxh <host from ~/.ssh/config>\n" \
|
|
+ "usage: xxh [ssh arguments] [user@]host[:port] [xxh arguments]\n" \
|
|
+ "usage: xxh [-p SSH_PORT] [-l SSH_LOGIN] [-i SSH_PRIVATE_KEY]\n" \
|
|
+ " [-o SSH_OPTION -o ...] [+P PASSWORD] [+PP]\n" \
|
|
+ " [user@]host[:port]\n" \
|
|
+ " [+i] [+if] [+iff] [+hhr] [+s SHELL] [+e NAME=VAL +e ...] [+v] [+vv] [+q]\n" \
|
|
+ " [+hh HOST_XXH_HOME] [+hf HOST_EXEC_FILE] [+hc HOST_EXEC_CMD]\n" \
|
|
+ " [+xc CONFIG_FILE] [+lh LOCAL_XXH_HOME] [-h] [-V]\n"
|
|
|
|
help = argp.format_help().replace('\n +','\n\nxxh arguments:\n +',1).replace('optional ', 'common ')\
|
|
.replace('number and exit', 'number and exit\n\nssh arguments:').replace('positional ', 'required ')
|
|
argp.format_help = lambda: help
|
|
opt = argp.parse_args()
|
|
|
|
self.quiet = opt.quiet
|
|
if not self.quiet:
|
|
self.verbose = opt.verbose
|
|
self.vverbose = opt.vverbose
|
|
|
|
self.url = url = self.parse_destination(opt.destination)
|
|
|
|
xxh_config_file = pf"{opt.xxh_config}"
|
|
|
|
if xxh_config_file:
|
|
if not xxh_config_file.exists():
|
|
if xxh_config_file != p'~/.xxh/.xxhc':
|
|
self.eeprint(f'Config does not exist: {xxh_config_file}')
|
|
else:
|
|
if self.verbose:
|
|
self.eprint(f'Load xxh config from {xxh_config_file}')
|
|
with open(xxh_config_file) as f:
|
|
xxh_config = yaml.safe_load(f)
|
|
|
|
if xxh_config and 'hosts' in xxh_config:
|
|
sys_args = sys.argv[1:]
|
|
conf_args = []
|
|
for h, hc in xxh_config['hosts'].items():
|
|
if re.match(h, url.hostname):
|
|
if self.verbose:
|
|
self.eprint('Load xxh config for host ' + h)
|
|
if hc and len(hc) > 0:
|
|
for k, v in hc.items():
|
|
conf_args += [k, v] if v is not None else [k]
|
|
if k in ['+P', '++password']:
|
|
current_user = getpass.getuser()
|
|
current_mode = oct(xxh_config_file.stat().st_mode)[-4:]
|
|
if xxh_config_file.owner() != current_user or current_mode != '0600':
|
|
self.eprint('\n\033[0;93mWARN! There is password in the config file but the file is too open!\n'
|
|
+ f'Run to restrict: chown {current_user}:{current_user} {xxh_config_file} && chmod 0600 {xxh_config_file}\033[0m\n')
|
|
args = conf_args + sys_args
|
|
if opt.verbose:
|
|
print('Final arguments list: ' + str(args))
|
|
opt = argp.parse_args(args)
|
|
|
|
self.verbose = opt.verbose
|
|
self.vverbose = opt.vverbose
|
|
|
|
if opt.shell in self.default_shells:
|
|
self.shell = self.default_shells[opt.shell]['xxh-shell']
|
|
self.shell_source = self.default_shells[opt.shell]['xxh-shell-source']
|
|
else:
|
|
self.shell = opt.shell
|
|
self.shell_source = opt.shell_source
|
|
|
|
short_shell_name = self.shell.split('-')[2]
|
|
|
|
username = getpass.getuser()
|
|
host = url.hostname
|
|
if not host:
|
|
self.eeprint(f"Wrong destination '{host}'")
|
|
if url.port:
|
|
opt.ssh_port = url.port
|
|
if url.username:
|
|
opt.ssh_login = url.username
|
|
if opt.ssh_login:
|
|
username = opt.ssh_login
|
|
|
|
self.ssh_arguments = ['-o', 'StrictHostKeyChecking=accept-new']
|
|
if not self.verbose:
|
|
self.ssh_arguments += ['-o', 'LogLevel=QUIET']
|
|
if opt.ssh_port:
|
|
self.ssh_arguments += ['-o', f'Port={opt.ssh_port}']
|
|
if opt.ssh_private_key:
|
|
self.ssh_arguments += ['-o', f'IdentityFile={opt.ssh_private_key}']
|
|
if opt.ssh_login:
|
|
self.ssh_arguments += ['-o', f'User={opt.ssh_login}']
|
|
if opt.ssh_options:
|
|
for ssh_option in opt.ssh_options:
|
|
self.ssh_arguments += ['-o', ssh_option]
|
|
|
|
if self.verbose:
|
|
self.eprint(f'ssh arguments: {self.ssh_arguments}')
|
|
|
|
if opt.password is not None:
|
|
self.password = opt.password
|
|
elif opt.password_prompt:
|
|
password = ''
|
|
while not password:
|
|
password = getpass.getpass(f"Enter {username}@{host}'s password: ")
|
|
self.password = password
|
|
|
|
env_args = self.prepare_env_args(opt.env)
|
|
env_args += self.prepare_env_args(opt.envb, to_base64=False)
|
|
|
|
opt.install = True if opt.install_force or opt.install_force_full else opt.install
|
|
|
|
self.local_xxh_home = pf"{opt.local_xxh_home}"
|
|
local_xxh_home_parent = self.local_xxh_home.parent
|
|
|
|
if self.local_xxh_home.exists():
|
|
if not os.access(self.local_xxh_home, os.W_OK):
|
|
self.eeprint(f"The local xxh home path isn't writable: {self.local_xxh_home}" )
|
|
elif local_xxh_home_parent.exists():
|
|
if not os.access(local_xxh_home_parent, os.W_OK):
|
|
self.eeprint(f"Parent for local xxh home path isn't writable: {local_xxh_home_parent}")
|
|
else:
|
|
self.eeprint(f"Paths aren't writable:\n {local_xxh_home_parent}\n {self.local_xxh_home}")
|
|
|
|
local_plugins_dir = self.local_xxh_home / 'xxh/plugins'
|
|
mkdir @(self.ssh_arg_v) -p @(self.local_xxh_home) @(local_plugins_dir) @(self.local_xxh_home / 'xxh/shells')
|
|
|
|
# Fix env to avoid ssh warnings
|
|
for lc in ['LC_TIME','LC_MONETARY','LC_ADDRESS','LC_IDENTIFICATION','LC_MEASUREMENT','LC_NAME','LC_NUMERIC','LC_PAPER','LC_TELEPHONE']:
|
|
${...}[lc] = "POSIX"
|
|
|
|
if pf'{opt.host_xxh_home}' == pf'/':
|
|
self.eeprint("Host xxh home path {host_xxh_home} looks like /. Please check twice!")
|
|
|
|
self.host_xxh_home = opt.host_xxh_home
|
|
host_info = self.get_host_info()
|
|
|
|
if not host_info:
|
|
self.eeprint(f'Unknown answer from host when getting info')
|
|
|
|
if 'xxh_home_realpath' not in host_info or host_info['xxh_home_realpath'] == '':
|
|
self.eeprint(f'Unknown answer from host when getting realpath for directory {host_xxh_home}')
|
|
|
|
if 'xxh_version' not in host_info or host_info['xxh_version'] == '':
|
|
self.eeprint(f'Unknown answer from host when getting version for directory {host_xxh_home}')
|
|
|
|
host_xxh_home = host_info['xxh_home_realpath']
|
|
host_xxh_home = pf"{host_xxh_home}"
|
|
host_xxh_version = host_info['xxh_version']
|
|
|
|
if host_info['xxh_home_writable'] == '0' and host_info['xxh_parent_home_writable'] == '0':
|
|
yn = input(f"{host}:{host_xxh_home} is not writable. Continue? [y/n] ").strip().lower()
|
|
if yn != 'y':
|
|
self.eeprint('Stopped')
|
|
|
|
if host_info['scp'] == '' and host_info['rsync'] == '':
|
|
self.eeprint(f"There are no rsync or scp on target host. Sad but files can't be uploaded.")
|
|
|
|
if opt.install_force == False and opt.install_force_full == False:
|
|
# Check version
|
|
ask = False
|
|
if host_xxh_version == 'version_not_found':
|
|
ask = f'Host xxh home is not empty but something went wrong while getting host xxh version.'
|
|
elif host_xxh_version not in ['dir_not_found','dir_empty'] and host_xxh_version != self.local_xxh_version:
|
|
ask = f"Local xxh version '{self.local_xxh_version}' is not equal host xxh version '{host_xxh_version}'."
|
|
|
|
if ask:
|
|
choice = input(f"{ask} What's next? \n"
|
|
+ " s - Stop here. You'll try to connect using ordinary ssh for backup current xxh home.\n"
|
|
+ " u - Safe update. Host xxh home will be renamed and local xxh version will be installed.\n"
|
|
+ " f - [default] Force reinstall xxh. Installed packages (e.g. pip) will be saved.\n"
|
|
+ " ff - Force full reinstall on host. Installed packages (e.g. pip) will be lost.\n"
|
|
+ " i - Ignore, cross fingers and continue the connection.\n"
|
|
+ "s/u/F/i? ").lower()
|
|
|
|
if choice == 's':
|
|
print('Stopped')
|
|
exit(0)
|
|
elif choice == 'u':
|
|
local_time = datetime.datetime.now().isoformat()[:19]
|
|
self.eprint(f"Move {host}:{host_xxh_home} to {host}:{host_xxh_home}-{local_time}")
|
|
echo @(f"mv {host_xxh_home} {host_xxh_home}-{local_time}") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
|
|
opt.install = True
|
|
elif choice == 'f' or choice.strip() == '':
|
|
opt.install = True
|
|
opt.install_force = True
|
|
elif choice == 'ff':
|
|
opt.install = True
|
|
opt.install_force_full = True
|
|
elif choice == 'i':
|
|
pass
|
|
else:
|
|
self.eeprint('Unknown answer')
|
|
|
|
if (host_xxh_version in ['dir_not_found','dir_empty'] or host_info['xxh_shell_exists'] == '0') and opt.install_force == False and opt.install_force_full == False:
|
|
yn = input(f"{host}:{host_xxh_home}/xxh/shells/{self.shell} not found. Install {self.shell}? [Y/n] ").strip().lower()
|
|
if yn == 'y' or yn == '':
|
|
opt.install = True
|
|
else:
|
|
self.eeprint('Unknown answer')
|
|
|
|
if opt.install:
|
|
self.eprint("\033[0;33m", end='')
|
|
|
|
self.eprint(f"Install {self.shell} to {host}:{host_xxh_home}" )
|
|
|
|
shells_dir = self.local_xxh_home / 'xxh/shells'
|
|
shells = sorted(shells_dir.glob('*'))
|
|
|
|
shell_dir = shells_dir / f'{self.shell}'
|
|
if not shell_dir.exists():
|
|
self.eprint(f'First time download {self.shell} shell from {self.shell_source}')
|
|
if self.shell_source[:6] in ['http:/', 'https:'] and 'git' in self.shell_source:
|
|
git clone -q --depth 1 @(self.shell_source) @(shells_dir / self.shell)
|
|
elif fp'{self.shell_source}'.exists():
|
|
cp -r @(self.shell_source) @(shells_dir)
|
|
else:
|
|
self.eeprint(f'Unknown shell source: {self.shell_source}')
|
|
|
|
shell_build_dir = shell_dir / 'build'
|
|
if not shell_build_dir.exists():
|
|
self.eprint(f"First time build {self.shell}")
|
|
xonsh @(shell_build_dir.parent / 'build.xsh')
|
|
|
|
if opt.install_force:
|
|
self.eprint(f'Remove {host}:{host_xxh_home}/xxh')
|
|
echo @(f"rm -rf {host_xxh_home}/xxh") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
|
|
|
|
if opt.install_force_full:
|
|
self.eprint(f'Remove {host}:{host_xxh_home}')
|
|
echo @(f"rm -rf {host_xxh_home}") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
|
|
|
|
host_xxh_plugins_dir = host_xxh_home / 'xxh/plugins'
|
|
host_xxh_dirs_str = ''
|
|
for local_plugin_dir in local_plugins_dir.glob(f'*-{short_shell_name}-*'):
|
|
local_plugin_build_file = local_plugin_dir / 'build.xsh'
|
|
local_plugin_build_dir = local_plugin_dir / 'build'
|
|
if not local_plugin_build_dir.exists():
|
|
if not local_plugin_build_file.exists():
|
|
self.eeprint(f'Please update {local_plugin_dir.name} and {self.shell}. Build file not found: {local_plugin_build_file}')
|
|
|
|
self.eprint(f'First time build {local_plugin_dir}')
|
|
@(local_plugin_build_file) 1>&2
|
|
host_xxh_dirs_str += ' ' + str(fp'{str(local_plugin_build_dir).replace(str(self.local_xxh_home), str(host_xxh_home))}')
|
|
|
|
host_xxh_package_dir = host_xxh_home / 'xxh/package'
|
|
host_xxh_shell_dir = host_xxh_home / f'xxh/shells/{self.shell}'
|
|
host_xxh_shell_build_dir = host_xxh_shell_dir / 'build'
|
|
echo @(f"mkdir -p {host_xxh_package_dir} {host_xxh_shell_build_dir} {host_xxh_dirs_str}") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
|
|
|
|
arg_q = ['-q'] if self.quiet else []
|
|
if which('rsync') and host_info['rsync']:
|
|
self.eprint('Upload using rsync')
|
|
rsync @(self.ssh_arg_v) -e @(f"{''.join(self.sshpass)} ssh {'' if self.ssh_arg_v == [] else '-v'} {' '.join(self.ssh_arguments)}") @(arg_q) -az --info=progress2 --cvs-exclude @(self.package_dir_path)/settings.py @(host):@(host_xxh_package_dir)/ 1>&2
|
|
rsync @(self.ssh_arg_v) -e @(f"{''.join(self.sshpass)} ssh {'' if self.ssh_arg_v == [] else '-v'} {' '.join(self.ssh_arguments)}") @(arg_q) -az --info=progress2 --cvs-exclude @(shell_build_dir)/ @(host):@(host_xxh_shell_build_dir)/ 1>&2
|
|
for local_plugin_dir in local_plugins_dir.glob(f'*-{short_shell_name}-*'):
|
|
local_plugin_build_dir = local_plugin_dir/'build'
|
|
local_plugin_name = local_plugin_dir.name
|
|
rsync @(self.ssh_arg_v) -e @(f"{''.join(self.sshpass)} ssh {'' if self.ssh_arg_v == [] else '-v'} {' '.join(self.ssh_arguments)}") @(arg_q) -az --info=progress2 --cvs-exclude @(local_plugin_build_dir)/* @(host):@(host_xxh_plugins_dir)/@(local_plugin_name)/build/ 1>&2
|
|
elif which('scp') and host_info['scp']:
|
|
self.eprint("Upload using scp. Note: install rsync on local and remote host to increase speed.")
|
|
@(self.sshpass) scp @(self.ssh_arg_v) @(self.ssh_arguments) -r -C @(arg_q) @(self.package_dir_path)/settings.py @(f"{host}:{host_xxh_package_dir}/") 1>&2
|
|
@(self.sshpass) scp @(self.ssh_arg_v) @(self.ssh_arguments) -r -C @(arg_q) @(shell_build_dir) @(f"{host}:{host_xxh_shell_dir}/") 1>&2
|
|
for local_plugin_dir in local_plugins_dir.glob(f'*-{short_shell_name}-*'):
|
|
local_plugin_build_dir = local_plugin_dir/'build'
|
|
local_plugin_name = local_plugin_dir.name
|
|
@(self.sshpass) scp @(self.ssh_arg_v) @(self.ssh_arguments) -r -C @([] if self.vverbose else ['-q']) @(local_plugin_build_dir)/* @(f"{host}:{host_xxh_plugins_dir}/{local_plugin_name}/build/") 1>&2
|
|
else:
|
|
self.eprint('Please install rsync or scp!')
|
|
|
|
self.eprint(f'First run {self.shell} on {host}\033[0m')
|
|
|
|
host_execute_file = host_execute_command = []
|
|
if opt.host_execute_file:
|
|
host_execute_file = ['-f', opt.host_execute_file]
|
|
elif opt.host_execute_command:
|
|
host_execute_command = ['-c', '"%s"' % opt.host_execute_command.replace('"', '\\"')]
|
|
|
|
host_entrypoint_verbose = []
|
|
if self.vverbose:
|
|
host_entrypoint_verbose = ['-v', '2']
|
|
elif self.verbose:
|
|
host_entrypoint_verbose = ['-v', '1']
|
|
|
|
if opt.env_mode:
|
|
# TODO: This PoC code should be in `xxh.zsh`. Every shell should has own `xxh.shell`
|
|
# which should be "sourced". While "sourcing" the script should prepare
|
|
# and pass `-e` arguments to xxh.
|
|
xxh_env = self.get_xxh_env()
|
|
if xxh_env:
|
|
host_xxh_plugins_dir = host_xxh_home / 'xxh/plugins'
|
|
for local_plugin_dir in local_plugins_dir.glob(f'*-{short_shell_name}-*'):
|
|
local_plugin_env = local_plugin_dir / 'env'
|
|
if local_plugin_env.exists():
|
|
with open(local_plugin_env) as f:
|
|
plugin_envs = f.read().split('\n')
|
|
for e in plugin_envs:
|
|
if e in xxh_env:
|
|
if self.vverbose:
|
|
self.eprint(f'Plugin {local_plugin_dir.name} environment: {e}={xxh_env[e]}')
|
|
env_args += ['-e', "%s=%s" % ( e, self.b64e(xxh_env[e]) ) ]
|
|
|
|
@(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -t bash @(str(host_xxh_home/'xxh/shells'/self.shell/'build/entrypoint.sh')) @(host_execute_file) @(host_execute_command) @(host_entrypoint_verbose) @(env_args)
|
|
|
|
if opt.host_xxh_home_remove:
|
|
if self.verbose:
|
|
self.eprint(f'Remove {host}:{host_xxh_home}')
|
|
echo @(f"rm -rf {host_xxh_home}") | @(self.sshpass) ssh @(self.ssh_arg_v) @(self.ssh_arguments) @(host) -T "bash -s"
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if os.name == 'nt':
|
|
self.eeprint(f"Windows is not supported. WSL1 is not recommended also. WSL2 is not tested yet.\nContribution: {self.url_xxh_github}")
|
|
if not which('ssh'):
|
|
self.eeprint('Install OpenSSH client before using xxh: https://duckduckgo.com/?q=how+to+install+openssh+client+in+linux')
|
|
|
|
try:
|
|
this_file = os.readlink(__file__)
|
|
except:
|
|
this_file = __file__
|
|
sys.path.append(str(pf"{this_file}".absolute().parent))
|
|
|
|
import xonssh_xxh
|
|
from xonssh_xxh.settings import global_settings
|
|
|
|
xxh = Xxh(package_dir_path=pf"{xonssh_xxh.__file__}".parent, version=global_settings['XXH_VERSION'])
|
|
xxh.main() |