Plex-Meta-Manager/modules/util.py

915 lines
39 KiB
Python
Raw Normal View History

2024-05-28 20:22:51 +00:00
import glob, os, re, signal, sys, time
2021-08-06 23:02:33 +00:00
from datetime import datetime, timedelta
from modules.logs import MyLogger
2022-07-26 18:30:40 +00:00
from num2words import num2words
2021-05-19 21:30:20 +00:00
from pathvalidate import is_valid_filename, sanitize_filename
from plexapi.audio import Album, Track
2021-12-17 14:24:46 +00:00
from plexapi.video import Season, Episode, Movie
from requests.exceptions import HTTPError
from tenacity import retry_if_exception
from tenacity.wait import wait_base
2021-01-20 21:37:59 +00:00
try:
import msvcrt
windows = True
except ModuleNotFoundError:
windows = False
2023-04-28 03:43:26 +00:00
logger: MyLogger = None # noqa
2021-01-20 21:37:59 +00:00
class TimeoutExpired(Exception):
pass
2022-10-17 16:06:32 +00:00
class LimitReached(Exception):
pass
2021-01-20 21:37:59 +00:00
class Failed(Exception):
pass
2022-11-11 16:59:39 +00:00
class FilterFailed(Failed):
pass
2022-11-30 21:12:52 +00:00
class Continue(Exception):
pass
2022-05-28 16:01:51 +00:00
class Deleted(Exception):
pass
class NonExisting(Exception):
pass
class NotScheduled(Exception):
pass
2022-01-06 19:16:12 +00:00
class NotScheduledRange(NotScheduled):
pass
2021-01-20 21:37:59 +00:00
class retry_if_http_429_error(retry_if_exception):
def __init__(self):
def is_http_429_error(exception: BaseException) -> bool:
return isinstance(exception, HTTPError) and exception.response.status_code == 429
super().__init__(predicate=is_http_429_error)
class wait_for_retry_after_header(wait_base):
def __init__(self, fallback):
self.fallback = fallback
def __call__(self, retry_state):
exc = retry_state.outcome.exception()
if isinstance(exc, HTTPError):
retry_after = exc.response.headers.get("Retry-After", None)
try:
if retry_after is not None:
return int(retry_after)
except (TypeError, ValueError):
pass
return self.fallback(retry_state)
2021-05-11 01:22:18 +00:00
2021-01-20 21:37:59 +00:00
days_alias = {
"monday": 0, "mon": 0, "m": 0,
"tuesday": 1, "tues": 1, "tue": 1, "tu": 1, "t": 1,
"wednesday": 2, "wed": 2, "w": 2,
"thursday": 3, "thurs": 3, "thur": 3, "thu": 3, "th": 3, "r": 3,
"friday": 4, "fri": 4, "f": 4,
"saturday": 5, "sat": 5, "s": 5,
"sunday": 6, "sun": 6, "su": 6, "u": 6
}
2021-08-14 22:59:35 +00:00
mod_displays = {
"": "is", ".not": "is not", ".is": "is", ".isnot": "is not", ".begins": "begins with", ".ends": "ends with", ".before": "is before", ".after": "is after",
2022-04-21 16:36:44 +00:00
".gt": "is greater than", ".gte": "is greater than or equal", ".lt": "is less than", ".lte": "is less than or equal", ".regex": "is"
2021-08-14 22:59:35 +00:00
}
2021-07-23 19:44:21 +00:00
pretty_days = {0: "Monday", 1: "Tuesday", 2: "Wednesday", 3: "Thursday", 4: "Friday", 5: "Saturday", 6: "Sunday"}
2024-01-15 21:38:54 +00:00
lower_days = {v.lower(): k for k, v in pretty_days.items()}
2021-01-20 21:37:59 +00:00
pretty_months = {
2021-07-23 19:44:21 +00:00
1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June",
7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December"
2021-01-20 21:37:59 +00:00
}
2024-01-15 21:38:54 +00:00
lower_months = {v.lower(): k for k, v in pretty_months.items()}
2022-03-03 14:43:00 +00:00
seasons = ["current", "winter", "spring", "summer", "fall"]
advance_tags_to_edit = {
2024-03-26 16:15:46 +00:00
"Movie": ["metadata_language", "use_original_title", "credits_detection"],
"Show": ["episode_sorting", "keep_episodes", "delete_episodes", "season_display", "episode_ordering", "metadata_language", "use_original_title", "credits_detection", "audio_language", "subtitle_language", "subtitle_mode"],
"Season": ["audio_language", "subtitle_language", "subtitle_mode"],
"Artist": ["album_sorting"]
}
tags_to_edit = {
"Movie": ["genre", "label", "collection", "country", "director", "producer", "writer"],
2022-06-07 07:06:20 +00:00
"Video": ["genre", "label", "collection", "country", "director", "producer", "writer"],
"Show": ["genre", "label", "collection"],
"Artist": ["genre", "label", "style", "mood", "country", "collection", "similar_artist"]
}
2022-04-13 04:30:59 +00:00
collection_mode_options = {
"default": "default", "hide": "hide",
"hide_items": "hideItems", "hideitems": "hideItems",
"show_items": "showItems", "showitems": "showItems"
}
image_content_types = ["image/png", "image/jpeg", "image/webp"]
2024-11-01 19:28:56 +00:00
parental_types = {"Sex & Nudity": "Nudity", "Violence & Gore": "Violence", "Profanity": "Profanity", "Alcohol, Drugs & Smoking": "Alcohol", "Frightening & Intense Scenes": "Frightening"}
parental_values = ["None", "Mild", "Moderate", "Severe"]
2023-03-01 22:10:46 +00:00
parental_levels = {"none": [], "mild": ["None"], "moderate": ["None", "Mild"], "severe": ["None", "Mild", "Moderate"]}
2024-11-01 19:28:56 +00:00
parental_labels = [f"{t}:{v}" for t in parental_types.values() for v in parental_values]
2022-04-21 18:24:56 +00:00
previous_time = None
start_time = None
2021-01-20 21:37:59 +00:00
def get_image_dicts(group, alias):
posters = {}
backgrounds = {}
for attr in ["url_poster", "file_poster", "url_background", "file_background"]:
if attr in alias:
if group[alias[attr]]:
if "poster" in attr:
2022-05-20 20:35:11 +00:00
posters[attr] = group[alias[attr]]
else:
2022-05-20 20:35:11 +00:00
backgrounds[attr] = group[alias[attr]]
else:
logger.error(f"Metadata Error: {attr} attribute is blank")
return posters, backgrounds
2021-08-04 14:20:52 +00:00
def add_dict_list(keys, value, dict_map):
for key in keys:
if key in dict_map:
2022-09-22 15:17:22 +00:00
dict_map[key].append(int(value))
2021-08-04 14:20:52 +00:00
else:
2022-09-22 15:17:22 +00:00
dict_map[key] = [int(value)]
2021-08-04 14:20:52 +00:00
2022-05-09 15:22:41 +00:00
def get_list(data, lower=False, upper=False, split=True, int_list=False, trim=True):
2022-05-05 12:48:04 +00:00
if split is True: split = ","
if data is None: return None
2022-03-11 14:10:23 +00:00
elif isinstance(data, list): list_data = data
2021-01-20 21:37:59 +00:00
elif isinstance(data, dict): return [data]
2022-03-11 14:36:04 +00:00
elif split is False: list_data = [str(data)]
else: list_data = [s.strip() for s in str(data).split(split)]
2022-03-11 14:10:23 +00:00
2022-05-09 15:22:41 +00:00
def get_str(input_data):
return str(input_data).strip() if trim else str(input_data)
if lower is True: return [get_str(d).lower() for d in list_data]
elif upper is True: return [get_str(d).upper() for d in list_data]
2021-11-28 08:18:12 +00:00
elif int_list is True:
2022-05-09 15:22:41 +00:00
try: return [int(get_str(d)) for d in list_data]
2021-11-28 08:18:12 +00:00
except ValueError: return []
2022-05-09 15:22:41 +00:00
else: return [d if isinstance(d, dict) else get_str(d) for d in list_data]
2021-01-20 21:37:59 +00:00
def get_int_list(data, id_type):
int_values = []
2021-07-30 19:19:43 +00:00
for value in get_list(data):
2021-01-20 21:37:59 +00:00
try: int_values.append(regex_first_int(value, id_type))
except Failed as e: logger.error(e)
return int_values
2023-12-07 19:49:32 +00:00
def validate_date(date_text, return_as=None):
2022-03-23 07:45:26 +00:00
if isinstance(date_text, datetime):
date_obg = date_text
else:
try:
date_obg = datetime.strptime(str(date_text), "%Y-%m-%d" if "-" in str(date_text) else "%m/%d/%Y")
except ValueError:
2023-12-07 19:49:32 +00:00
raise Failed(f"{date_text} must match pattern YYYY-MM-DD (e.g. 2020-12-25) or MM/DD/YYYY (e.g. 12/25/2020)")
2021-07-21 19:25:29 +00:00
return datetime.strftime(date_obg, return_as) if return_as else date_obg
2022-04-20 16:03:08 +00:00
def validate_regex(data, col_type, validate=True):
regex_list = get_list(data, split=False)
valid_regex = []
for reg in regex_list:
try:
re.compile(reg)
valid_regex.append(reg)
except re.error:
err = f"{col_type} Error: Regular Expression Invalid: {reg}"
if validate:
raise Failed(err)
else:
logger.error(err)
return valid_regex
2021-01-20 21:37:59 +00:00
def logger_input(prompt, timeout=60):
if windows: return windows_input(prompt, timeout)
elif hasattr(signal, "SIGALRM"): return unix_input(prompt, timeout)
else: raise SystemError("Input Timeout not supported on this system")
2021-07-14 14:47:20 +00:00
def header(language="en-US,en;q=0.5"):
2023-05-26 14:20:47 +00:00
return {"Accept-Language": "eng" if language == "default" else language, "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0"}
2021-07-14 14:47:20 +00:00
2021-01-20 21:37:59 +00:00
def alarm_handler(signum, frame):
raise TimeoutExpired
def unix_input(prompt, timeout=60):
2021-02-24 06:44:06 +00:00
prompt = f"| {prompt}: "
2021-01-20 21:37:59 +00:00
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(timeout)
2021-07-04 04:13:06 +00:00
try: return input(prompt)
except EOFError: raise Failed("Input Failed")
finally: signal.alarm(0)
2021-01-20 21:37:59 +00:00
def windows_input(prompt, timeout=5):
2021-02-24 06:44:06 +00:00
sys.stdout.write(f"| {prompt}: ")
2021-01-20 21:37:59 +00:00
sys.stdout.flush()
result = []
2023-04-28 03:43:26 +00:00
s_time = time.time()
2021-01-20 21:37:59 +00:00
while True:
if msvcrt.kbhit():
2021-02-24 06:42:58 +00:00
char = msvcrt.getwche()
if ord(char) == 13: # enter_key
2021-01-20 21:37:59 +00:00
out = "".join(result)
print("")
2021-02-24 06:44:06 +00:00
logger.debug(f"{prompt}: {out}")
2021-01-20 21:37:59 +00:00
return out
2021-02-24 06:42:58 +00:00
elif ord(char) >= 32: #space_char
result.append(char)
2023-04-28 03:43:26 +00:00
if (time.time() - s_time) > timeout:
2021-01-20 21:37:59 +00:00
print("")
raise TimeoutExpired
def get_id_from_imdb_url(imdb_url):
match = re.search("(tt\\d+)", str(imdb_url))
if match: return match.group(1)
2021-02-24 06:44:06 +00:00
else: raise Failed(f"Regex Error: Failed to parse IMDb ID from IMDb URL: {imdb_url}")
2021-01-20 21:37:59 +00:00
def regex_first_int(data, id_type, default=None):
match = re.search("(\\d+)", str(data))
if match:
return int(match.group(1))
elif default:
2021-02-24 06:44:06 +00:00
logger.warning(f"Regex Warning: Failed to parse {id_type} from {data} using {default} as default")
2021-01-20 21:37:59 +00:00
return int(default)
else:
2021-02-24 06:44:06 +00:00
raise Failed(f"Regex Error: Failed to parse {id_type} from {data}")
2021-01-20 21:37:59 +00:00
2021-05-19 21:30:20 +00:00
def validate_filename(filename):
2022-05-31 15:22:51 +00:00
if is_valid_filename(str(filename)):
2021-05-20 20:38:48 +00:00
return filename, None
2021-05-19 21:30:20 +00:00
else:
2022-05-31 15:22:51 +00:00
mapping_name = sanitize_filename(str(filename))
2021-05-20 20:38:48 +00:00
return mapping_name, f"Log Folder Name: {filename} is invalid using {mapping_name}"
2021-07-06 15:46:29 +00:00
2021-12-17 14:24:46 +00:00
def item_title(item):
if isinstance(item, Season):
if f"Season {item.index}" == item.title:
return f"{item.parentTitle} {item.title}"
else:
return f"{item.parentTitle} Season {item.index}: {item.title}"
elif isinstance(item, Episode):
2022-08-23 15:30:53 +00:00
season = item.parentIndex if item.parentIndex else 0
episode = item.index if item.index else 0
show_title = item.grandparentTitle if item.grandparentTitle else ""
season_title = f"{item.parentTitle}: " if item.parentTitle and f"Season {season}" == item.parentTitle else ""
return f"{show_title} S{season:02}E{episode:02}: {season_title}{item.title if item.title else ''}"
2021-12-17 14:24:46 +00:00
elif isinstance(item, Movie) and item.year:
return f"{item.title} ({item.year})"
elif isinstance(item, Album):
return f"{item.parentTitle}: {item.title}"
elif isinstance(item, Track):
return f"{item.grandparentTitle}: {item.parentTitle}: {item.title}"
2021-12-17 14:24:46 +00:00
else:
return item.title
def item_set(item, item_id):
return {"title": item_title(item), "tmdb" if isinstance(item, Movie) else "tvdb": item_id}
2021-07-06 15:46:29 +00:00
def is_locked(filepath):
locked = None
file_object = None
if os.path.exists(filepath):
try:
file_object = open(filepath, 'a', 8)
if file_object:
locked = False
2021-07-30 19:19:43 +00:00
except IOError:
2021-07-06 15:46:29 +00:00
locked = True
finally:
if file_object:
file_object.close()
return locked
2021-07-22 21:00:45 +00:00
2021-12-18 03:02:24 +00:00
def time_window(tw):
2021-11-27 00:30:41 +00:00
today = datetime.now()
2021-12-18 03:02:24 +00:00
if tw == "today":
2021-11-27 00:30:41 +00:00
return f"{today:%Y-%m-%d}"
2021-12-18 03:02:24 +00:00
elif tw == "yesterday":
2021-11-27 00:30:41 +00:00
return f"{today - timedelta(days=1):%Y-%m-%d}"
2021-12-18 03:02:24 +00:00
elif tw == "this_week":
2021-11-27 00:30:41 +00:00
return f"{today:%Y-0%V}"
2021-12-18 03:02:24 +00:00
elif tw == "last_week":
2021-11-27 00:30:41 +00:00
return f"{today - timedelta(weeks=1):%Y-0%V}"
2021-12-18 03:02:24 +00:00
elif tw == "this_month":
2021-11-27 00:30:41 +00:00
return f"{today:%Y-%m}"
2023-01-10 02:11:11 +00:00
elif tw == "last_month" and today.month == 1:
return f"{today.year - 1}-12"
2021-12-18 03:02:24 +00:00
elif tw == "last_month":
2023-01-10 02:11:11 +00:00
return f"{today.year}-{today.month - 1:02}"
2021-12-18 03:02:24 +00:00
elif tw == "this_year":
2021-11-27 00:30:41 +00:00
return f"{today.year}"
2021-12-18 03:02:24 +00:00
elif tw == "last_year":
2021-11-27 00:30:41 +00:00
return f"{today.year - 1}"
else:
2021-12-18 03:02:24 +00:00
return tw
2021-11-27 00:30:41 +00:00
2023-02-24 15:52:23 +00:00
def load_files(files_to_load, method, err_type="Config", schedule=None, lib_vars=None, single=False):
2022-04-10 02:28:05 +00:00
files = []
2023-04-18 18:37:08 +00:00
had_scheduled = False
2022-06-01 04:50:38 +00:00
if not lib_vars:
lib_vars = {}
2023-02-24 15:52:23 +00:00
files_to_load = get_list(files_to_load, split=False)
if single and len(files_to_load) > 1:
raise Failed(f"{err_type} Error: {method} can only have one entry")
for file in files_to_load:
2023-12-31 16:45:00 +00:00
logger.info("")
2022-04-18 18:16:39 +00:00
if isinstance(file, dict):
2022-04-26 05:34:06 +00:00
current = []
2022-04-10 02:28:05 +00:00
def check_dict(attr, name):
2024-04-22 14:20:12 +00:00
if attr in file and (method != "metadata_files" or attr not in ["pmm", "default"]):
2023-12-31 16:45:00 +00:00
logger.info(f"Reading {attr}: {file[attr]}")
2022-04-18 18:16:39 +00:00
if file[attr]:
2024-04-22 14:20:12 +00:00
if attr in ["pmm", "default"] and file[attr] == "other_award":
logger.error(f"{err_type} Error: The Kometa Default other_award has been deprecated. Please visit the wiki for the full list of available award files")
2024-01-07 15:35:05 +00:00
elif attr == "git" and file[attr].startswith("PMM/"):
2024-04-22 14:20:12 +00:00
current.append(("Default", file[attr][4:]))
2022-09-27 06:19:29 +00:00
else:
2023-12-31 16:45:00 +00:00
current.append((name, file[attr]))
2022-04-10 02:28:05 +00:00
else:
2023-02-24 15:52:23 +00:00
logger.error(f"{err_type} Error: {method} {attr} is blank")
2023-12-31 16:45:00 +00:00
return ""
2022-04-10 02:28:05 +00:00
check_dict("url", "URL")
check_dict("git", "Git")
2024-04-22 14:20:12 +00:00
check_dict("pmm", "Default")
check_dict("default", "Default")
2022-04-10 02:28:05 +00:00
check_dict("repo", "Repo")
check_dict("file", "File")
2023-02-24 15:52:23 +00:00
if not single and "folder" in file:
2023-12-31 16:45:00 +00:00
logger.info(f"Reading folder: {file['folder']}")
2022-04-18 18:16:39 +00:00
if file["folder"] is None:
2023-02-24 15:52:23 +00:00
logger.error(f"{err_type} Error: {method} folder is blank")
2022-04-18 18:16:39 +00:00
elif not os.path.isdir(file["folder"]):
2023-02-24 15:52:23 +00:00
logger.error(f"{err_type} Error: Folder not found: {file['folder']}")
2022-04-10 02:28:05 +00:00
else:
2022-10-02 17:19:32 +00:00
yml_files = glob_filter(os.path.join(file["folder"], "*.yml"))
yml_files.extend(glob_filter(os.path.join(file["folder"], "*.yaml")))
2022-04-10 02:28:05 +00:00
if yml_files:
2023-12-31 16:45:00 +00:00
current.extend([("File", yml) for yml in yml_files])
2022-04-10 02:28:05 +00:00
else:
2023-02-24 15:52:23 +00:00
logger.error(f"{err_type} Error: No YAML (.yml|.yaml) files found in {file['folder']}")
2022-04-26 05:34:06 +00:00
2023-12-31 16:45:00 +00:00
temp_vars = {}
if "template_variables" in file and file["template_variables"] and isinstance(file["template_variables"], dict):
temp_vars = file["template_variables"]
for k, v in lib_vars.items():
if k not in temp_vars:
temp_vars[k] = v
if temp_vars:
logger.info(f"Template Variables: {temp_vars}")
asset_directory = []
if "asset_directory" in file and file["asset_directory"]:
logger.info(f"Asset Directory: {file['asset_directory']}")
for asset_path in get_list(file["asset_directory"], split=False):
if os.path.exists(asset_path):
asset_directory.append(asset_path)
else:
logger.error(f"{err_type} Error: Asset Directory Does Not Exist: {asset_path}")
2022-04-26 05:34:06 +00:00
if schedule and "schedule" in file and file["schedule"]:
current_time, run_hour, ignore_schedules = schedule
2023-12-31 16:45:00 +00:00
logger.info(f"Schedule: {file['schedule']}")
2022-04-26 05:34:06 +00:00
err = None
2023-12-31 16:45:00 +00:00
schedule_str = None
2022-04-26 05:34:06 +00:00
try:
2023-12-31 16:45:00 +00:00
schedule_str = schedule_check("schedule", file["schedule"], current_time, run_hour)
2022-04-26 05:34:06 +00:00
except NotScheduledRange as e:
err = e
2023-12-31 16:45:00 +00:00
schedule_str = e
2022-04-26 05:34:06 +00:00
except NotScheduled as e:
if not ignore_schedules:
err = e
2023-12-31 16:45:00 +00:00
schedule_str = e
if schedule_str:
logger.info(f"Schedule Read:{schedule_str}\n")
2022-04-26 05:34:06 +00:00
if err:
2023-04-18 18:37:08 +00:00
had_scheduled = True
2023-12-31 16:45:00 +00:00
logger.warning(f"This {'set of files' if len(current) > 1 else 'file'} not scheduled to run")
2022-04-26 05:34:06 +00:00
continue
2023-12-31 16:45:00 +00:00
files.extend([(ft, fp, temp_vars, asset_directory) for ft, fp in current])
2022-04-10 02:28:05 +00:00
else:
2023-12-31 16:45:00 +00:00
logger.info(f"Reading file: {file}")
2022-04-18 18:16:39 +00:00
if os.path.exists(file):
2022-04-21 05:35:07 +00:00
files.append(("File", file, {}, None))
2022-04-10 02:28:05 +00:00
else:
2023-02-24 15:52:23 +00:00
logger.error(f"{err_type} Error: Path not found: {file}")
2023-04-18 18:37:08 +00:00
return files, had_scheduled
2022-04-10 02:28:05 +00:00
2022-02-06 07:33:09 +00:00
def check_num(num, is_int=True):
try:
return int(str(num)) if is_int else float(str(num))
except (ValueError, TypeError):
return None
2022-04-13 04:30:59 +00:00
def check_collection_mode(collection_mode):
if collection_mode and str(collection_mode).lower() in collection_mode_options:
return collection_mode_options[str(collection_mode).lower()]
else:
raise Failed(f"Config Error: {collection_mode} collection_mode invalid\n\tdefault (Library default)\n\thide (Hide Collection)\n\thide_items (Hide Items in this Collection)\n\tshow_items (Show this Collection and its Items)")
2021-08-10 15:33:32 +00:00
def glob_filter(filter_in):
filter_in = filter_in.translate({ord("["): "[[]", ord("]"): "[]]"}) if "[" in filter_in else filter_in
2021-12-13 07:30:19 +00:00
return glob.glob(filter_in)
2021-08-10 15:33:32 +00:00
2021-08-07 06:01:21 +00:00
def is_date_filter(value, modifier, data, final, current_time):
if value is None:
return True
2021-08-06 23:02:33 +00:00
if modifier in ["", ".not"]:
threshold_date = current_time - timedelta(days=data)
2021-08-07 06:01:21 +00:00
if (modifier == "" and (value is None or value < threshold_date)) \
or (modifier == ".not" and value and value >= threshold_date):
return True
2021-08-06 23:02:33 +00:00
elif modifier in [".before", ".after"]:
2023-12-07 19:49:32 +00:00
try:
filter_date = validate_date(data)
except Failed as e:
raise Failed(f"Collection Error: {final}: {e}")
2021-08-07 06:01:21 +00:00
if (modifier == ".before" and value >= filter_date) or (modifier == ".after" and value <= filter_date):
return True
2021-08-06 23:02:33 +00:00
elif modifier == ".regex":
2024-01-09 16:59:17 +00:00
jailbreak = False
2021-08-06 23:02:33 +00:00
for check_data in data:
2021-08-07 06:01:21 +00:00
if re.compile(check_data).match(value.strftime("%m/%d/%Y")):
2021-08-06 23:02:33 +00:00
jailbreak = True
break
if not jailbreak:
2021-08-07 06:01:21 +00:00
return True
return False
def is_number_filter(value, modifier, data):
return value is None or (modifier == "" and value == data) \
or (modifier == ".not" and value != data) \
or (modifier == ".gt" and value <= data) \
2021-08-07 06:01:21 +00:00
or (modifier == ".gte" and value < data) \
or (modifier == ".lt" and value >= data) \
or (modifier == ".lte" and value > data)
2021-12-10 16:17:50 +00:00
def is_boolean_filter(value, data):
return (data and not value) or (not data and value)
2021-08-07 06:01:21 +00:00
def is_string_filter(values, modifier, data):
jailbreak = False
if modifier == ".regex":
logger.trace(f"Regex Values: {values}")
2021-08-07 06:01:21 +00:00
for value in values:
for check_value in data:
if (modifier in ["", ".not"] and check_value.lower() in value.lower()) \
or (modifier in [".is", ".isnot"] and value.lower() == check_value.lower()) \
2021-08-07 06:01:21 +00:00
or (modifier == ".begins" and value.lower().startswith(check_value.lower())) \
or (modifier == ".ends" and value.lower().endswith(check_value.lower())) \
2022-04-13 04:30:59 +00:00
or (modifier == ".regex" and re.compile(check_value).search(value)):
2021-08-07 06:01:21 +00:00
jailbreak = True
break
if jailbreak: break
return (jailbreak and modifier in [".not", ".isnot"]) or (not jailbreak and modifier in ["", ".is", ".begins", ".ends", ".regex"])
2021-08-06 23:02:33 +00:00
def check_day(_m, _d):
if _m in [1, 3, 5, 7, 8, 10, 12] and _d > 31:
return _m, 31
elif _m in [4, 6, 9, 11] and _d > 30:
return _m, 30
elif _m == 2 and _d > 28:
return _m, 28
else:
return _m, _d
2022-05-12 14:09:08 +00:00
def schedule_check(attribute, data, current_time, run_hour, is_all=False):
2022-01-06 19:16:12 +00:00
range_collection = False
non_existing = False
2022-04-30 22:25:29 +00:00
all_check = 0
schedules_run = 0
2021-12-18 03:02:24 +00:00
next_month = current_time.replace(day=28) + timedelta(days=4)
last_day = next_month - timedelta(days=next_month.day)
schedule_str = ""
2022-05-12 14:09:08 +00:00
if isinstance(data, str) and (("all" in data and not data.endswith("]")) or data.count("all") > 1):
raise Failed("Schedule Error: each all schedule must be on its own line")
elif isinstance(data, str) and "all" in data:
data = [data]
2022-04-30 22:25:29 +00:00
for schedule in get_list(data):
2021-12-18 03:02:24 +00:00
run_time = str(schedule).lower()
2022-01-06 19:16:12 +00:00
display = f"{attribute} attribute {schedule} invalid"
2022-04-30 22:25:29 +00:00
schedules_run += 1
2022-05-12 14:09:08 +00:00
if run_time.startswith("all"):
match = re.search("\\[([^\\]]+)\\]", run_time)
if not match:
logger.error(f"Schedule Error: failed to parse {attribute}: {schedule}")
continue
try:
2022-05-12 15:04:52 +00:00
schedule_str += f"\nScheduled to meet all of these:\n\t"
schedule_str += schedule_check(attribute, match.group(1), current_time, run_hour, is_all=True)
2022-05-12 14:09:08 +00:00
all_check += 1
2022-05-12 15:04:52 +00:00
except NotScheduled as e:
schedule_str += str(e)
2022-05-12 14:09:08 +00:00
continue
2022-04-30 22:25:29 +00:00
elif run_time.startswith(("day", "daily")):
all_check += 1
elif run_time.startswith("non_existing"):
all_check += 1
non_existing = True
2021-12-18 03:02:24 +00:00
elif run_time == "never":
schedule_str += f"\nNever scheduled to run"
elif run_time.startswith(("hour", "week", "month", "year", "range")):
match = re.search("\\(([^)]+)\\)", run_time)
if not match:
2021-12-19 05:41:58 +00:00
logger.error(f"Schedule Error: failed to parse {attribute}: {schedule}")
2021-12-18 03:02:24 +00:00
continue
param = match.group(1)
if run_time.startswith("hour"):
2023-08-24 15:11:22 +00:00
if "-" in run_time:
2023-08-30 15:18:20 +00:00
start, end = param.split("-")
2023-08-24 15:11:22 +00:00
try:
start = int(start)
end = int(end)
if start != end and 0 <= start <= 23 and 0 <= end <= 23:
schedule_str += f"\nScheduled to run between the {num2words(start, to='ordinal_num')} hour and the {num2words(end, to='ordinal_num')} hour"
if end > start and start <= run_hour <= end:
all_check += 1
elif start > end and (start <= run_hour or run_hour <= end):
all_check += 1
else:
raise ValueError
except ValueError:
logger.error(f"Schedule Error: hourly {start}-{end} each must be a different integer between 0 and 23")
else:
try:
if 0 <= int(param) <= 23:
schedule_str += f"\nScheduled to run on the {num2words(param, to='ordinal_num')} hour"
if run_hour == int(param):
all_check += 1
else:
raise ValueError
except ValueError:
logger.error(f"Schedule Error: hourly {display} must be an integer between 0 and 23")
2021-12-18 03:02:24 +00:00
elif run_time.startswith("week"):
2023-08-24 15:11:22 +00:00
ok_days = param.lower().split("|")
err = None
for ok_day in ok_days:
if ok_day not in days_alias:
err = f"Schedule Error: weekly {display} must be a day of the week i.e. weekly(Monday)"
if err:
logger.error(err)
2021-12-18 03:02:24 +00:00
continue
2023-08-24 15:11:22 +00:00
pass_day = False
for ok_day in ok_days:
weekday = days_alias[ok_day]
schedule_str += f"\nScheduled weekly on {pretty_days[weekday]}"
if weekday == current_time.weekday():
pass_day = True
if pass_day:
2022-04-30 22:25:29 +00:00
all_check += 1
2021-12-18 03:02:24 +00:00
elif run_time.startswith("month"):
try:
if 1 <= int(param) <= 31:
2022-07-26 18:30:40 +00:00
schedule_str += f"\nScheduled monthly on the {num2words(param, to='ordinal_num')}"
if current_time.day == int(param) or (current_time.day == last_day.day and int(param) > last_day.day):
2022-04-30 22:25:29 +00:00
all_check += 1
2021-12-18 03:02:24 +00:00
else:
raise ValueError
except ValueError:
2022-01-06 19:16:12 +00:00
logger.error(f"Schedule Error: monthly {display} must be an integer between 1 and 31")
2021-12-18 03:02:24 +00:00
elif run_time.startswith("year"):
try:
if "/" in param:
opt = param.split("/")
month = int(opt[0])
day = int(opt[1])
2022-07-26 18:30:40 +00:00
schedule_str += f"\nScheduled yearly on {pretty_months[month]} {num2words(day, to='ordinal_num')}"
if current_time.month == month and (current_time.day == day or (current_time.day == last_day.day and day > last_day.day)):
2022-04-30 22:25:29 +00:00
all_check += 1
2021-12-18 03:02:24 +00:00
else:
raise ValueError
except ValueError:
2022-01-06 19:16:12 +00:00
logger.error(f"Schedule Error: yearly {display} must be in the MM/DD format i.e. yearly(11/22)")
2021-12-18 03:02:24 +00:00
elif run_time.startswith("range"):
2023-09-19 19:28:03 +00:00
ranges = []
range_pass = False
for ok_range in param.lower().split("|"):
2023-09-16 20:55:12 +00:00
match = re.match("^(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])-(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])$", ok_range)
if not match:
logger.error(f"Schedule Error: range {display} must be in the MM/DD-MM/DD format i.e. range(12/01-12/25)")
continue
month_start, day_start = check_day(int(match.group(1)), int(match.group(2)))
month_end, day_end = check_day(int(match.group(3)), int(match.group(4)))
month_check, day_check = check_day(current_time.month, current_time.day)
check = datetime.strptime(f"{month_check}/{day_check}", "%m/%d")
start = datetime.strptime(f"{month_start}/{day_start}", "%m/%d")
end = datetime.strptime(f"{month_end}/{day_end}", "%m/%d")
range_collection = True
2023-09-19 19:28:03 +00:00
ranges.append(f"{pretty_months[month_start]} {num2words(day_start, to='ordinal_num')} and {pretty_months[month_end]} {num2words(day_end, to='ordinal_num')}")
2023-09-16 20:55:12 +00:00
if start <= check <= end if start < end else (check <= end or check >= start):
2023-09-19 19:28:03 +00:00
range_pass = True
if ranges:
schedule_str += f"\nScheduled {' or '.join(ranges)}"
if range_pass:
all_check += 1
2021-12-18 03:02:24 +00:00
else:
2022-01-06 19:16:12 +00:00
logger.error(f"Schedule Error: {display}")
2022-05-12 15:04:52 +00:00
if is_all:
schedule_str.replace("\n", "\n\t")
2022-05-12 14:09:08 +00:00
if (all_check == 0 and not is_all) or (is_all and schedules_run != all_check):
if non_existing:
raise NonExisting(schedule_str)
elif range_collection:
2022-04-30 22:25:29 +00:00
raise NotScheduledRange(schedule_str)
else:
raise NotScheduled(schedule_str)
2022-05-12 15:04:52 +00:00
return schedule_str
2022-01-28 18:36:21 +00:00
def check_int(value, datatype="int", minimum=1, maximum=None, throw=False):
2022-03-28 16:15:34 +00:00
try:
value = int(str(value)) if datatype == "int" else float(str(value))
if (maximum is None and minimum <= value) or (maximum is not None and minimum <= value <= maximum):
return value
except ValueError:
if throw:
message = f"{value} must be {'an integer' if datatype == 'int' else 'a number'}"
raise Failed(f"{message} {minimum} or greater" if maximum is None else f"{message} between {minimum} and {maximum}")
return None
2022-03-28 16:15:34 +00:00
2023-01-24 19:07:42 +00:00
def parse_and_or(error, attribute, data, test_list):
2022-12-13 17:15:50 +00:00
out = ""
2023-01-24 19:07:42 +00:00
final = ""
2022-12-13 21:12:59 +00:00
ands = [d.strip() for d in data.split(",")]
2022-12-13 17:15:50 +00:00
for an in ands:
2022-12-13 21:12:59 +00:00
ors = [a.strip() for a in an.split("|")]
2023-01-24 19:07:42 +00:00
or_num = []
2022-12-13 17:15:50 +00:00
for item in ors:
if not item:
raise Failed(f"{error} Error: Cannot have a blank {attribute}")
2023-01-24 19:07:42 +00:00
if str(item) not in test_list:
2022-12-13 17:15:50 +00:00
raise Failed(f"{error} Error: {attribute} {item} is invalid")
2023-01-24 22:03:30 +00:00
or_num.append(str(test_list[str(item)]))
2023-01-24 19:07:42 +00:00
if final:
final += ","
final += "|".join(or_num)
2022-12-13 17:15:50 +00:00
if out:
out += f" and "
if len(ands) > 1 and len(ors) > 1:
out += "("
if len(ors) > 1:
2023-01-23 19:08:39 +00:00
out += ' or '.join([test_list[test_list[str(o)]] if test_list else o for o in ors])
2022-12-13 17:15:50 +00:00
else:
2023-01-23 19:08:39 +00:00
out += test_list[test_list[str(ors[0])]] if test_list else ors[0]
2022-12-13 17:15:50 +00:00
if len(ands) > 1 and len(ors) > 1:
out += ")"
2023-01-24 19:07:42 +00:00
return out, final
2022-12-13 17:15:50 +00:00
2023-12-07 19:49:32 +00:00
def parse(error, attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None, regex=None, range_split=None, date_return=None):
2022-01-28 18:36:21 +00:00
display = f"{parent + ' ' if parent else ''}{attribute} attribute"
if options is None and translation is not None:
options = [o for o in translation]
value = data[methods[attribute]] if methods and attribute in methods else data
2023-12-07 19:49:32 +00:00
if datatype in ["list", "commalist", "strlist", "lowerlist", "upperlist"]:
2022-01-28 21:20:09 +00:00
final_list = []
2022-01-28 18:36:21 +00:00
if value:
2023-12-07 19:49:32 +00:00
if isinstance(value, dict):
2022-04-14 13:29:54 +00:00
raise Failed(f"{error} Error: {display} {value} must be a list or string")
2022-01-28 18:36:21 +00:00
if datatype == "commalist":
value = get_list(value)
2023-01-27 18:33:22 +00:00
if datatype == "lowerlist":
value = get_list(value, lower=True)
2023-12-07 19:49:32 +00:00
if datatype == "upperlist":
value = get_list(value, upper=True)
2022-01-28 21:20:09 +00:00
if not isinstance(value, list):
value = [value]
for v in value:
2024-01-01 23:19:29 +00:00
if v or v == 0:
2022-03-25 20:13:51 +00:00
if options is None or (options and (v in options or (datatype == "strlist" and str(v) in options))):
final_list.append(str(v) if datatype == "strlist" else v)
2022-01-28 21:20:09 +00:00
elif options:
2024-01-15 21:38:54 +00:00
raise Failed(f"{error} Error: {display} {v} is invalid; Options include: {', '.join([o for o in options])}")
2022-01-28 21:20:09 +00:00
return final_list
2022-01-28 18:36:21 +00:00
elif datatype == "intlist":
if value:
try:
return [int(v) for v in value if v] if isinstance(value, list) else get_list(value, int_list=True)
2022-01-28 18:36:21 +00:00
except ValueError:
pass
return []
2022-01-30 07:48:56 +00:00
elif datatype == "listdict":
2022-01-28 18:36:21 +00:00
final_list = []
for dict_data in get_list(value, split=False):
2022-01-28 18:36:21 +00:00
if isinstance(dict_data, dict):
2022-02-13 22:47:08 +00:00
final_list.append(dict_data)
2022-01-28 18:36:21 +00:00
else:
raise Failed(f"{error} Error: {display} {dict_data} is not a dictionary")
return final_list
elif datatype in ["dict", "dictlist", "dictdict", "strdict", "dictliststr"]:
2022-01-28 18:36:21 +00:00
if isinstance(value, dict):
2022-01-30 07:48:56 +00:00
if datatype == "dict":
return value
elif datatype == "dictlist":
2023-03-21 14:31:43 +00:00
return {k: v if isinstance(v, list) else [v] if v else [] for k, v in value.items()}
elif datatype == "dictliststr":
return {str(k): [str(y) for y in v] if isinstance(v, list) else [str(v)] for k, v in value.items()}
2022-03-12 15:34:43 +00:00
elif datatype == "strdict":
return {str(k): str(v) for k, v in value.items()}
2022-01-30 07:48:56 +00:00
else:
final_dict = {}
for dict_key, dict_data in value.items():
if isinstance(dict_data, dict) and dict_data:
2022-03-22 20:06:59 +00:00
new_data = {}
for dict_data_key, dict_data_data in dict_data.items():
new_data[str(dict_data_key)] = dict_data_data
final_dict[dict_key] = new_data
2022-01-30 07:48:56 +00:00
else:
2022-09-30 15:56:13 +00:00
raise Failed(f"{error} Warning: {display} {dict_key} is not a dictionary")
2022-01-30 07:48:56 +00:00
return final_dict
else:
raise Failed(f"{error} Error: {display} {value} is not a dictionary")
2022-01-28 18:36:21 +00:00
elif methods and attribute not in methods:
message = f"{display} not found"
elif value is None:
message = f"{display} is blank"
elif regex is not None:
regex_str, example = regex
if re.compile(regex_str).match(str(value)):
return str(value)
else:
message = f"{display}: {value} must match pattern {regex_str} e.g. {example}"
elif datatype == "bool":
if isinstance(value, bool):
return value
elif isinstance(value, (int, float)):
return value > 0
2023-02-23 18:57:00 +00:00
elif str(value).lower() in ["t", "true", "y", "yes"]:
2022-01-28 18:36:21 +00:00
return True
2023-02-23 18:57:00 +00:00
elif str(value).lower() in ["f", "false", "n", "no"]:
2022-01-28 18:36:21 +00:00
return False
else:
message = f"{display} must be either true or false"
elif datatype in ["int", "float"]:
2022-03-28 16:15:34 +00:00
if range_split:
range_values = str(value).split(range_split)
if len(range_values) == 2:
2024-01-11 20:02:42 +00:00
start = check_int(range_values[0], datatype=datatype, minimum=minimum, maximum=maximum)
end = check_int(range_values[1], datatype=datatype, minimum=minimum, maximum=maximum)
2022-03-28 16:15:34 +00:00
if start and end and start < end:
return f"{start}{range_split}{end}"
2022-01-28 18:36:21 +00:00
else:
2022-10-27 06:39:30 +00:00
new_value = check_int(value, datatype=datatype, minimum=minimum, maximum=maximum)
if new_value is not None:
return new_value
2022-03-28 16:15:34 +00:00
message = f"{display} {value} must {'each ' if range_split else ''}be {'an integer' if datatype == 'int' else 'a number'}"
message = f"{message} {minimum} or greater" if maximum is None else f"{message} between {minimum} and {maximum}"
if range_split:
message = f"{message} separated by a {range_split}"
2023-12-07 19:49:32 +00:00
elif datatype == "date":
try:
2024-01-15 21:38:54 +00:00
if default in ["today", "current"]:
default = validate_date(datetime.now(), return_as=date_return)
return validate_date(datetime.now() if data in ["today", "current"] else data, return_as=date_return)
2023-12-07 19:49:32 +00:00
except Failed as e:
message = f"{e}"
2022-01-28 18:36:21 +00:00
elif (translation is not None and str(value).lower() not in translation) or \
(options is not None and translation is None and str(value).lower() not in options):
2023-12-12 21:24:37 +00:00
message = f"{display} {value} must be in [{', '.join([str(o) for o in options])}]"
2022-01-28 18:36:21 +00:00
else:
2023-12-07 19:49:32 +00:00
return translation[str(value).lower()] if translation is not None else value
2022-01-28 18:36:21 +00:00
if default is None:
raise Failed(f"{error} Error: {message}")
else:
logger.warning(f"{error} Warning: {message} using {default} as default")
return translation[default] if translation is not None else default
2022-02-13 18:28:53 +00:00
def parse_cords(data, parent, required=False, err_type="Overlay", default=None):
dho, dha, dvo, dva = default if default else (None, None, None, None)
horizontal_align = parse(err_type, "horizontal_align", data["horizontal_align"], parent=parent,
2022-10-31 16:28:26 +00:00
options=["left", "center", "right"]) if "horizontal_align" in data else None
if horizontal_align is None:
if required:
raise Failed(f"{err_type} Error: {parent} horizontal_align is required")
horizontal_align = dha
2022-10-31 16:28:26 +00:00
vertical_align = parse(err_type, "vertical_align", data["vertical_align"], parent=parent,
2022-10-31 16:28:26 +00:00
options=["top", "center", "bottom"]) if "vertical_align" in data else None
if vertical_align is None:
if required:
raise Failed(f"{err_type} Error: {parent} vertical_align is required")
vertical_align = dva
2022-10-28 23:32:48 +00:00
horizontal_offset = None
if "horizontal_offset" in data and data["horizontal_offset"] is not None:
x_off = data["horizontal_offset"]
per = False
if str(x_off).endswith("%"):
x_off = x_off[:-1]
per = True
x_off = check_num(x_off)
error = f"{err_type} Error: {parent} horizontal_offset: {data['horizontal_offset']} must be a number"
2022-10-28 23:32:48 +00:00
if x_off is None:
raise Failed(error)
if horizontal_align != "center" and not per and x_off < 0:
raise Failed(f"{error} 0 or greater")
elif horizontal_align != "center" and per and (x_off > 100 or x_off < 0):
raise Failed(f"{error} between 0% and 100%")
elif horizontal_align == "center" and per and (x_off > 50 or x_off < -50):
raise Failed(f"{error} between -50% and 50%")
horizontal_offset = f"{x_off}%" if per else x_off
if horizontal_offset is None:
if required:
raise Failed(f"{err_type} Error: {parent} horizontal_offset is required")
horizontal_offset = dho
2022-10-28 23:32:48 +00:00
vertical_offset = None
if "vertical_offset" in data and data["vertical_offset"] is not None:
y_off = data["vertical_offset"]
per = False
if str(y_off).endswith("%"):
y_off = y_off[:-1]
per = True
y_off = check_num(y_off)
error = f"{err_type} Error: {parent} vertical_offset: {data['vertical_offset']} must be a number"
2022-10-28 23:32:48 +00:00
if y_off is None:
raise Failed(error)
if vertical_align != "center" and not per and y_off < 0:
raise Failed(f"{error} 0 or greater")
elif vertical_align != "center" and per and (y_off > 100 or y_off < 0):
raise Failed(f"{error} between 0% and 100%")
elif vertical_align == "center" and per and (y_off > 50 or y_off < -50):
raise Failed(f"{error} between -50% and 50%")
vertical_offset = f"{y_off}%" if per else y_off
if vertical_offset is None:
if required:
raise Failed(f"{err_type} Error: {parent} vertical_offset is required")
vertical_offset = dvo
2022-10-28 23:32:48 +00:00
2022-11-03 19:44:01 +00:00
return horizontal_offset, horizontal_align, vertical_offset, vertical_align
2022-10-28 23:32:48 +00:00
2022-02-13 18:28:53 +00:00
def replace_label(_label, _data):
replaced = False
if isinstance(_data, dict):
final_data = {}
for sm, sd in _data.items():
try:
_new_data, _new_replaced = replace_label(_label, sd)
final_data[sm] = _new_data
if _new_replaced:
replaced = True
except Failed:
continue
elif isinstance(_data, list):
final_data = []
for li in _data:
try:
_new_data, _new_replaced = replace_label(_label, li)
final_data.append(_new_data)
if _new_replaced:
replaced = True
except Failed:
continue
elif "<<smart_label>>" in str(_data):
final_data = str(_data).replace("<<smart_label>>", _label)
replaced = True
else:
final_data = _data
return final_data, replaced
2022-04-21 18:24:56 +00:00
def check_time(message, end=False):
global previous_time
global start_time
current_time = time.time()
if end:
previous_time = start_time
if previous_time is None:
logger.debug(f"{message}: {current_time}")
start_time = current_time
else:
logger.debug(f"{message}: {current_time - previous_time}")
previous_time = None if end else current_time
2022-05-12 19:10:03 +00:00
system_fonts = []
2022-05-15 03:06:32 +00:00
def get_system_fonts():
global system_fonts
if not system_fonts:
dirs = []
if sys.platform == "win32":
windir = os.environ.get("WINDIR")
if windir:
dirs.append(os.path.join(windir, "fonts"))
elif sys.platform in ("linux", "linux2"):
lindirs = os.environ.get("XDG_DATA_DIRS", "")
if not lindirs:
lindirs = "/usr/share"
dirs += [os.path.join(lindir, "fonts") for lindir in lindirs.split(":")]
elif sys.platform == "darwin":
dirs += ["/Library/Fonts", "/System/Library/Fonts", os.path.expanduser("~/Library/Fonts")]
else:
return dirs
system_fonts = [n for d in dirs for _, _, ns in os.walk(d) for n in ns]
return system_fonts