Mirror of https://github.com/ArchiveBox/ArchiveBox, synced 2024-11-14 00:17:08 +00:00

Commit bd6d9c165b (parent 185d2f9f9b): enforce utf8 on literally all file operations because windows sucks

9 changed files with 29 additions and 28 deletions
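The motivation: `open()` without an explicit `encoding=` argument falls back to `locale.getpreferredencoding(False)`, which is UTF-8 on most Linux/macOS systems but a legacy code page (typically cp1252) on Windows, so reading or writing archived pages that contain characters outside that code page crashes there. A minimal sketch of the failure mode this commit guards against (the file name is hypothetical):

import locale

text = 'snapshot títle with emoji 🔖'

# Implicit encoding: open() uses locale.getpreferredencoding(False), typically
# 'cp1252' on Windows, which cannot represent the emoji -> UnicodeEncodeError.
# with open('title.txt', 'w') as f:
#     f.write(text)

# Explicit encoding: identical behavior on every platform.
with open('title.txt', 'w', encoding='utf-8') as f:
    f.write(text)

with open('title.txt', 'r', encoding='utf-8') as f:
    assert f.read() == text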
@@ -75,8 +75,8 @@ def output_hidden(show_failing=True):
         yield
         return

-    sys.stdout = open('stdout.txt', 'w+')
-    sys.stderr = open('stderr.txt', 'w+')
+    sys.stdout = open('stdout.txt', 'w+', encoding='utf-8')
+    sys.stderr = open('stderr.txt', 'w+', encoding='utf-8')
     try:
         yield
         sys.stdout.close()
@@ -89,9 +89,9 @@ def output_hidden(show_failing=True):
         sys.stdout = stdout
         sys.stderr = stderr
         if show_failing:
-            with open('stdout.txt', 'r') as f:
+            with open('stdout.txt', 'r', encoding='utf-8') as f:
                 print(f.read())
-            with open('stderr.txt', 'r') as f:
+            with open('stderr.txt', 'r', encoding='utf-8') as f:
                 print(f.read())
         raise
     finally:
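Both hunks above touch `output_hidden`, which swaps `sys.stdout`/`sys.stderr` for plain files while tests run; if those replacement streams are opened without `encoding=`, any non-Latin output from the code under test crashes the test harness itself on Windows. A stripped-down sketch of the same redirect pattern (names are illustrative, not ArchiveBox's actual helper):

import sys
from contextlib import contextmanager

@contextmanager
def capture_stdout(path='stdout.txt'):
    """Temporarily redirect sys.stdout into a UTF-8 file."""
    real_stdout = sys.stdout
    sys.stdout = open(path, 'w+', encoding='utf-8')  # explicit encoding, Windows-safe
    try:
        yield
    finally:
        sys.stdout.close()
        sys.stdout = real_stdout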
@@ -116,7 +116,7 @@ class TestInit(unittest.TestCase):
         assert len(load_main_index(out_dir=OUTPUT_DIR)) == 0

     def test_conflicting_init(self):
-        with open(Path(OUTPUT_DIR) / 'test_conflict.txt', 'w+') as f:
+        with open(Path(OUTPUT_DIR) / 'test_conflict.txt', 'w+', encoding='utf-8') as f:
             f.write('test')

         try:
@@ -161,7 +161,7 @@ class TestAdd(unittest.TestCase):

     def test_add_arg_file(self):
         test_file = Path(OUTPUT_DIR) / 'test.txt'
-        with open(test_file, 'w+') as f:
+        with open(test_file, 'w+', encoding='utf') as f:
             f.write(test_urls)

         with output_hidden():
@@ -462,7 +462,7 @@ def write_config_file(config: Dict[str, str], out_dir: str=None) -> ConfigDict:
     config_file.optionxform = str
     config_file.read(config_path)

-    with open(config_path, 'r') as old:
+    with open(config_path, 'r', encoding='utf-8') as old:
         atomic_write(f'{config_path}.bak', old.read())

     find_section = lambda key: [name for name, opts in CONFIG_SCHEMA.items() if key in opts][0]
@@ -490,7 +490,7 @@ def write_config_file(config: Dict[str, str], out_dir: str=None) -> ConfigDict:
     else:
         config_file['SERVER_CONFIG'] = {'SECRET_KEY': random_secret_key}

-    with open(config_path, 'w+') as new:
+    with open(config_path, 'w+', encoding='utf-8') as new:
         config_file.write(new)

     try:
@@ -502,7 +502,7 @@ def write_config_file(config: Dict[str, str], out_dir: str=None) -> ConfigDict:
         }
     except:
         # something went horribly wrong, revert to the previous version
-        with open(f'{config_path}.bak', 'r') as old:
+        with open(f'{config_path}.bak', 'r', encoding='utf-8') as old:
             atomic_write(config_path, old.read())

     if Path(f'{config_path}.bak').exists():
@@ -1099,7 +1099,7 @@ def setup_django(out_dir: Path=None, check_db=False, config: ConfigDict=CONFIG,
     from django.conf import settings

     # log startup message to the error log
-    with open(settings.ERROR_LOG, "a+") as f:
+    with open(settings.ERROR_LOG, "a+", encoding='utf-8') as f:
         command = ' '.join(sys.argv)
         ts = datetime.now().strftime('%Y-%m-%d__%H:%M:%S')
         f.write(f"\n> {command}; ts={ts} version={config['VERSION']} docker={config['IN_DOCKER']} is_tty={config['IS_TTY']}\n")
@@ -31,7 +31,7 @@ def should_save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, overwr

     out_dir = out_dir or Path(link.link_dir)
     if not overwrite and (out_dir / 'archive.org.txt').exists():
-        # if open(path, 'r').read().strip() != 'None':
+        # if open(path, 'r', encoding='utf-8').read().strip() != 'None':
         return False

     return SAVE_ARCHIVE_DOT_ORG
@@ -35,7 +35,7 @@ def get_html(link: Link, path: Path) -> str:
     document = None
     for source in sources:
         try:
-            with open(abs_path / source, "r") as f:
+            with open(abs_path / source, "r", encoding="utf-8") as f:
                 document = f.read()
             break
         except (FileNotFoundError, TypeError):
@@ -16,7 +16,7 @@ def get_file_result_content(res, extra_path, use_pwd=False):
     if extra_path:
         fpath = f'{fpath}/{extra_path}'

-    with open(fpath, 'r') as file:
+    with open(fpath, 'r', encoding='utf-8') as file:
         data = file.read()
     if data:
         return [data]
@@ -37,10 +37,11 @@ def atomic_write(path: Union[Path, str], contents: Union[dict, str, bytes], over
     """Safe atomic write to filesystem by writing to temp file + atomic rename"""

     mode = 'wb+' if isinstance(contents, bytes) else 'w'
+    encoding = None if isinstance(contents, bytes) else 'utf-8'

     # print('\n> Atomic Write:', mode, path, len(contents), f'overwrite={overwrite}')
     try:
-        with lib_atomic_write(path, mode=mode, overwrite=overwrite) as f:
+        with lib_atomic_write(path, mode=mode, overwrite=overwrite, encoding=encoding) as f:
             if isinstance(contents, dict):
                 dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
             elif isinstance(contents, (bytes, str)):
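The new `encoding = None ...` line matters because `open()` (and the atomic-write wrapper used here, which forwards keyword arguments to it) rejects an encoding in binary mode with `ValueError: binary mode doesn't take an encoding argument`; `None` leaves binary writes untouched while text writes get UTF-8. A rough stdlib-only sketch of the same pattern (a hypothetical helper, not the actual `lib_atomic_write` implementation):

import os
import tempfile
from typing import Union

def atomic_write_sketch(path: str, contents: Union[str, bytes]) -> None:
    """Write to a temp file in the same directory, then atomically rename over path."""
    mode = 'wb' if isinstance(contents, bytes) else 'w'
    # Must stay None in binary mode, or open() raises ValueError:
    encoding = None if isinstance(contents, bytes) else 'utf-8'
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
    try:
        with open(fd, mode, encoding=encoding) as f:
            f.write(contents)
        os.replace(tmp_path, path)  # atomic rename, works on POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)
        raise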
@@ -33,7 +33,7 @@ def test_depth_flag_0_crawls_only_the_arg_page(tmp_path, process, disable_extrac
     )

     archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
-    with open(archived_item_path / "index.json", "r") as f:
+    with open(archived_item_path / "index.json", "r", encoding='utf-8') as f:
         output_json = json.load(f)
     assert output_json["base_url"] == "127.0.0.1:8080/static/example.com.html"

@@ -79,7 +79,7 @@ def test_add_updates_history_json_index(tmp_path, process, disable_extractors_di

     archived_item_path = list(tmp_path.glob('archive/**/*'))[0]

-    with open(archived_item_path / "index.json", "r") as f:
+    with open(archived_item_path / "index.json", "r", encoding="utf-8") as f:
         output_json = json.load(f)
     assert output_json["history"] != {}

@@ -86,7 +86,7 @@ def test_headers_retrieved(tmp_path, process, disable_extractors_dict):
     output_file = archived_item_path / "headers.json"
     assert output_file.exists()
     headers_file = archived_item_path / 'headers.json'
-    with open(headers_file) as f:
+    with open(headers_file, 'r', encoding='utf-8') as f:
         headers = pyjson.load(f)
     assert headers['Content-Language'] == 'en'
     assert headers['Content-Script-Type'] == 'text/javascript'
@@ -98,7 +98,7 @@ def test_headers_redirect_chain(tmp_path, process, disable_extractors_dict):
                            capture_output=True, env=disable_extractors_dict)
     archived_item_path = list(tmp_path.glob("archive/**/*"))[0]
     output_file = archived_item_path / "headers.json"
-    with open(output_file) as f:
+    with open(output_file, 'r', encoding='utf-8') as f:
         headers = pyjson.load(f)
     assert headers['Content-Language'] == 'en'
     assert headers['Content-Script-Type'] == 'text/javascript'
@@ -110,6 +110,6 @@ def test_headers_400_plus(tmp_path, process, disable_extractors_dict):
                            capture_output=True, env=disable_extractors_dict)
     archived_item_path = list(tmp_path.glob("archive/**/*"))[0]
     output_file = archived_item_path / "headers.json"
-    with open(output_file) as f:
+    with open(output_file, 'r', encoding='utf-8') as f:
         headers = pyjson.load(f)
     assert headers["Status-Code"] == "200"
@@ -28,11 +28,11 @@ def test_add_link(tmp_path, process, disable_extractors_dict):

     assert "index.json" in [x.name for x in archived_item_path.iterdir()]

-    with open(archived_item_path / "index.json", "r") as f:
+    with open(archived_item_path / "index.json", "r", encoding="utf-8") as f:
         output_json = json.load(f)
     assert "Example Domain" == output_json['history']['title'][0]['output']

-    with open(archived_item_path / "index.html", "r") as f:
+    with open(archived_item_path / "index.html", "r", encoding="utf-8") as f:
         output_html = f.read()
     assert "Example Domain" in output_html

@@ -47,7 +47,7 @@ def test_add_link_support_stdin(tmp_path, process, disable_extractors_dict):

     assert "index.json" in [x.name for x in archived_item_path.iterdir()]

-    with open(archived_item_path / "index.json", "r") as f:
+    with open(archived_item_path / "index.json", "r", encoding="utf-8") as f:
         output_json = json.load(f)
     assert "Example Domain" == output_json['history']['title'][0]['output']

@@ -75,11 +75,11 @@ def test_collision_urls_different_timestamps(tmp_path, process, disable_extracto

     first_archive = tmp_path / "archive" / str(min([float(folder) for folder in archive_folders]))
     json_index = str(first_archive / "index.json")
-    with open(json_index, "r") as f:
+    with open(json_index, "r", encoding="utf-8") as f:
         link_details = json.loads(f.read())

     link_details["url"] = "http://127.0.0.1:8080/static/iana.org.html"
-    with open(json_index, "w") as f:
+    with open(json_index, "w", encoding="utf-8") as f:
         json.dump(link_details, f)

     init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
@@ -98,12 +98,12 @@ def test_collision_timestamps_different_urls(tmp_path, process, disable_extracto
     archive_folders.remove(first_archive.name)
     json_index = str(first_archive / "index.json")

-    with open(json_index, "r") as f:
+    with open(json_index, "r", encoding="utf-8") as f:
         link_details = json.loads(f.read())

     link_details["timestamp"] = archive_folders[0]

-    with open(json_index, "w") as f:
+    with open(json_index, "w", encoding="utf-8") as f:
         json.dump(link_details, f)

     init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
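Worth noting alongside this commit: since Python 3.7, UTF-8 mode (PEP 540, enabled with `python -X utf8` or `PYTHONUTF8=1`) switches the defaults for `open()` and the standard streams to UTF-8 process-wide, which covers the same Windows failure mode globally; explicit `encoding=` arguments, as enforced here, have the advantage of not depending on how the interpreter is launched. To inspect what a given interpreter would default to:

import locale
import sys

# Encoding used by open() when encoding= is omitted ('cp1252' on typical Windows):
print(locale.getpreferredencoding(False))

# Encoding of the standard output stream that print() writes through:
print(sys.stdout.encoding)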