switch to strict type hints with NamedTuples instead of dicts

Nick Sweeting 2019-03-26 05:33:34 -04:00
parent 0a44779b21
commit 76abc58135
8 changed files with 201 additions and 98 deletions
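The whole commit follows one pattern: loosely-typed dicts are swapped for typed records (NamedTuple or recordclass), and call sites move from key lookups to attribute access. A minimal sketch of that pattern, using a hypothetical Example type rather than anything from the codebase:

from typing import NamedTuple, Optional

# before: an untyped dict, so field names and value types are only checked at runtime
record = {'status': 'succeeded', 'output': None}
print(record['status'])

class Example(NamedTuple):
    status: str
    output: Optional[str]

# after: a typed, immutable record with attribute access,
# still easy to serialize via ._asdict()
record = Example(status='succeeded', output=None)
print(record.status)
print(record._asdict())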

View file

@@ -12,6 +12,9 @@ Usage & Documentation:
import os
import sys
from typing import List
from schema import Link
from links import links_after_timestamp
from index import write_links_index, load_links_index
from archive_methods import archive_link
@@ -50,7 +53,7 @@ def print_help():
print(" ./archive 15109948213.123\n")
def main(*args):
def main(*args) -> List[Link]:
if set(args).intersection(('-h', '--help', 'help')) or len(args) > 2:
print_help()
raise SystemExit(0)
@@ -95,10 +98,10 @@ def main(*args):
import_path = save_remote_source(import_path)
### Run the main archive update process
update_archive_data(import_path=import_path, resume=resume)
return update_archive_data(import_path=import_path, resume=resume)
def update_archive_data(import_path=None, resume=None):
def update_archive_data(import_path: str=None, resume: float=None) -> List[Link]:
"""The main ArchiveBox entrancepoint. Everything starts here."""
# Step 1: Load list of links from the existing index
@@ -111,14 +114,14 @@ def update_archive_data(import_path=None, resume=None):
# Step 3: Run the archive methods for each link
links = new_links if ONLY_NEW else all_links
log_archiving_started(len(links), resume)
idx, link = 0, 0
idx, link = 0, {'timestamp': 0}
try:
for idx, link in enumerate(links_after_timestamp(links, resume)):
link_dir = os.path.join(ARCHIVE_DIR, link['timestamp'])
archive_link(link_dir, link)
except KeyboardInterrupt:
log_archiving_paused(len(links), idx, link and link['timestamp'])
log_archiving_paused(len(links), idx, link['timestamp'])
raise SystemExit(0)
except:
@@ -130,7 +133,7 @@ def update_archive_data(import_path=None, resume=None):
# Step 4: Re-write links index with updated titles, icons, and resources
all_links, _ = load_links_index(out_dir=OUTPUT_DIR)
write_links_index(out_dir=OUTPUT_DIR, links=all_links, finished=True)
return all_links
if __name__ == '__main__':
main(*sys.argv)
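Since update_archive_data() now returns the final link list and main() passes it through, the entrypoint can also be driven from other Python code instead of only via sys.argv. A hedged sketch of that usage (the module and file names here are assumptions, not shown in the commit):

from archive import main

# argv-style call: args[0] is the program name, args[1] an optional import path
links = main('./archive', 'output/sources/bookmarks_export.html')
print('{} links now in the index'.format(len(links)))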

View file

@@ -1,10 +1,10 @@
import os
import json
from typing import Union, Dict, List, Tuple, NamedTuple
from typing import Dict, List, Tuple
from collections import defaultdict
from datetime import datetime
from schema import Link, ArchiveResult, ArchiveError
from index import (
write_link_index,
patch_links_index,
@@ -102,7 +102,7 @@ def archive_link(link_dir: str, link: Link, page=None) -> Link:
link['history'][method_name].append(result._asdict())
stats[result.status] += 1
log_archive_method_finished(result._asdict())
log_archive_method_finished(result)
else:
stats['skipped'] += 1

View file

@@ -11,6 +11,7 @@ except ImportError:
print('[X] Missing "distutils" python package. To install it, run:')
print(' pip install distutils')
from schema import Link, ArchiveIndex
from config import (
OUTPUT_DIR,
TEMPLATES_DIR,
@@ -25,7 +26,7 @@ from util import (
check_links_structure,
wget_output_path,
latest_output,
Link,
ExtendedEncoder,
)
from parse import parse_links
from links import validate_links
@@ -56,6 +57,7 @@ def write_links_index(out_dir: str, links: List[Link], finished: bool=False) ->
write_html_links_index(out_dir, links, finished=finished)
log_indexing_finished(out_dir, 'index.html')
def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[List[Link], List[Link]]:
"""parse and load existing index with any new links from import_path merged in"""
@@ -82,6 +84,7 @@ def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[Li
return all_links, new_links
def write_json_links_index(out_dir: str, links: List[Link]) -> None:
"""write the json link index to a given path"""
@@ -89,20 +92,24 @@ def write_json_links_index(out_dir: str, links: List[Link]) -> None:
path = os.path.join(out_dir, 'index.json')
index_json = {
    'info': 'ArchiveBox Index',
    'help': 'https://github.com/pirate/ArchiveBox',
    'version': GIT_SHA,
    'num_links': len(links),
    'updated': str(datetime.now().timestamp()),
    'links': links,
}
index_json = ArchiveIndex(
    info='ArchiveBox Index',
    source='https://github.com/pirate/ArchiveBox',
    docs='https://github.com/pirate/ArchiveBox/wiki',
    version=GIT_SHA,
    num_links=len(links),
    updated=str(datetime.now().timestamp()),
    links=links,
)
assert isinstance(index_json._asdict(), dict)
with open(path, 'w', encoding='utf-8') as f:
json.dump(index_json, f, indent=4, default=str)
json.dump(index_json._asdict(), f, indent=4, cls=ExtendedEncoder)
chmod_file(path)
def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> List[Link]:
"""parse a archive index json file and return the list of links"""
index_path = os.path.join(out_dir, 'index.json')
@@ -114,6 +121,7 @@ def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> List[Link]:
return []
def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
"""write the html link index to a given path"""
@@ -208,6 +216,7 @@ def write_link_index(out_dir: str, link: Link) -> None:
write_json_link_index(out_dir, link)
write_html_link_index(out_dir, link)
def write_json_link_index(out_dir: str, link: Link) -> None:
"""write a json file with some info about the link"""
@@ -215,10 +224,11 @@ def write_json_link_index(out_dir: str, link: Link) -> None:
path = os.path.join(out_dir, 'index.json')
with open(path, 'w', encoding='utf-8') as f:
json.dump(link, f, indent=4, default=str)
json.dump(link, f, indent=4, cls=ExtendedEncoder)
chmod_file(path)
def parse_json_link_index(out_dir: str) -> dict:
"""load the json link index from a given directory"""
existing_index = os.path.join(out_dir, 'index.json')
@@ -229,6 +239,7 @@ def parse_json_link_index(out_dir: str) -> dict:
return link_json
return {}
def load_json_link_index(out_dir: str, link: Link) -> Link:
"""check for an existing link archive in the given directory,
and load+merge it into the given link dict
@@ -244,6 +255,7 @@ def load_json_link_index(out_dir: str, link: Link) -> Link:
check_link_structure(link)
return link
def write_html_link_index(out_dir: str, link: Link) -> None:
check_link_structure(link)
with open(os.path.join(TEMPLATES_DIR, 'link_index.html'), 'r', encoding='utf-8') as f:

View file

@@ -19,17 +19,19 @@ Link {
}
"""
from html import unescape
from typing import List, Iterable
from collections import OrderedDict
from schema import Link
from util import (
merge_links,
check_link_structure,
check_links_structure,
htmldecode,
)
def validate_links(links):
def validate_links(links: Iterable[Link]) -> List[Link]:
check_links_structure(links)
links = archivable_links(links) # remove chrome://, about:, mailto: etc.
links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
@@ -40,13 +42,13 @@ def validate_links(links):
raise SystemExit(1)
for link in links:
link['title'] = unescape(link['title'].strip()) if link['title'] else None
link['title'] = htmldecode(link['title'].strip()) if link['title'] else None
check_link_structure(link)
return list(links)
def archivable_links(links):
def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
"""remove chrome://, about:// or other schemed links that cant be archived"""
return (
link
@@ -55,12 +57,12 @@ def archivable_links(links):
)
def uniquefied_links(sorted_links):
def uniquefied_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
"""
ensures that all non-duplicate links have monotonically increasing timestamps
"""
unique_urls = OrderedDict()
unique_urls: OrderedDict[str, Link] = OrderedDict()
lower = lambda url: url.lower().strip()
without_www = lambda url: url.replace('://www.', '://', 1)
@@ -73,7 +75,7 @@ def uniquefied_links(sorted_links):
link = merge_links(unique_urls[fuzzy_url], link)
unique_urls[fuzzy_url] = link
unique_timestamps = OrderedDict()
unique_timestamps: OrderedDict[str, Link] = OrderedDict()
for link in unique_urls.values():
link['timestamp'] = lowest_uniq_timestamp(unique_timestamps, link['timestamp'])
unique_timestamps[link['timestamp']] = link
@@ -81,12 +83,12 @@ def uniquefied_links(sorted_links):
return unique_timestamps.values()
def sorted_links(links):
def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
sort_func = lambda link: (link['timestamp'].split('.', 1)[0], link['url'])
return sorted(links, key=sort_func, reverse=True)
def links_after_timestamp(links, timestamp=None):
def links_after_timestamp(links: Iterable[Link], timestamp: str=None) -> Iterable[Link]:
if not timestamp:
yield from links
return
@@ -99,7 +101,7 @@ def links_after_timestamp(links, timestamp=None):
print('Resume value and all timestamp values must be valid numbers.')
def lowest_uniq_timestamp(used_timestamps, timestamp):
def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
"""resolve duplicate timestamps by appending a decimal 1234, 1234 -> 1234.1, 1234.2"""
timestamp = timestamp.split('.')[0]
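The hunk ends before the body of lowest_uniq_timestamp(); one way the behaviour described by its docstring could be implemented (a sketch under that assumption, not the commit's actual code):

def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
    """resolve duplicate timestamps by appending a decimal: 1234, 1234 -> 1234.1, 1234.2"""
    timestamp = timestamp.split('.')[0]
    if timestamp not in used_timestamps:
        return timestamp
    # append .1, .2, ... until the timestamp no longer collides
    nonce = 1
    while '{}.{}'.format(timestamp, nonce) in used_timestamps:
        nonce += 1
    return '{}.{}'.format(timestamp, nonce)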

View file

@@ -1,43 +1,44 @@
import sys
from datetime import datetime
from schema import Link, ArchiveResult, RuntimeStats
from config import ANSI, REPO_DIR, OUTPUT_DIR
# globals are bad, mmkay
_LAST_RUN_STATS = {
    'skipped': 0,
    'succeeded': 0,
    'failed': 0,
    'parsing_start_ts': 0,
    'parsing_end_ts': 0,
    'indexing_start_ts': 0,
    'indexing_end_ts': 0,
    'archiving_start_ts': 0,
    'archiving_end_ts': 0,
    'links': {},
}
_LAST_RUN_STATS = RuntimeStats(
    skipped=0,
    succeeded=0,
    failed=0,
    parse_start_ts=0,
    parse_end_ts=0,
    index_start_ts=0,
    index_end_ts=0,
    archiving_start_ts=0,
    archiving_end_ts=0,
)
def pretty_path(path):
def pretty_path(path: str) -> str:
"""convert paths like .../ArchiveBox/archivebox/../output/abc into output/abc"""
return path.replace(REPO_DIR + '/', '')
### Parsing Stage
def log_parsing_started(source_file):
def log_parsing_started(source_file: str):
start_ts = datetime.now()
_LAST_RUN_STATS['parse_start_ts'] = start_ts
_LAST_RUN_STATS.parse_start_ts = start_ts
print('{green}[*] [{}] Parsing new links from output/sources/{}...{reset}'.format(
start_ts.strftime('%Y-%m-%d %H:%M:%S'),
source_file.rsplit('/', 1)[-1],
**ANSI,
))
def log_parsing_finished(num_new_links, parser_name):
def log_parsing_finished(num_new_links: int, parser_name: str):
end_ts = datetime.now()
_LAST_RUN_STATS.parse_end_ts = end_ts
print(' > Adding {} new links to index (parsed import as {})'.format(
num_new_links,
parser_name,
@@ -48,26 +49,26 @@ def log_parsing_finished(num_new_links, parser_name):
def log_indexing_process_started():
start_ts = datetime.now()
_LAST_RUN_STATS['index_start_ts'] = start_ts
_LAST_RUN_STATS.index_start_ts = start_ts
print('{green}[*] [{}] Saving main index files...{reset}'.format(
start_ts.strftime('%Y-%m-%d %H:%M:%S'),
**ANSI,
))
def log_indexing_started(out_dir, out_file):
def log_indexing_started(out_dir: str, out_file: str):
sys.stdout.write(' > {}/{}'.format(pretty_path(out_dir), out_file))
def log_indexing_finished(out_dir, out_file):
def log_indexing_finished(out_dir: str, out_file: str):
end_ts = datetime.now()
_LAST_RUN_STATS['index_end_ts'] = end_ts
_LAST_RUN_STATS.index_end_ts = end_ts
print('\r{}/{}'.format(pretty_path(out_dir), out_file))
### Archiving Stage
def log_archiving_started(num_links, resume):
def log_archiving_started(num_links: int, resume: float):
start_ts = datetime.now()
_LAST_RUN_STATS['start_ts'] = start_ts
_LAST_RUN_STATS.archiving_start_ts = start_ts
if resume:
print('{green}[▶] [{}] Resuming archive updating for {} pages starting from {}...{reset}'.format(
start_ts.strftime('%Y-%m-%d %H:%M:%S'),
@@ -82,9 +83,9 @@ def log_archiving_started(num_links, resume):
**ANSI,
))
def log_archiving_paused(num_links, idx, timestamp):
def log_archiving_paused(num_links: int, idx: int, timestamp: str):
end_ts = datetime.now()
_LAST_RUN_STATS['end_ts'] = end_ts
_LAST_RUN_STATS.archiving_end_ts = end_ts
print()
print('\n{lightyellow}[X] [{now}] Downloading paused on link {timestamp} ({idx}/{total}){reset}'.format(
**ANSI,
@@ -100,10 +101,10 @@ def log_archiving_paused(num_links, idx, timestamp):
timestamp,
))
def log_archiving_finished(num_links):
def log_archiving_finished(num_links: int):
end_ts = datetime.now()
_LAST_RUN_STATS['end_ts'] = end_ts
seconds = end_ts.timestamp() - _LAST_RUN_STATS['start_ts'].timestamp()
_LAST_RUN_STATS.archiving_end_ts = end_ts
seconds = end_ts.timestamp() - _LAST_RUN_STATS.archiving_start_ts.timestamp()
if seconds > 60:
duration = '{0:.2f} min'.format(seconds / 60, 2)
else:
@@ -116,13 +117,13 @@ def log_archiving_finished(num_links):
duration,
ANSI['reset'],
))
print(' - {} links skipped'.format(_LAST_RUN_STATS['skipped']))
print(' - {} links updated'.format(_LAST_RUN_STATS['succeeded']))
print(' - {} links had errors'.format(_LAST_RUN_STATS['failed']))
print(' - {} links skipped'.format(_LAST_RUN_STATS.skipped))
print(' - {} links updated'.format(_LAST_RUN_STATS.succeeded))
print(' - {} links had errors'.format(_LAST_RUN_STATS.failed))
print(' To view your archive, open: {}/index.html'.format(OUTPUT_DIR.replace(REPO_DIR + '/', '')))
def log_link_archiving_started(link_dir, link, is_new):
def log_link_archiving_started(link_dir: str, link: Link, is_new: bool):
# [*] [2019-03-22 13:46:45] "Log Structured Merge Trees - ben stopford"
# http://www.benstopford.com/2015/02/14/log-structured-merge-trees/
# > output/archive/1478739709
@@ -140,40 +141,34 @@ def log_link_archiving_started(link_dir, link, is_new):
pretty_path(link_dir),
))
def log_link_archiving_finished(link_dir, link, is_new, stats):
def log_link_archiving_finished(link_dir: str, link: Link, is_new: bool, stats: dict):
total = sum(stats.values())
if stats['failed'] > 0 :
_LAST_RUN_STATS['failed'] += 1
_LAST_RUN_STATS.failed += 1
elif stats['skipped'] == total:
_LAST_RUN_STATS['skipped'] += 1
_LAST_RUN_STATS.skipped += 1
else:
_LAST_RUN_STATS['succeeded'] += 1
_LAST_RUN_STATS.succeeded += 1
def log_archive_method_started(method):
def log_archive_method_started(method: str):
print(' > {}'.format(method))
def log_archive_method_finished(result):
def log_archive_method_finished(result: ArchiveResult):
"""quote the argument with whitespace in a command so the user can
copy-paste the outputted string directly to run the cmd
"""
required_keys = ('cmd', 'pwd', 'output', 'status', 'start_ts', 'end_ts')
assert (
isinstance(result, dict)
and all(key in result for key in required_keys)
and ('output' in result)
), 'Archive method did not return a valid result.'
# Prettify CMD string and make it safe to copy-paste by quoting arguments
quoted_cmd = ' '.join(
'"{}"'.format(arg) if ' ' in arg else arg
for arg in result['cmd']
for arg in result.cmd
)
if result['status'] == 'failed':
if result.status == 'failed':
# Prettify error output hints string and limit to five lines
hints = getattr(result['output'], 'hints', None) or ()
hints = getattr(result.output, 'hints', None) or ()
if hints:
hints = hints if isinstance(hints, (list, tuple)) else hints.split('\n')
hints = (
@@ -185,13 +180,13 @@ def log_archive_method_finished(result):
output_lines = [
'{}Failed:{} {}{}'.format(
ANSI['red'],
result['output'].__class__.__name__.replace('ArchiveError', ''),
result['output'],
result.output.__class__.__name__.replace('ArchiveError', ''),
result.output,
ANSI['reset']
),
*hints,
'{}Run to see full output:{}'.format(ANSI['lightred'], ANSI['reset']),
' cd {};'.format(result['pwd']),
*((' cd {};'.format(result.pwd),) if result.pwd else ()),
' {}'.format(quoted_cmd),
]
print('\n'.join(

View file

@@ -20,6 +20,7 @@ Link: {
import re
import json
from typing import Tuple, List, IO, Iterable
from datetime import datetime
import xml.etree.ElementTree as etree
@@ -29,10 +30,11 @@ from util import (
URL_REGEX,
check_url_parsing_invariants,
TimedProgress,
Link,
)
def parse_links(source_file):
def parse_links(source_file: str) -> Tuple[List[Link], str]:
"""parse a list of URLs with their metadata from an
RSS feed, bookmarks export, or text file
"""
@@ -74,7 +76,7 @@ def parse_links(source_file):
### Import Parser Functions
def parse_pocket_html_export(html_file):
def parse_pocket_html_export(html_file: IO[str]) -> Iterable[Link]:
"""Parse Pocket-format bookmarks export files (produced by getpocket.com/export/)"""
html_file.seek(0)
@@ -98,7 +100,7 @@ def parse_pocket_html_export(html_file):
}
def parse_json_export(json_file):
def parse_json_export(json_file: IO[str]) -> Iterable[Link]:
"""Parse JSON-format bookmarks export files (produced by pinboard.in/export/, or wallabag)"""
json_file.seek(0)
@@ -150,7 +152,7 @@ def parse_json_export(json_file):
}
def parse_rss_export(rss_file):
def parse_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse RSS XML-format files into links"""
rss_file.seek(0)
@@ -187,7 +189,7 @@ def parse_rss_export(rss_file):
}
def parse_shaarli_rss_export(rss_file):
def parse_shaarli_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse Shaarli-specific RSS XML-format files into links"""
rss_file.seek(0)
@@ -224,7 +226,7 @@ def parse_shaarli_rss_export(rss_file):
}
def parse_netscape_html_export(html_file):
def parse_netscape_html_export(html_file: IO[str]) -> Iterable[Link]:
"""Parse netscape-format bookmarks export files (produced by all browsers)"""
html_file.seek(0)
@@ -247,7 +249,7 @@ def parse_netscape_html_export(html_file):
}
def parse_pinboard_rss_export(rss_file):
def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse Pinboard RSS feed files into links"""
rss_file.seek(0)
@@ -278,7 +280,7 @@ def parse_pinboard_rss_export(rss_file):
}
def parse_medium_rss_export(rss_file):
def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse Medium RSS feed files into links"""
rss_file.seek(0)
@@ -299,7 +301,7 @@ def parse_medium_rss_export(rss_file):
}
def parse_plain_text_export(text_file):
def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
"""Parse raw links from each line in a text file"""
text_file.seek(0)
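Every parser now shares the same IO[str] -> Iterable[Link] shape: take an open file, yield plain link dicts. For illustration only, a hypothetical extra parser following that shape (field names mirror LinkDict in schema.py; none of this is the commit's code):

def parse_url_list(text_file: IO[str]) -> Iterable[Link]:
    """Hypothetical parser: one URL per line, no metadata"""
    text_file.seek(0)
    for line in text_file:
        url = line.strip()
        if url.startswith(('http://', 'https://', 'ftp://')):
            yield {
                'url': url,
                'timestamp': str(datetime.now().timestamp()),
                'title': None,
                'tags': '',
                'sources': [getattr(text_file, 'name', 'stdin')],
            }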

archivebox/schema.py (new file, 55 additions)
View file

@@ -0,0 +1,55 @@
from datetime import datetime
from typing import List, Dict, Any, Optional, Union, NamedTuple

from recordclass import RecordClass

Link = Dict[str, Any]


class ArchiveIndex(NamedTuple):
    info: str
    version: str
    source: str
    docs: str
    num_links: int
    updated: str
    links: List[Link]


class ArchiveResult(NamedTuple):
    cmd: List[str]
    pwd: Optional[str]
    cmd_version: Optional[str]
    output: Union[str, Exception, None]
    status: str
    start_ts: datetime
    end_ts: datetime
    duration: int


class ArchiveError(Exception):
    def __init__(self, message, hints=None):
        super().__init__(message)
        self.hints = hints


class LinkDict(NamedTuple):
    timestamp: str
    url: str
    title: Optional[str]
    tags: str
    sources: List[str]
    history: Dict[str, ArchiveResult]


class RuntimeStats(RecordClass):
    skipped: int
    succeeded: int
    failed: int
    parse_start_ts: datetime
    parse_end_ts: datetime
    index_start_ts: datetime
    index_end_ts: datetime
    archiving_start_ts: datetime
    archiving_end_ts: datetime
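A quick sketch of how these types are meant to be used (values are made up): ArchiveIndex and ArchiveResult are immutable NamedTuples that serialize cleanly via ._asdict(), while RuntimeStats is a mutable recordclass so the logging helpers can bump counters in place:

result = ArchiveResult(
    cmd=['wget', '--mirror', 'https://example.com'],
    pwd='output/archive/1554000000',
    cmd_version='1.20.1',
    output='example.com/index.html',
    status='succeeded',
    start_ts=datetime.now(),
    end_ts=datetime.now(),
    duration=0,
)
print(result.status)       # attribute access instead of result['status']
print(result._asdict())    # plain dict, ready for json.dump(..., cls=ExtendedEncoder)

stats = RuntimeStats(
    skipped=0, succeeded=0, failed=0,
    parse_start_ts=0, parse_end_ts=0,
    index_start_ts=0, index_end_ts=0,
    archiving_start_ts=0, archiving_end_ts=0,
)
stats.succeeded += 1       # mutable in place, unlike a NamedTuple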

View file

@@ -3,11 +3,13 @@ import re
import sys
import time
from typing import List, Dict, Any, Optional, Union
from json import JSONEncoder
from typing import List, Dict, Optional, Iterable
from urllib.request import Request, urlopen
from urllib.parse import urlparse, quote
from decimal import Decimal
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime
from multiprocessing import Process
from subprocess import (
@@ -19,6 +21,7 @@ from subprocess import (
CalledProcessError,
)
from schema import Link
from config import (
ANSI,
TERM_WIDTH,
@@ -38,7 +41,8 @@ from logs import pretty_path
### Parsing Helpers
# Url Parsing: https://docs.python.org/3/library/urllib.parse.html#url-parsing
# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
@@ -54,6 +58,9 @@ base_url = lambda url: without_scheme(url) # uniq base url used to dedupe links
short_ts = lambda ts: ts.split('.')[0]
urlencode = lambda s: quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: unquote(s)
htmlencode = lambda s: escape(s, quote=True)
htmldecode = lambda s: unescape(s)
URL_REGEX = re.compile(
r'http[s]?://' # start matching from allowed schemes
@@ -89,7 +96,7 @@ STATICFILE_EXTENSIONS = {
# html, htm, shtml, xhtml, xml, aspx, php, cgi
}
Link = Dict[str, Any]
### Checks & Tests
@@ -105,7 +112,7 @@ def check_link_structure(link: Link) -> None:
assert isinstance(key, str)
assert isinstance(val, list), 'history must be a Dict[str, List], got: {}'.format(link['history'])
def check_links_structure(links: List[Link]) -> None:
def check_links_structure(links: Iterable[Link]) -> None:
"""basic sanity check invariants to make sure the data is valid"""
assert isinstance(links, list)
if links:
@@ -334,7 +341,7 @@ def derived_link_info(link: Link) -> dict:
url = link['url']
to_date_str = lambda ts: datetime.fromtimestamp(Decimal(ts)).strftime('%Y-%m-%d %H:%M')
to_date_str = lambda ts: datetime.fromtimestamp(float(ts)).strftime('%Y-%m-%d %H:%M')
extended_info = {
**link,
@@ -582,3 +589,30 @@ def chrome_args(**options) -> List[str]:
cmd_args.append('--user-data-dir={}'.format(options['CHROME_USER_DATA_DIR']))
return cmd_args
class ExtendedEncoder(JSONEncoder):
    """
    Extended json serializer that supports serializing several model
    fields and objects
    """

    def default(self, obj):
        cls_name = obj.__class__.__name__

        if hasattr(obj, '_asdict'):
            return obj._asdict()
        elif isinstance(obj, bytes):
            return obj.decode()
        elif isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, Exception):
            return '{}: {}'.format(obj.__class__.__name__, obj)
        elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
            return tuple(obj)

        return JSONEncoder.default(self, obj)
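Together with the schema types, ExtendedEncoder is what lets json.dump handle the values that now appear inside links and results (NamedTuples, datetimes, exceptions, raw bytes) instead of the lossy default=str fallback. A small usage sketch with made-up data:

import json

payload = {
    'updated': datetime.now(),           # datetime -> ISO 8601 string
    'error': ValueError('invalid url'),  # Exception -> 'ValueError: invalid url'
    'raw': b'hello',                     # bytes -> decoded str
}
print(json.dumps(payload, indent=4, cls=ExtendedEncoder))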