feat: Initial oneshot command proposal

This commit is contained in:
Cristian 2020-07-29 11:19:06 -05:00
parent 685f85aaae
commit c073ea141d
6 changed files with 148 additions and 33 deletions

View file

@ -0,0 +1,62 @@
#!/usr/bin/env python3
__package__ = 'archivebox.cli'
__command__ = 'archivebox oneshot'
import sys
import argparse
from pathlib import Path
from typing import List, Optional, IO
from ..main import oneshot
from ..util import docstring
from ..config import OUTPUT_DIR
from ..logging_util import SmartFormatter, accept_stdin, stderr
@docstring(oneshot.__doc__)
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=oneshot.__doc__,
add_help=True,
formatter_class=SmartFormatter,
)
parser.add_argument(
'url',
type=str,
default=None,
help=(
'URLs or paths to archive e.g.:\n'
' https://getpocket.com/users/USERNAME/feed/all\n'
' https://example.com/some/rss/feed.xml\n'
' https://example.com\n'
' ~/Downloads/firefox_bookmarks_export.html\n'
' ~/Desktop/sites_list.csv\n'
)
)
parser.add_argument(
'--out-dir',
type=str,
default=OUTPUT_DIR,
help= "Path to save the single archive folder to, e.g. ./example.com_archive"
)
command = parser.parse_args(args or ())
url = command.url
stdin_url = accept_stdin(stdin)
if (stdin_url and url) or (not stdin and not url):
stderr(
'[X] You must pass URLs/paths to add via stdin or CLI arguments.\n',
color='red',
)
raise SystemExit(2)
oneshot(
url=stdin_url or url,
out_dir=str(Path(command.out_dir).absolute()),
)
if __name__ == '__main__':
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -34,7 +34,7 @@ from .archive_org import should_save_archive_dot_org, save_archive_dot_org
@enforce_types
def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None) -> Link:
def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None, skip_index: bool=False) -> Link:
"""download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
ARCHIVE_METHODS = [
@ -61,7 +61,7 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
os.makedirs(out_dir)
link = load_link_details(link, out_dir=out_dir)
write_link_details(link, out_dir=link.link_dir)
write_link_details(link, out_dir=out_dir, skip_sql_index=skip_index)
log_link_archiving_started(link, out_dir, is_new)
link = link.overwrite(updated=datetime.now())
stats = {'skipped': 0, 'succeeded': 0, 'failed': 0}
@ -97,8 +97,9 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
except Exception:
pass
write_link_details(link, out_dir=link.link_dir)
patch_main_index(link)
write_link_details(link, out_dir=out_dir, skip_sql_index=skip_index)
if not skip_index:
patch_main_index(link)
# # If any changes were made, update the main links index json and html
# was_changed = stats['succeeded'] or stats['failed']
@ -122,7 +123,7 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
@enforce_types
def archive_links(links: List[Link], overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None) -> List[Link]:
def archive_links(links: List[Link], overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None, skip_index: bool=False) -> List[Link]:
if not links:
return []
@ -131,7 +132,8 @@ def archive_links(links: List[Link], overwrite: bool=False, methods: Optional[It
link: Link = links[0]
try:
for idx, link in enumerate(links):
archive_link(link, overwrite=overwrite, methods=methods, out_dir=link.link_dir)
link_out_dir = out_dir or link.link_dir
archive_link(link, overwrite=overwrite, methods=methods, link_out_dir=out_dir, skip_index=skip_index)
except KeyboardInterrupt:
log_archiving_paused(len(links), idx, link.timestamp)
raise SystemExit(0)

View file

@ -354,12 +354,13 @@ def patch_main_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
### Link Details Index
@enforce_types
def write_link_details(link: Link, out_dir: Optional[str]=None) -> None:
def write_link_details(link: Link, out_dir: Optional[str]=None, skip_sql_index: bool=False) -> None:
out_dir = out_dir or link.link_dir
write_json_link_details(link, out_dir=out_dir)
write_html_link_details(link, out_dir=out_dir)
write_sql_link_details(link)
if not skip_sql_index:
write_sql_link_details(link)
@enforce_types

View file

@ -18,6 +18,7 @@ from .cli import (
from .parsers import (
save_text_as_source,
save_file_as_source,
parse_links_memory,
)
from .index.schema import Link
from .util import enforce_types # type: ignore
@ -493,6 +494,13 @@ def status(out_dir: str=OUTPUT_DIR) -> None:
print(ANSI['black'], ' ...', ANSI['reset'])
@enforce_types
def oneshot(url: str, out_dir: str=OUTPUT_DIR):
oneshot_links, _ = parse_links_memory([url])
oneshot_links, _ = dedupe_links([], oneshot_links)
archive_links(oneshot_links, out_dir=out_dir, skip_index=True)
return oneshot_links
@enforce_types
def add(urls: Union[str, List[str]],
depth: int=0,
@ -1055,3 +1063,4 @@ def shell(out_dir: str=OUTPUT_DIR) -> None:
setup_django(OUTPUT_DIR)
from django.core.management import call_command
call_command("shell_plus")

View file

@ -9,8 +9,9 @@ __package__ = 'archivebox.parsers'
import re
import os
from io import StringIO
from typing import Tuple, List
from typing import IO, Tuple, List
from datetime import datetime
from ..system import atomic_write
@ -37,15 +38,7 @@ from .generic_rss import parse_generic_rss_export
from .generic_json import parse_generic_json_export
from .generic_txt import parse_generic_txt_export
@enforce_types
def parse_links(source_file: str) -> Tuple[List[Link], str]:
"""parse a list of URLs with their metadata from an
RSS feed, bookmarks export, or text file
"""
check_url_parsing_invariants()
PARSERS = (
PARSERS = (
# Specialized parsers
('Pocket HTML', parse_pocket_html_export),
('Pinboard RSS', parse_pinboard_rss_export),
@ -60,30 +53,66 @@ def parse_links(source_file: str) -> Tuple[List[Link], str]:
# Fallback parser
('Plain Text', parse_generic_txt_export),
)
@enforce_types
def parse_links_memory(urls: List[str]):
"""
parse a list of URLS without touching the filesystem
"""
check_url_parsing_invariants()
timer = TimedProgress(TIMEOUT * 4)
with open(source_file, 'r', encoding='utf-8') as file:
for parser_name, parser_func in PARSERS:
try:
links = list(parser_func(file))
if links:
timer.end()
return links, parser_name
except Exception as err: # noqa
pass
# Parsers are tried one by one down the list, and the first one
# that succeeds is used. To see why a certain parser was not used
# due to error or format incompatibility, uncomment this line:
# print('[!] Parser {} failed: {} {}'.format(parser_name, err.__class__.__name__, err))
# raise
#urls = list(map(lambda x: x + "\n", urls))
file = StringIO()
file.writelines(urls)
file.name = "io_string"
output = _parse(file, timer)
if output is not None:
return output
timer.end()
return [], 'Failed to parse'
@enforce_types
def parse_links(source_file: str) -> Tuple[List[Link], str]:
"""parse a list of URLs with their metadata from an
RSS feed, bookmarks export, or text file
"""
check_url_parsing_invariants()
timer = TimedProgress(TIMEOUT * 4)
with open(source_file, 'r', encoding='utf-8') as file:
output = _parse(file, timer)
if output is not None:
return output
timer.end()
return [], 'Failed to parse'
def _parse(to_parse: IO[str], timer) -> Tuple[List[Link], str]:
for parser_name, parser_func in PARSERS:
try:
links = list(parser_func(to_parse))
if links:
timer.end()
return links, parser_name
except Exception as err: # noqa
pass
# Parsers are tried one by one down the list, and the first one
# that succeeds is used. To see why a certain parser was not used
# due to error or format incompatibility, uncomment this line:
# print('[!] Parser {} failed: {} {}'.format(parser_name, err.__class__.__name__, err))
# raise
@enforce_types
def save_text_as_source(raw_text: str, filename: str='{ts}-stdin.txt', out_dir: str=OUTPUT_DIR) -> str:
ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(OUTPUT_DIR, SOURCES_DIR_NAME, filename.format(ts=ts))
source_path = os.path.join(out_dir, SOURCES_DIR_NAME, filename.format(ts=ts))
atomic_write(source_path, raw_text)
log_source_saved(source_file=source_path)
return source_path

12
tests/test_oneshot.py Normal file
View file

@ -0,0 +1,12 @@
from .fixtures import *
def test_oneshot_command_exists(tmp_path):
os.chdir(tmp_path)
process = subprocess.run(['archivebox', 'oneshot'], capture_output=True)
assert not "invalid choice: 'oneshot'" in process.stderr.decode("utf-8")
def test_oneshot_commad_saves_page_in_right_folder(tmp_path):
process = subprocess.run(["archivebox", "oneshot", f"--out-dir={tmp_path}", "http://127.0.0.1:8080/static/example.com.html"], capture_output=True)
items = ' '.join([str(x) for x in tmp_path.iterdir()])
assert "index.json" in items