diff --git a/archivebox/archive_methods.py b/archivebox/archive_methods.py
index b3915e2f..f0223bbb 100644
--- a/archivebox/archive_methods.py
+++ b/archivebox/archive_methods.py
@@ -1,5 +1,7 @@
 import os
+import json
+from typing import Union, Dict, List, Tuple, NamedTuple
 
 from collections import defaultdict
 from datetime import datetime
 
@@ -40,13 +42,15 @@ from util import (
     without_query,
     without_fragment,
     fetch_page_title,
+    read_js_script,
     is_static_file,
     TimedProgress,
     chmod_file,
     wget_output_path,
     chrome_args,
     check_link_structure,
-    run, PIPE, DEVNULL
+    run, PIPE, DEVNULL,
+    Link,
 )
 from logs import (
     log_link_archiving_started,
@@ -55,15 +59,22 @@ from logs import (
     log_archive_method_finished,
 )
 
-
-
 class ArchiveError(Exception):
     def __init__(self, message, hints=None):
         super().__init__(message)
         self.hints = hints
 
+class ArchiveResult(NamedTuple):
+    cmd: List[str]
+    pwd: str
+    output: Union[str, Exception, None]
+    status: str
+    start_ts: datetime
+    end_ts: datetime
+    duration: int
 
-def archive_link(link_dir, link):
+
+def archive_link(link_dir: str, link: Link, page=None) -> Link:
     """download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
 
     ARCHIVE_METHODS = (
@@ -95,10 +106,11 @@ def archive_link(link_dir, link):
                 log_archive_method_started(method_name)
 
                 result = method_function(link_dir, link)
 
-                link['history'][method_name].append(result)
-                stats[result['status']] += 1
-                log_archive_method_finished(result)
+                link['history'][method_name].append(result._asdict())
+
+                stats[result.status] += 1
+                log_archive_method_finished(result._asdict())
             else:
                 stats['skipped'] += 1
 
@@ -117,7 +129,7 @@ def archive_link(link_dir, link):
 
 ### Archive Method Functions
 
-def should_fetch_title(link_dir, link):
+def should_fetch_title(link_dir: str, link: Link) -> bool:
     # if link already has valid title, skip it
     if link['title'] and not link['title'].lower().startswith('http'):
         return False
@@ -127,7 +139,7 @@ def should_fetch_title(link_dir, link):
 
     return FETCH_TITLE
 
-def fetch_title(link_dir, link, timeout=TIMEOUT):
+def fetch_title(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """try to guess the page's title from its content"""
 
     output = None
@@ -150,22 +162,22 @@ def fetch_title(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
 
-def should_fetch_favicon(link_dir, link):
+def should_fetch_favicon(link_dir: str, link: Link) -> bool:
     if os.path.exists(os.path.join(link_dir, 'favicon.ico')):
         return False
 
     return FETCH_FAVICON
 
-def fetch_favicon(link_dir, link, timeout=TIMEOUT):
+def fetch_favicon(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """download site favicon from google's favicon api"""
 
     output = 'favicon.ico'
@@ -188,15 +200,15 @@ def fetch_favicon(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
-def should_fetch_wget(link_dir, link):
+def should_fetch_wget(link_dir: str, link: Link) -> bool:
     output_path = wget_output_path(link)
     if output_path and os.path.exists(os.path.join(link_dir, output_path)):
         return False
@@ -204,7 +216,7 @@ def should_fetch_wget(link_dir, link):
 
     return FETCH_WGET
 
-def fetch_wget(link_dir, link, timeout=TIMEOUT):
+def fetch_wget(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """download full site using wget"""
 
     if FETCH_WARC:
@@ -274,15 +286,15 @@ def fetch_wget(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
-def should_fetch_pdf(link_dir, link):
+def should_fetch_pdf(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
 
@@ -292,7 +304,7 @@ def should_fetch_pdf(link_dir, link):
 
     return FETCH_PDF
 
-def fetch_pdf(link_dir, link, timeout=TIMEOUT):
+def fetch_pdf(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """print PDF of site to file using chrome --headless"""
 
     output = 'output.pdf'
@@ -317,15 +329,15 @@ def fetch_pdf(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
-def should_fetch_screenshot(link_dir, link):
+def should_fetch_screenshot(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
 
@@ -334,7 +346,7 @@ def should_fetch_screenshot(link_dir, link):
 
     return FETCH_SCREENSHOT
 
-def fetch_screenshot(link_dir, link, timeout=TIMEOUT):
+def fetch_screenshot(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """take screenshot of site using chrome --headless"""
 
     output = 'screenshot.png'
@@ -359,15 +371,15 @@ def fetch_screenshot(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
-def should_fetch_dom(link_dir, link):
+def should_fetch_dom(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
 
@@ -376,7 +388,7 @@ def should_fetch_dom(link_dir, link):
 
     return FETCH_DOM
 
-def fetch_dom(link_dir, link, timeout=TIMEOUT):
+def fetch_dom(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """print HTML of site to file using chrome --dump-html"""
 
     output = 'output.html'
@@ -403,15 +415,15 @@ def fetch_dom(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
-def should_fetch_git(link_dir, link):
+def should_fetch_git(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
 
@@ -428,7 +440,7 @@ def should_fetch_git(link_dir, link):
 
     return FETCH_GIT
 
-def fetch_git(link_dir, link, timeout=TIMEOUT):
+def fetch_git(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """download full site using git"""
 
     output = 'git'
@@ -460,16 +472,16 @@ def fetch_git(link_dir, link, timeout=TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
 
-def should_fetch_media(link_dir, link):
+def should_fetch_media(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
 
@@ -478,7 +490,7 @@ def should_fetch_media(link_dir, link):
 
     return FETCH_MEDIA
 
-def fetch_media(link_dir, link, timeout=MEDIA_TIMEOUT):
+def fetch_media(link_dir: str, link: Link, timeout: int=MEDIA_TIMEOUT) -> ArchiveResult:
     """Download playlists or individual video, audio, and subtitles using youtube-dl"""
 
     output = 'media'
@@ -531,16 +543,16 @@ def fetch_media(link_dir, link, timeout=MEDIA_TIMEOUT):
     finally:
         timer.end()
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
 
-def should_fetch_archive_dot_org(link_dir, link):
+def should_fetch_archive_dot_org(link_dir: str, link: Link) -> bool:
     if is_static_file(link['url']):
         return False
 
@@ -550,7 +562,7 @@ def should_fetch_archive_dot_org(link_dir, link):
 
     return SUBMIT_ARCHIVE_DOT_ORG
 
-def archive_dot_org(link_dir, link, timeout=TIMEOUT):
+def archive_dot_org(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
     """submit site to archive.org for archiving via their service, save returned archive url"""
 
     output = 'archive.org.txt'
@@ -596,17 +608,17 @@ def archive_dot_org(link_dir, link, timeout=TIMEOUT):
         chmod_file('archive.org.txt', cwd=link_dir)
         output = archive_org_url
 
-    return {
-        'cmd': cmd,
-        'pwd': link_dir,
-        'output': output,
-        'status': status,
+    return ArchiveResult(
+        cmd=cmd,
+        pwd=link_dir,
+        output=output,
+        status=status,
         **timer.stats,
-    }
+    )
 
-def parse_archive_dot_org_response(response):
+def parse_archive_dot_org_response(response: bytes) -> Tuple[List[str], List[str]]:
     # Parse archive.org response headers
-    headers = defaultdict(list)
+    headers: Dict[str, List[str]] = defaultdict(list)
 
     # lowercase all the header names and store in dict
     for header in response.splitlines():
diff --git a/archivebox/index.py b/archivebox/index.py
index 3f4ada3f..503b82ad 100644
--- a/archivebox/index.py
+++ b/archivebox/index.py
@@ -3,6 +3,8 @@ import json
 from datetime import datetime
 from string import Template
 
+from typing import List, Tuple
+
 try:
     from distutils.dir_util import copy_tree
 except ImportError:
@@ -23,6 +25,7 @@ from util import (
     check_links_structure,
     wget_output_path,
     latest_output,
+    Link,
 )
 from parse import parse_links
 from links import validate_links
@@ -39,7 +42,7 @@ TITLE_LOADING_MSG = 'Not yet archived...'
 
 ### Homepage index for all the links
 
-def write_links_index(out_dir, links, finished=False):
+def write_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
     """create index.html file for a given list of links"""
 
     log_indexing_process_started()
@@ -53,15 +56,15 @@ def write_links_index(out_dir, links, finished=False):
     write_html_links_index(out_dir, links, finished=finished)
     log_indexing_finished(out_dir, 'index.html')
 
-def load_links_index(out_dir=OUTPUT_DIR, import_path=None):
+def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[List[Link], List[Link]]:
     """parse and load existing index with any new links from import_path merged in"""
 
-    existing_links = []
+    existing_links: List[Link] = []
     if out_dir:
         existing_links = parse_json_links_index(out_dir)
         check_links_structure(existing_links)
 
-    new_links = []
+    new_links: List[Link] = []
     if import_path:
         # parse and validate the import file
         log_parsing_started(import_path)
@@ -79,7 +82,7 @@ def load_links_index(out_dir=OUTPUT_DIR, import_path=None):
 
     return all_links, new_links
 
-def write_json_links_index(out_dir, links):
+def write_json_links_index(out_dir: str, links: List[Link]) -> None:
     """write the json link index to a given path"""
 
     check_links_structure(links)
@@ -100,7 +103,7 @@ def write_json_links_index(out_dir, links):
 
     chmod_file(path)
 
-def parse_json_links_index(out_dir=OUTPUT_DIR):
+def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> List[Link]:
     """parse a archive index json file and return the list of links"""
     index_path = os.path.join(out_dir, 'index.json')
     if os.path.exists(index_path):
@@ -111,7 +114,7 @@ def parse_json_links_index(out_dir=OUTPUT_DIR):
 
     return []
 
-def write_html_links_index(out_dir, links, finished=False):
+def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
     """write the html link index to a given path"""
 
     check_links_structure(links)
@@ -166,7 +169,7 @@ def write_html_links_index(out_dir, links, finished=False):
 
     chmod_file(path)
 
-def patch_links_index(link, out_dir=OUTPUT_DIR):
+def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
     """hack to in-place update one row's info in the generated index html"""
 
     title = link['title'] or latest_output(link)['title']
@@ -200,12 +203,12 @@ def patch_links_index(link, out_dir=OUTPUT_DIR):
 
 ### Individual link index
 
-def write_link_index(out_dir, link):
+def write_link_index(out_dir: str, link: Link) -> None:
     link['updated'] = str(datetime.now().timestamp())
     write_json_link_index(out_dir, link)
     write_html_link_index(out_dir, link)
 
-def write_json_link_index(out_dir, link):
+def write_json_link_index(out_dir: str, link: Link) -> None:
     """write a json file with some info about the link"""
 
     check_link_structure(link)
@@ -216,7 +219,7 @@ def write_json_link_index(out_dir, link):
 
     chmod_file(path)
 
-def parse_json_link_index(out_dir):
+def parse_json_link_index(out_dir: str) -> dict:
     """load the json link index from a given directory"""
     existing_index = os.path.join(out_dir, 'index.json')
     if os.path.exists(existing_index):
@@ -226,7 +229,7 @@ def parse_json_link_index(out_dir):
             return link_json
     return {}
 
-def load_json_link_index(out_dir, link):
+def load_json_link_index(out_dir: str, link: Link) -> Link:
     """check for an existing link archive in the given directory,
     and load+merge it into the given link dict
     """
@@ -241,7 +244,7 @@ def load_json_link_index(out_dir, link):
     check_link_structure(link)
     return link
 
-def write_html_link_index(out_dir, link):
+def write_html_link_index(out_dir: str, link: Link) -> None:
     check_link_structure(link)
     with open(os.path.join(TEMPLATES_DIR, 'link_index.html'), 'r', encoding='utf-8') as f:
         link_html = f.read()
diff --git a/archivebox/util.py b/archivebox/util.py
index cec23035..1a8a445e 100644
--- a/archivebox/util.py
+++ b/archivebox/util.py
@@ -3,6 +3,8 @@ import re
 import sys
 import time
 
+from typing import List, Dict, Any, Optional, Union
+
 from urllib.request import Request, urlopen
 from urllib.parse import urlparse, quote
 from decimal import Decimal
@@ -30,6 +32,7 @@ from config import (
     CHECK_SSL_VALIDITY,
     WGET_USER_AGENT,
     CHROME_OPTIONS,
+    PYTHON_PATH,
 )
 
 from logs import pretty_path
@@ -86,9 +89,11 @@ STATICFILE_EXTENSIONS = {
     # html, htm, shtml, xhtml, xml, aspx, php, cgi
 }
 
+Link = Dict[str, Any]
+
 ### Checks & Tests
 
-def check_link_structure(link):
+def check_link_structure(link: Link) -> None:
     """basic sanity check invariants to make sure the data is valid"""
     assert isinstance(link, dict)
     assert isinstance(link.get('url'), str)
@@ -100,13 +105,13 @@ def check_link_structure(link):
         assert isinstance(key, str)
         assert isinstance(val, list), 'history must be a Dict[str, List], got: {}'.format(link['history'])
 
-def check_links_structure(links):
+def check_links_structure(links: List[Link]) -> None:
     """basic sanity check invariants to make sure the data is valid"""
     assert isinstance(links, list)
     if links:
         check_link_structure(links[0])
 
-def check_url_parsing_invariants():
+def check_url_parsing_invariants() -> None:
     """Check that plain text regex URL parsing works as expected"""
 
     # this is last-line-of-defense to make sure the URL_REGEX isn't
@@ -137,7 +142,7 @@ def check_url_parsing_invariants():
 
 ### Random Helpers
 
-def save_stdin_source(raw_text):
+def save_stdin_source(raw_text: str) -> str:
     if not os.path.exists(SOURCES_DIR):
         os.makedirs(SOURCES_DIR)
 
@@ -150,7 +155,7 @@ def save_stdin_source(raw_text):
     return source_path
 
-def save_remote_source(url, timeout=TIMEOUT):
+def save_remote_source(url: str, timeout: int=TIMEOUT) -> str:
     """download a given url's content into output/sources/domain-<timestamp>.txt"""
 
     if not os.path.exists(SOURCES_DIR):
         os.makedirs(SOURCES_DIR)
@@ -187,7 +192,7 @@ def save_remote_source(url, timeout=TIMEOUT):
 
     return source_path
 
-def fetch_page_title(url, timeout=10, progress=SHOW_PROGRESS):
+def fetch_page_title(url: str, timeout: int=10, progress: bool=SHOW_PROGRESS) -> Optional[str]:
     """Attempt to guess a page's title by downloading the html"""
 
     if not FETCH_TITLE:
@@ -209,7 +214,7 @@ def fetch_page_title(url, timeout=10, progress=SHOW_PROGRESS):
         # ))
     return None
 
-def wget_output_path(link):
+def wget_output_path(link: Link) -> Optional[str]:
     """calculate the path to the wgetted .html file, since wget may
     adjust some paths to be different than the base_url path.
 
@@ -278,9 +283,15 @@ def wget_output_path(link):
 
     return None
 
+def read_js_script(script_name: str) -> str:
+    script_path = os.path.join(PYTHON_PATH, 'scripts', script_name)
+
+    with open(script_path, 'r') as f:
+        return f.read().split('// INFO BELOW HERE')[0].strip()
+
 ### String Manipulation & Logging Helpers
 
-def str_between(string, start, end=None):
+def str_between(string: str, start: str, end: str=None) -> str:
     """(<abc>12345</def>, <abc>, </def>)  ->  12345"""
 
     content = string.split(start, 1)[-1]
@@ -292,7 +303,7 @@ def str_between(string, start, end=None):
 
 ### Link Helpers
 
-def merge_links(a, b):
+def merge_links(a: Link, b: Link) -> Link:
     """deterministially merge two links, favoring longer field values over shorter,
     and "cleaner" values over worse ones.
""" @@ -310,7 +321,7 @@ def merge_links(a, b): 'sources': list(set(a.get('sources', []) + b.get('sources', []))), } -def is_static_file(url): +def is_static_file(url: str) -> bool: """Certain URLs just point to a single static file, and don't need to be re-archived in many formats """ @@ -318,7 +329,7 @@ def is_static_file(url): # TODO: the proper way is with MIME type detection, not using extension return extension(url) in STATICFILE_EXTENSIONS -def derived_link_info(link): +def derived_link_info(link: Link) -> dict: """extend link info with the archive urls and other derived data""" url = link['url'] @@ -373,7 +384,7 @@ def derived_link_info(link): return extended_info -def latest_output(link, status=None): +def latest_output(link: Link, status: str=None) -> Dict[str, Optional[str]]: """get the latest output that each archive method produced for link""" latest = { @@ -440,7 +451,42 @@ def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, return CompletedProcess(process.args, retcode, stdout, stderr) -def progress_bar(seconds, prefix): +class TimedProgress: + """Show a progress bar and measure elapsed time until .end() is called""" + + def __init__(self, seconds, prefix=''): + if SHOW_PROGRESS: + self.p = Process(target=progress_bar, args=(seconds, prefix)) + self.p.start() + + self.stats = { + 'start_ts': datetime.now(), + 'end_ts': None, + 'duration': None, + } + + def end(self): + """immediately end progress, clear the progressbar line, and save end_ts""" + + end_ts = datetime.now() + self.stats.update({ + 'end_ts': end_ts, + 'duration': (end_ts - self.stats['start_ts']).seconds, + }) + + if SHOW_PROGRESS: + # protect from double termination + #if p is None or not hasattr(p, 'kill'): + # return + if self.p is not None: + self.p.terminate() + self.p = None + + sys.stdout.write('\r{}{}\r'.format((' ' * TERM_WIDTH), ANSI['reset'])) # clear whole terminal line + sys.stdout.flush() + + +def progress_bar(seconds: int, prefix: str='') -> None: """show timer in the form of progress bar, with percentage and seconds remaining""" chunk = '█' if sys.stdout.encoding == 'UTF-8' else '#' chunks = TERM_WIDTH - len(prefix) - 20 # number of progress chunks to show (aka max bar width) @@ -477,41 +523,8 @@ def progress_bar(seconds, prefix): print() pass -class TimedProgress: - """Show a progress bar and measure elapsed time until .end() is called""" - def __init__(self, seconds, prefix=''): - if SHOW_PROGRESS: - self.p = Process(target=progress_bar, args=(seconds, prefix)) - self.p.start() - - self.stats = { - 'start_ts': datetime.now(), - 'end_ts': None, - 'duration': None, - } - - def end(self): - """immediately end progress, clear the progressbar line, and save end_ts""" - - end_ts = datetime.now() - self.stats.update({ - 'end_ts': end_ts, - 'duration': (end_ts - self.stats['start_ts']).seconds, - }) - - if SHOW_PROGRESS: - # protect from double termination - #if p is None or not hasattr(p, 'kill'): - # return - if self.p is not None: - self.p.terminate() - self.p = None - - sys.stdout.write('\r{}{}\r'.format((' ' * TERM_WIDTH), ANSI['reset'])) # clear whole terminal line - sys.stdout.flush() - -def download_url(url, timeout=TIMEOUT): +def download_url(url: str, timeout: int=TIMEOUT) -> str: """Download the contents of a remote url and return the text""" req = Request(url, headers={'User-Agent': WGET_USER_AGENT}) @@ -526,7 +539,7 @@ def download_url(url, timeout=TIMEOUT): encoding = resp.headers.get_content_charset() or 'utf-8' return resp.read().decode(encoding) -def 
chmod_file(path, cwd='.', permissions=OUTPUT_PERMISSIONS, timeout=30): +def chmod_file(path: str, cwd: str='.', permissions: str=OUTPUT_PERMISSIONS, timeout: int=30) -> None: """chmod -R /""" if not os.path.exists(os.path.join(cwd, path)): @@ -538,7 +551,7 @@ def chmod_file(path, cwd='.', permissions=OUTPUT_PERMISSIONS, timeout=30): raise Exception('Failed to chmod {}/{}'.format(cwd, path)) -def chrome_args(**options): +def chrome_args(**options) -> List[str]: """helper to build up a chrome shell command with arguments""" options = {**CHROME_OPTIONS, **options}
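
The heart of the patch is the switch from plain result dicts to an ArchiveResult NamedTuple, with link['history'] still storing dicts via ._asdict() so the JSON index format stays unchanged. Below is a minimal, self-contained sketch of that flow; the demo_fetch helper and the example values are illustrative only — just ArchiveResult, the Link alias, and the _asdict() round-trip mirror the diff.

    from datetime import datetime
    from typing import Any, Dict, List, NamedTuple, Union

    Link = Dict[str, Any]               # same alias the patch adds to util.py

    class ArchiveResult(NamedTuple):    # same shape as in archive_methods.py
        cmd: List[str]
        pwd: str
        output: Union[str, Exception, None]
        status: str
        start_ts: datetime
        end_ts: datetime
        duration: int

    def demo_fetch(link_dir: str, link: Link) -> ArchiveResult:
        """Hypothetical stand-in for a fetch_*() archive method."""
        start_ts = datetime.now()
        # ... the real methods shell out to wget/chrome/git/youtube-dl here ...
        end_ts = datetime.now()
        return ArchiveResult(
            cmd=['echo', link['url']],
            pwd=link_dir,
            output='output.html',
            status='succeeded',
            start_ts=start_ts,
            end_ts=end_ts,
            duration=(end_ts - start_ts).seconds,
        )

    link: Link = {'url': 'https://example.com', 'history': {}}
    result = demo_fetch('/tmp/archive/1556908800', link)

    # archive_link() keeps plain dicts in the index, so the tuple is converted
    # back with ._asdict() before being appended to the method's history.
    link['history'].setdefault('demo', []).append(result._asdict())
    print(result.status, link['history']['demo'][0]['duration'])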