add command: --parser option

This commit is contained in:
FliegendeWurst 2021-03-20 17:38:00 +01:00
parent 308be35367
commit 60bd9a902e
6 changed files with 73 additions and 21 deletions

View file

@ -10,6 +10,7 @@ from typing import List, Optional, IO
from ..main import add
from ..util import docstring
from ..parsers import PARSERS
from ..config import OUTPUT_DIR, ONLY_NEW
from ..logging_util import SmartFormatter, accept_stdin, stderr
@ -73,6 +74,13 @@ def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional
This does not take precedence over the configuration",
default=""
)
parser.add_argument(
"--parser",
type=str,
help="Parser used to read inputted URLs.",
default="auto",
choices=["auto"] + list(PARSERS.keys())
)
command = parser.parse_args(args or ())
urls = command.urls
stdin_urls = accept_stdin(stdin)
@ -90,6 +98,7 @@ def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional
overwrite=command.overwrite,
init=command.init,
extractors=command.extract,
parser=command.parser,
out_dir=pwd or OUTPUT_DIR,
)

View file

@ -265,14 +265,14 @@ def load_main_index_meta(out_dir: Path=OUTPUT_DIR) -> Optional[dict]:
@enforce_types
def parse_links_from_source(source_path: str, root_url: Optional[str]=None) -> Tuple[List[Link], List[Link]]:
def parse_links_from_source(source_path: str, root_url: Optional[str]=None, parser: str="auto") -> Tuple[List[Link], List[Link]]:
from ..parsers import parse_links
new_links: List[Link] = []
# parse and validate the import file
raw_links, parser_name = parse_links(source_path, root_url=root_url)
raw_links, parser_name = parse_links(source_path, root_url=root_url, parser=parser)
new_links = validate_links(raw_links)
if parser_name:

View file

@ -537,6 +537,7 @@ def add(urls: Union[str, List[str]],
overwrite: bool=False,
init: bool=False,
extractors: str="",
parser: str="auto",
out_dir: Path=OUTPUT_DIR) -> List[Link]:
"""Add a new URL or list of URLs to your archive"""
@ -561,7 +562,7 @@ def add(urls: Union[str, List[str]],
# save verbatim args to sources
write_ahead_log = save_text_as_source('\n'.join(urls), filename='{ts}-import.txt', out_dir=out_dir)
new_links += parse_links_from_source(write_ahead_log, root_url=None)
new_links += parse_links_from_source(write_ahead_log, root_url=None, parser=parser)
# If we're going one level deeper, download each link and look for more links
new_links_depth = []

View file

@ -42,25 +42,29 @@ from .generic_rss import parse_generic_rss_export
from .generic_json import parse_generic_json_export
from .generic_html import parse_generic_html_export
from .generic_txt import parse_generic_txt_export
from .url_list import parse_url_list
PARSERS = (
PARSERS = {
# Specialized parsers
('Pocket API', parse_pocket_api_export),
('Wallabag ATOM', parse_wallabag_atom_export),
('Pocket HTML', parse_pocket_html_export),
('Pinboard RSS', parse_pinboard_rss_export),
('Shaarli RSS', parse_shaarli_rss_export),
('Medium RSS', parse_medium_rss_export),
'pocket-api': ('Pocket API', parse_pocket_api_export),
'wallabag': ('Wallabag ATOM', parse_wallabag_atom_export),
'pocket-html': ('Pocket HTML', parse_pocket_html_export),
'pinboard-rss': ('Pinboard RSS', parse_pinboard_rss_export),
'shaarli-rss': ('Shaarli RSS', parse_shaarli_rss_export),
'medium-rss': ('Medium RSS', parse_medium_rss_export),
# General parsers
('Netscape HTML', parse_netscape_html_export),
('Generic RSS', parse_generic_rss_export),
('Generic JSON', parse_generic_json_export),
('Generic HTML', parse_generic_html_export),
'netscape-html': ('Netscape HTML', parse_netscape_html_export),
'rss': ('Generic RSS', parse_generic_rss_export),
'json': ('Generic JSON', parse_generic_json_export),
'html': ('Generic HTML', parse_generic_html_export),
# Fallback parser
('Plain Text', parse_generic_txt_export),
)
'plain-text': ('Plain Text', parse_generic_txt_export),
# Explicitly specified parsers
'url-list': ('URL list', parse_url_list),
}
@enforce_types
@ -84,7 +88,7 @@ def parse_links_memory(urls: List[str], root_url: Optional[str]=None):
@enforce_types
def parse_links(source_file: str, root_url: Optional[str]=None) -> Tuple[List[Link], str]:
def parse_links(source_file: str, root_url: Optional[str]=None, parser: str="auto") -> Tuple[List[Link], str]:
"""parse a list of URLs with their metadata from an
RSS feed, bookmarks export, or text file
"""
@ -93,7 +97,7 @@ def parse_links(source_file: str, root_url: Optional[str]=None) -> Tuple[List[Li
timer = TimedProgress(TIMEOUT * 4)
with open(source_file, 'r', encoding='utf-8') as file:
links, parser = run_parser_functions(file, timer, root_url=root_url)
links, parser = run_parser_functions(file, timer, root_url=root_url, parser=parser)
timer.end()
if parser is None:
@ -101,11 +105,20 @@ def parse_links(source_file: str, root_url: Optional[str]=None) -> Tuple[List[Li
return links, parser
def run_parser_functions(to_parse: IO[str], timer, root_url: Optional[str]=None) -> Tuple[List[Link], Optional[str]]:
def run_parser_functions(to_parse: IO[str], timer, root_url: Optional[str]=None, parser: str="auto") -> Tuple[List[Link], Optional[str]]:
most_links: List[Link] = []
best_parser_name = None
for parser_name, parser_func in PARSERS:
if parser != "auto":
parser_name, parser_func = PARSERS[parser]
parsed_links = list(parser_func(to_parse, root_url=root_url))
if not parsed_links:
raise Exception('no links found')
timer.end()
return parsed_links, parser_name
for parser_id in PARSERS:
parser_name, parser_func = PARSERS[parser_id]
try:
parsed_links = list(parser_func(to_parse, root_url=root_url))
if not parsed_links:

View file

@ -17,7 +17,7 @@ from ..util import (
@enforce_types
def parse_generic_txt_export(text_file: IO[str], **_kwargs) -> Iterable[Link]:
"""Parse raw links from each line in a text file"""
"""Parse links from a text file, ignoring other text"""
text_file.seek(0)
for line in text_file.readlines():

View file

@ -0,0 +1,29 @@
__package__ = 'archivebox.parsers'
__description__ = 'URL list'
from typing import IO, Iterable
from datetime import datetime
from ..index.schema import Link
from ..util import (
enforce_types
)
@enforce_types
def parse_url_list(text_file: IO[str], **_kwargs) -> Iterable[Link]:
"""Parse raw URLs from each line in a text file"""
text_file.seek(0)
for line in text_file.readlines():
url = line.strip()
if len(url) == 0:
continue
yield Link(
url=url,
timestamp=str(datetime.now().timestamp()),
title=None,
tags=None,
sources=[text_file.name],
)