add mypy type hints

Nick Sweeting 2019-03-26 03:20:41 -04:00
parent 58c9b47d43
commit 63abc8980c
3 changed files with 171 additions and 143 deletions

Changed file 1 of 3 (the archive method fetchers):

@@ -1,5 +1,7 @@
 import os
+import json
+from typing import Union, Dict, List, Tuple, NamedTuple
 from collections import defaultdict
 from datetime import datetime
@@ -40,13 +42,15 @@ from util import (
     without_query,
     without_fragment,
     fetch_page_title,
+    read_js_script,
     is_static_file,
     TimedProgress,
     chmod_file,
     wget_output_path,
     chrome_args,
     check_link_structure,
-    run, PIPE, DEVNULL
+    run, PIPE, DEVNULL,
+    Link,
 )
 from logs import (
     log_link_archiving_started,
@@ -55,15 +59,22 @@ from logs import (
     log_archive_method_finished,
 )

 class ArchiveError(Exception):
     def __init__(self, message, hints=None):
         super().__init__(message)
         self.hints = hints

+class ArchiveResult(NamedTuple):
+    cmd: List[str]
+    pwd: str
+    output: Union[str, Exception, None]
+    status: str
+    start_ts: datetime
+    end_ts: datetime
+    duration: int

-def archive_link(link_dir, link):
+def archive_link(link_dir: str, link: Link, page=None) -> Link:
     """download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""

     ARCHIVE_METHODS = (
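
Note: the new ArchiveResult gives every fetcher a typed, immutable return value in place of the ad-hoc dicts used before. A minimal sketch of how it is constructed and consumed (the field values here are hypothetical, not taken from the diff):

    result = ArchiveResult(
        cmd=['wget', '-E', 'https://example.com'],      # hypothetical command
        pwd='/data/archive/1553584841',                 # hypothetical link_dir
        output='example.com/index.html',
        status='succeeded',
        start_ts=datetime.now(),
        end_ts=datetime.now(),
        duration=0,
    )
    result.status       # typed attribute access replaces result['status']
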
@@ -95,10 +106,11 @@ def archive_link(link_dir, link):
                 log_archive_method_started(method_name)

                 result = method_function(link_dir, link)

-                link['history'][method_name].append(result)
-                stats[result['status']] += 1
-                log_archive_method_finished(result)
+                link['history'][method_name].append(result._asdict())
+
+                stats[result.status] += 1
+                log_archive_method_finished(result._asdict())
             else:
                 stats['skipped'] += 1
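
Note: ._asdict() is NamedTuple's built-in method for converting the typed result back into a plain dict at the point where it enters link['history'], so the on-disk JSON index keeps its old shape. Roughly, assuming the index is still serialized with json as elsewhere in the codebase:

    import json

    entry = result._asdict()            # dict of field name -> value
    json.dumps(entry, default=str)      # a sketch: default=str stringifies the datetime fields
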
@@ -117,7 +129,7 @@ def archive_link(link_dir, link):

 ### Archive Method Functions

-def should_fetch_title(link_dir, link):
+def should_fetch_title(link_dir: str, link: Link) -> bool:
     # if link already has valid title, skip it
     if link['title'] and not link['title'].lower().startswith('http'):
         return False
@@ -127,7 +139,7 @@ def should_fetch_title(link_dir, link):
     return FETCH_TITLE

-def fetch_title(link_dir, link, timeout=TIMEOUT):
+def fetch_title(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """try to guess the page's title from its content"""

     output = None
@@ -150,22 +162,22 @@ def fetch_title(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_favicon(link_dir, link):
+def should_fetch_favicon(link_dir: str, link: Link) -> bool:
     if os.path.exists(os.path.join(link_dir, 'favicon.ico')):
         return False

     return FETCH_FAVICON

-def fetch_favicon(link_dir, link, timeout=TIMEOUT):
+def fetch_favicon(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """download site favicon from google's favicon api"""

     output = 'favicon.ico'
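
Note: one subtlety in these return-value conversions is that unpacking **timer.stats into the ArchiveResult constructor only works because the stats dict's keys (start_ts, end_ts, duration) exactly match the remaining field names. mypy cannot verify a **-unpacking against keyword arguments, so a mismatched key would still surface at runtime rather than at type-check time. The pattern in isolation:

    stats = {'start_ts': datetime.now(), 'end_ts': datetime.now(), 'duration': 0}
    ArchiveResult(cmd=[], pwd='.', output=None, status='skipped', **stats)
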
@@ -188,15 +200,15 @@ def fetch_favicon(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_wget(link_dir, link):
+def should_fetch_wget(link_dir: str, link: Link) -> bool:
     output_path = wget_output_path(link)
     if output_path and os.path.exists(os.path.join(link_dir, output_path)):
         return False
@@ -204,7 +216,7 @@ def should_fetch_wget(link_dir, link):
     return FETCH_WGET

-def fetch_wget(link_dir, link, timeout=TIMEOUT):
+def fetch_wget(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """download full site using wget"""

     if FETCH_WARC:
@@ -274,15 +286,15 @@ def fetch_wget(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_pdf(link_dir, link):
+def should_fetch_pdf(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
@@ -292,7 +304,7 @@ def should_fetch_pdf(link_dir, link):
     return FETCH_PDF

-def fetch_pdf(link_dir, link, timeout=TIMEOUT):
+def fetch_pdf(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """print PDF of site to file using chrome --headless"""

     output = 'output.pdf'
@@ -317,15 +329,15 @@ def fetch_pdf(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_screenshot(link_dir, link):
+def should_fetch_screenshot(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
@@ -334,7 +346,7 @@ def should_fetch_screenshot(link_dir, link):
     return FETCH_SCREENSHOT

-def fetch_screenshot(link_dir, link, timeout=TIMEOUT):
+def fetch_screenshot(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """take screenshot of site using chrome --headless"""

     output = 'screenshot.png'
@@ -359,15 +371,15 @@ def fetch_screenshot(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_dom(link_dir, link):
+def should_fetch_dom(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
@@ -376,7 +388,7 @@ def should_fetch_dom(link_dir, link):
     return FETCH_DOM

-def fetch_dom(link_dir, link, timeout=TIMEOUT):
+def fetch_dom(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """print HTML of site to file using chrome --dump-html"""

     output = 'output.html'
@@ -403,15 +415,15 @@ def fetch_dom(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_git(link_dir, link):
+def should_fetch_git(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
@@ -428,7 +440,7 @@ def should_fetch_git(link_dir, link):
     return FETCH_GIT

-def fetch_git(link_dir, link, timeout=TIMEOUT):
+def fetch_git(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """download full site using git"""

     output = 'git'
@@ -460,16 +472,16 @@ def fetch_git(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_media(link_dir, link):
+def should_fetch_media(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
@@ -478,7 +490,7 @@ def should_fetch_media(link_dir, link):
     return FETCH_MEDIA

-def fetch_media(link_dir, link, timeout=MEDIA_TIMEOUT):
+def fetch_media(link_dir: str, link: Link, timeout: int=MEDIA_TIMEOUT) -> ArchiveResult:
     """Download playlists or individual video, audio, and subtitles using youtube-dl"""

     output = 'media'
@@ -531,16 +543,16 @@ def fetch_media(link_dir, link, timeout=MEDIA_TIMEOUT):
     finally:
         timer.end()

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def should_fetch_archive_dot_org(link_dir, link):
+def should_fetch_archive_dot_org(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
@@ -550,7 +562,7 @@ def should_fetch_archive_dot_org(link_dir, link):
     return SUBMIT_ARCHIVE_DOT_ORG

-def archive_dot_org(link_dir, link, timeout=TIMEOUT):
+def archive_dot_org(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """submit site to archive.org for archiving via their service, save returned archive url"""

     output = 'archive.org.txt'
@@ -596,17 +608,17 @@ def archive_dot_org(link_dir, link, timeout=TIMEOUT):
         chmod_file('archive.org.txt', cwd=link_dir)
         output = archive_org_url

-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
-        **timer.stats,
-    }
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
+        **timer.stats,
+    )

-def parse_archive_dot_org_response(response):
+def parse_archive_dot_org_response(response: bytes) -> Tuple[List[str], List[str]]:
     # Parse archive.org response headers
-    headers = defaultdict(list)
+    headers: Dict[str, List[str]] = defaultdict(list)

     # lowercase all the header names and store in dict
     for header in response.splitlines():
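
Note: the annotation on headers is needed because mypy cannot infer a value type from a bare defaultdict(list). A minimal sketch of the pattern (the header value is hypothetical):

    from collections import defaultdict
    from typing import Dict, List

    headers: Dict[str, List[str]] = defaultdict(list)
    headers['content-location'].append('/web/20190326/example.com')    # hypothetical header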

Changed file 2 of 3 (the link index read/write helpers):

@@ -3,6 +3,8 @@ import json
 from datetime import datetime
 from string import Template

+from typing import List, Tuple
+
 try:
     from distutils.dir_util import copy_tree
 except ImportError:
@@ -23,6 +25,7 @@ from util import (
     check_links_structure,
     wget_output_path,
     latest_output,
+    Link,
 )

 from parse import parse_links
 from links import validate_links
@@ -39,7 +42,7 @@ TITLE_LOADING_MSG = 'Not yet archived...'

 ### Homepage index for all the links

-def write_links_index(out_dir, links, finished=False):
+def write_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
     """create index.html file for a given list of links"""

     log_indexing_process_started()
@@ -53,15 +56,15 @@ def write_links_index(out_dir, links, finished=False):
     write_html_links_index(out_dir, links, finished=finished)
     log_indexing_finished(out_dir, 'index.html')

-def load_links_index(out_dir=OUTPUT_DIR, import_path=None):
+def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[List[Link], List[Link]]:
     """parse and load existing index with any new links from import_path merged in"""

-    existing_links = []
+    existing_links: List[Link] = []
     if out_dir:
         existing_links = parse_json_links_index(out_dir)
         check_links_structure(existing_links)

-    new_links = []
+    new_links: List[Link] = []
     if import_path:
         # parse and validate the import file
         log_parsing_started(import_path)
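
Note: import_path: str=None relies on mypy's implicit-Optional handling of None defaults, which mypy accepted by default when this commit was written but rejects today unless --implicit-optional is set. The same pattern appears later in str_between and latest_output. The durable spelling would be (a sketch, not what the commit does):

    from typing import Optional

    def load_links_index(out_dir: str=OUTPUT_DIR,
                         import_path: Optional[str]=None) -> Tuple[List[Link], List[Link]]:
        ...
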
@@ -79,7 +82,7 @@ def load_links_index(out_dir=OUTPUT_DIR, import_path=None):
     return all_links, new_links

-def write_json_links_index(out_dir, links):
+def write_json_links_index(out_dir: str, links: List[Link]) -> None:
     """write the json link index to a given path"""

     check_links_structure(links)
@@ -100,7 +103,7 @@ def write_json_links_index(out_dir, links):
     chmod_file(path)

-def parse_json_links_index(out_dir=OUTPUT_DIR):
+def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> List[Link]:
     """parse a archive index json file and return the list of links"""
     index_path = os.path.join(out_dir, 'index.json')
     if os.path.exists(index_path):
@@ -111,7 +114,7 @@ def parse_json_links_index(out_dir=OUTPUT_DIR):
     return []

-def write_html_links_index(out_dir, links, finished=False):
+def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
     """write the html link index to a given path"""

     check_links_structure(links)
@@ -166,7 +169,7 @@ def write_html_links_index(out_dir, links, finished=False):
     chmod_file(path)

-def patch_links_index(link, out_dir=OUTPUT_DIR):
+def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
     """hack to in-place update one row's info in the generated index html"""

     title = link['title'] or latest_output(link)['title']
@@ -200,12 +203,12 @@ def patch_links_index(link, out_dir=OUTPUT_DIR):

 ### Individual link index

-def write_link_index(out_dir, link):
+def write_link_index(out_dir: str, link: Link) -> None:
     link['updated'] = str(datetime.now().timestamp())
     write_json_link_index(out_dir, link)
     write_html_link_index(out_dir, link)

-def write_json_link_index(out_dir, link):
+def write_json_link_index(out_dir: str, link: Link) -> None:
     """write a json file with some info about the link"""

     check_link_structure(link)
@@ -216,7 +219,7 @@ def write_json_link_index(out_dir, link):
     chmod_file(path)

-def parse_json_link_index(out_dir):
+def parse_json_link_index(out_dir: str) -> dict:
     """load the json link index from a given directory"""
     existing_index = os.path.join(out_dir, 'index.json')
     if os.path.exists(existing_index):
@@ -226,7 +229,7 @@ def parse_json_link_index(out_dir):
         return link_json
     return {}

-def load_json_link_index(out_dir, link):
+def load_json_link_index(out_dir: str, link: Link) -> Link:
     """check for an existing link archive in the given directory,
     and load+merge it into the given link dict
     """
@@ -241,7 +244,7 @@ def load_json_link_index(out_dir, link):
     check_link_structure(link)
     return link

-def write_html_link_index(out_dir, link):
+def write_html_link_index(out_dir: str, link: Link) -> None:
     check_link_structure(link)
     with open(os.path.join(TEMPLATES_DIR, 'link_index.html'), 'r', encoding='utf-8') as f:
         link_html = f.read()

Changed file 3 of 3 (the shared utility helpers):

@@ -3,6 +3,8 @@ import re
 import sys
 import time

+from typing import List, Dict, Any, Optional, Union
+
 from urllib.request import Request, urlopen
 from urllib.parse import urlparse, quote
 from decimal import Decimal
@@ -30,6 +32,7 @@ from config import (
     CHECK_SSL_VALIDITY,
     WGET_USER_AGENT,
     CHROME_OPTIONS,
+    PYTHON_PATH,
 )

 from logs import pretty_path
@@ -86,9 +89,11 @@ STATICFILE_EXTENSIONS = {
     # html, htm, shtml, xhtml, xml, aspx, php, cgi
 }

+Link = Dict[str, Any]
+
 ### Checks & Tests

-def check_link_structure(link):
+def check_link_structure(link: Link) -> None:
     """basic sanity check invariants to make sure the data is valid"""
     assert isinstance(link, dict)
     assert isinstance(link.get('url'), str)
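
Note: Link = Dict[str, Any] is a type alias, not a class. It gives the loosely structured link dicts a shared name across all the signatures above without changing any runtime behavior, while the check_link_structure asserts keep doing the actual validation dynamically. What the alias buys, and what it doesn't, as a sketch:

    def touch(link: Link) -> None:
        link['updated'] = '1553584841.0'    # accepted: Dict[str, Any] puts no constraint on keys
        link['upd8ted'] = '...'             # also accepted: the alias documents intent, not a schema
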
@@ -100,13 +105,13 @@ def check_link_structure(link):
         assert isinstance(key, str)
         assert isinstance(val, list), 'history must be a Dict[str, List], got: {}'.format(link['history'])

-def check_links_structure(links):
+def check_links_structure(links: List[Link]) -> None:
     """basic sanity check invariants to make sure the data is valid"""
     assert isinstance(links, list)
     if links:
         check_link_structure(links[0])

-def check_url_parsing_invariants():
+def check_url_parsing_invariants() -> None:
     """Check that plain text regex URL parsing works as expected"""

     # this is last-line-of-defense to make sure the URL_REGEX isn't
@@ -137,7 +142,7 @@ def check_url_parsing_invariants():

 ### Random Helpers

-def save_stdin_source(raw_text):
+def save_stdin_source(raw_text: str) -> str:
     if not os.path.exists(SOURCES_DIR):
         os.makedirs(SOURCES_DIR)
@@ -150,7 +155,7 @@ def save_stdin_source(raw_text):
     return source_path

-def save_remote_source(url, timeout=TIMEOUT):
+def save_remote_source(url: str, timeout: int=TIMEOUT) -> str:
     """download a given url's content into output/sources/domain-<timestamp>.txt"""

     if not os.path.exists(SOURCES_DIR):
@@ -187,7 +192,7 @@ def save_remote_source(url, timeout=TIMEOUT):
     return source_path

-def fetch_page_title(url, timeout=10, progress=SHOW_PROGRESS):
+def fetch_page_title(url: str, timeout: int=10, progress: bool=SHOW_PROGRESS) -> Optional[str]:
     """Attempt to guess a page's title by downloading the html"""

     if not FETCH_TITLE:
@@ -209,7 +214,7 @@ def fetch_page_title(url, timeout=10, progress=SHOW_PROGRESS):
     #     ))
     return None

-def wget_output_path(link):
+def wget_output_path(link: Link) -> Optional[str]:
     """calculate the path to the wgetted .html file, since wget may
     adjust some paths to be different than the base_url path.
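
Note: the Optional[str] return makes "may be None" part of wget_output_path's contract, and callers already guard the way mypy's narrowing expects (see should_fetch_wget in the first file). The shape of that narrowing:

    output_path = wget_output_path(link)                    # Optional[str]
    if output_path:                                         # narrows Optional[str] to str
        full_path = os.path.join(link_dir, output_path)
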
@@ -278,9 +283,15 @@ def wget_output_path(link):

     return None

+def read_js_script(script_name: str) -> str:
+    script_path = os.path.join(PYTHON_PATH, 'scripts', script_name)
+
+    with open(script_path, 'r') as f:
+        return f.read().split('// INFO BELOW HERE')[0].strip()

 ### String Manipulation & Logging Helpers

-def str_between(string, start, end=None):
+def str_between(string: str, start: str, end: str=None) -> str:
     """(<abc>12345</def>, <abc>, </def>) -> 12345"""

     content = string.split(start, 1)[-1]
@@ -292,7 +303,7 @@ def str_between(string, start, end=None):

 ### Link Helpers

-def merge_links(a, b):
+def merge_links(a: Link, b: Link) -> Link:
     """deterministially merge two links, favoring longer field values over shorter,
     and "cleaner" values over worse ones.
     """
@@ -310,7 +321,7 @@ def merge_links(a, b):
         'sources': list(set(a.get('sources', []) + b.get('sources', []))),
     }

-def is_static_file(url):
+def is_static_file(url: str) -> bool:
     """Certain URLs just point to a single static file, and
     don't need to be re-archived in many formats
     """
@@ -318,7 +329,7 @@ def is_static_file(url):
     # TODO: the proper way is with MIME type detection, not using extension
     return extension(url) in STATICFILE_EXTENSIONS

-def derived_link_info(link):
+def derived_link_info(link: Link) -> dict:
     """extend link info with the archive urls and other derived data"""

     url = link['url']
@@ -373,7 +384,7 @@ def derived_link_info(link):
     return extended_info

-def latest_output(link, status=None):
+def latest_output(link: Link, status: str=None) -> Dict[str, Optional[str]]:
     """get the latest output that each archive method produced for link"""

     latest = {
@@ -440,7 +451,42 @@ def run(*popenargs, input=None, capture_output=False, timeout=None, check=False,
     return CompletedProcess(process.args, retcode, stdout, stderr)

-def progress_bar(seconds, prefix):
+class TimedProgress:
+    """Show a progress bar and measure elapsed time until .end() is called"""
+
+    def __init__(self, seconds, prefix=''):
+        if SHOW_PROGRESS:
+            self.p = Process(target=progress_bar, args=(seconds, prefix))
+            self.p.start()
+
+        self.stats = {
+            'start_ts': datetime.now(),
+            'end_ts': None,
+            'duration': None,
+        }
+
+    def end(self):
+        """immediately end progress, clear the progressbar line, and save end_ts"""
+        end_ts = datetime.now()
+        self.stats.update({
+            'end_ts': end_ts,
+            'duration': (end_ts - self.stats['start_ts']).seconds,
+        })
+
+        if SHOW_PROGRESS:
+            # protect from double termination
+            #if p is None or not hasattr(p, 'kill'):
+            #    return
+            if self.p is not None:
+                self.p.terminate()
+            self.p = None
+
+            sys.stdout.write('\r{}{}\r'.format((' ' * TERM_WIDTH), ANSI['reset']))  # clear whole terminal line
+            sys.stdout.flush()
+
+def progress_bar(seconds: int, prefix: str='') -> None:
     """show timer in the form of progress bar, with percentage and seconds remaining"""
     chunk = '█' if sys.stdout.encoding == 'UTF-8' else '#'
     chunks = TERM_WIDTH - len(prefix) - 20  # number of progress chunks to show (aka max bar width)
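
Note: the fetchers pair TimedProgress with a try/finally, which is where the **timer.stats unpacking seen in the first file comes from. A usage sketch under that assumption:

    timer = TimedProgress(timeout, prefix='      ')
    try:
        ...                 # run the archive method
    finally:
        timer.end()         # fills in end_ts and duration

    # timer.stats now holds start_ts, end_ts, and duration --
    # exactly the three ArchiveResult fields not passed explicitly
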
@@ -477,41 +523,8 @@ def progress_bar(seconds, prefix):
         print()
         pass

-class TimedProgress:
-    """Show a progress bar and measure elapsed time until .end() is called"""
-
-    def __init__(self, seconds, prefix=''):
-        if SHOW_PROGRESS:
-            self.p = Process(target=progress_bar, args=(seconds, prefix))
-            self.p.start()
-
-        self.stats = {
-            'start_ts': datetime.now(),
-            'end_ts': None,
-            'duration': None,
-        }
-
-    def end(self):
-        """immediately end progress, clear the progressbar line, and save end_ts"""
-        end_ts = datetime.now()
-        self.stats.update({
-            'end_ts': end_ts,
-            'duration': (end_ts - self.stats['start_ts']).seconds,
-        })
-
-        if SHOW_PROGRESS:
-            # protect from double termination
-            #if p is None or not hasattr(p, 'kill'):
-            #    return
-            if self.p is not None:
-                self.p.terminate()
-            self.p = None
-
-            sys.stdout.write('\r{}{}\r'.format((' ' * TERM_WIDTH), ANSI['reset']))  # clear whole terminal line
-            sys.stdout.flush()
-
-def download_url(url, timeout=TIMEOUT):
+def download_url(url: str, timeout: int=TIMEOUT) -> str:
     """Download the contents of a remote url and return the text"""

     req = Request(url, headers={'User-Agent': WGET_USER_AGENT})
@@ -526,7 +539,7 @@ def download_url(url, timeout=TIMEOUT):
     encoding = resp.headers.get_content_charset() or 'utf-8'
     return resp.read().decode(encoding)

-def chmod_file(path, cwd='.', permissions=OUTPUT_PERMISSIONS, timeout=30):
+def chmod_file(path: str, cwd: str='.', permissions: str=OUTPUT_PERMISSIONS, timeout: int=30) -> None:
     """chmod -R <permissions> <cwd>/<path>"""

     if not os.path.exists(os.path.join(cwd, path)):
@@ -538,7 +551,7 @@ def chmod_file(path, cwd='.', permissions=OUTPUT_PERMISSIONS, timeout=30):
         raise Exception('Failed to chmod {}/{}'.format(cwd, path))

-def chrome_args(**options):
+def chrome_args(**options) -> List[str]:
     """helper to build up a chrome shell command with arguments"""

     options = {**CHROME_OPTIONS, **options}
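
Note: chrome_args leaves **options untyped (its values are implicitly Any) and only pins the List[str] return that callers pass to subprocess. The merge line relies on dict-unpacking precedence: later entries win. In isolation, with hypothetical option names:

    defaults = {'TIMEOUT': 60, 'CHROME_HEADLESS': True}     # stand-in for CHROME_OPTIONS
    merged = {**defaults, **{'TIMEOUT': 120}}               # caller override wins: TIMEOUT == 120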