Plex-Meta-Manager/modules/util.py

458 lines
19 KiB
Python
Raw Normal View History

2021-08-10 15:33:32 +00:00
import glob, logging, os, re, signal, sys, time, traceback
2021-08-06 23:02:33 +00:00
from datetime import datetime, timedelta
2021-08-10 15:33:32 +00:00
from logging.handlers import RotatingFileHandler
2021-05-19 21:30:20 +00:00
from pathvalidate import is_valid_filename, sanitize_filename
from plexapi.audio import Artist, Album, Track
2021-05-11 01:22:18 +00:00
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
2021-12-17 14:24:46 +00:00
from plexapi.video import Season, Episode, Movie
2021-01-20 21:37:59 +00:00
try:
import msvcrt
windows = True
except ModuleNotFoundError:
windows = False
logger = logging.getLogger("Plex Meta Manager")
class TimeoutExpired(Exception):
pass
class Failed(Exception):
pass
class NotScheduled(Exception):
pass
2021-06-30 15:02:55 +00:00
class ImageData:
2021-06-12 15:29:17 +00:00
def __init__(self, attribute, location, prefix="", is_poster=True, is_url=True):
self.attribute = attribute
self.location = location
self.prefix = prefix
self.is_poster = is_poster
self.is_url = is_url
self.compare = location if is_url else os.stat(location).st_size
self.message = f"{prefix}{'poster' if is_poster else 'background'} to [{'URL' if is_url else 'File'}] {location}"
2021-10-04 17:51:32 +00:00
def __str__(self):
return str(self.__dict__)
2021-01-20 21:37:59 +00:00
def retry_if_not_failed(exception):
return not isinstance(exception, Failed)
2021-05-11 01:22:18 +00:00
def retry_if_not_plex(exception):
return not isinstance(exception, (BadRequest, NotFound, Unauthorized))
2021-02-24 06:42:58 +00:00
separating_character = "="
2021-01-20 21:37:59 +00:00
screen_width = 100
2021-05-26 13:25:32 +00:00
spacing = 0
2021-01-20 21:37:59 +00:00
days_alias = {
"monday": 0, "mon": 0, "m": 0,
"tuesday": 1, "tues": 1, "tue": 1, "tu": 1, "t": 1,
"wednesday": 2, "wed": 2, "w": 2,
"thursday": 3, "thurs": 3, "thur": 3, "thu": 3, "th": 3, "r": 3,
"friday": 4, "fri": 4, "f": 4,
"saturday": 5, "sat": 5, "s": 5,
"sunday": 6, "sun": 6, "su": 6, "u": 6
}
2021-08-14 22:59:35 +00:00
mod_displays = {
"": "is", ".not": "is not", ".is": "is", ".isnot": "is not", ".begins": "begins with", ".ends": "ends with", ".before": "is before", ".after": "is after",
2021-08-14 22:59:35 +00:00
".gt": "is greater than", ".gte": "is greater than or equal", ".lt": "is less than", ".lte": "is less than or equal"
}
2021-07-23 19:44:21 +00:00
pretty_days = {0: "Monday", 1: "Tuesday", 2: "Wednesday", 3: "Thursday", 4: "Friday", 5: "Saturday", 6: "Sunday"}
2021-01-20 21:37:59 +00:00
pretty_months = {
2021-07-23 19:44:21 +00:00
1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June",
7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December"
2021-01-20 21:37:59 +00:00
}
2021-08-14 22:59:35 +00:00
seasons = ["winter", "spring", "summer", "fall"]
2021-07-23 19:44:21 +00:00
pretty_ids = {"anidbid": "AniDB", "imdbid": "IMDb", "mal_id": "MyAnimeList", "themoviedb_id": "TMDb", "thetvdb_id": "TVDb", "tvdbid": "TVDb"}
2021-12-20 01:25:38 +00:00
collection_mode_options = {
"default": "default", "hide": "hide",
"hide_items": "hideItems", "hideitems": "hideItems",
"show_items": "showItems", "showitems": "showItems"
}
2021-01-20 21:37:59 +00:00
2021-02-24 06:44:06 +00:00
def tab_new_lines(data):
return str(data).replace("\n", "\n|\t ") if "\n" in str(data) else str(data)
2021-01-20 21:37:59 +00:00
def make_ordinal(n):
2021-07-26 20:29:28 +00:00
return f"{n}{'th' if 11 <= (n % 100) <= 13 else ['th', 'st', 'nd', 'rd', 'th'][min(n % 10, 4)]}"
2021-01-20 21:37:59 +00:00
2021-08-22 15:54:33 +00:00
def add_zero(number):
return str(number) if len(str(number)) > 1 else f"0{number}"
2021-08-04 14:20:52 +00:00
def add_dict_list(keys, value, dict_map):
for key in keys:
if key in dict_map:
dict_map[key].append(value)
else:
dict_map[key] = [value]
2021-05-08 23:49:55 +00:00
def get_list(data, lower=False, split=True, int_list=False):
if data is None: return None
elif isinstance(data, list): return data
2021-01-20 21:37:59 +00:00
elif isinstance(data, dict): return [data]
2021-02-17 06:10:50 +00:00
elif split is False: return [str(data)]
2021-02-16 04:54:47 +00:00
elif lower is True: return [d.strip().lower() for d in str(data).split(",")]
2021-11-28 08:18:12 +00:00
elif int_list is True:
try: return [int(d.strip()) for d in str(data).split(",")]
except ValueError: return []
2021-02-16 04:54:47 +00:00
else: return [d.strip() for d in str(data).split(",")]
2021-01-20 21:37:59 +00:00
def get_int_list(data, id_type):
int_values = []
2021-07-30 19:19:43 +00:00
for value in get_list(data):
2021-01-20 21:37:59 +00:00
try: int_values.append(regex_first_int(value, id_type))
except Failed as e: logger.error(e)
return int_values
2021-07-21 19:25:29 +00:00
def validate_date(date_text, method, return_as=None):
try: date_obg = datetime.strptime(str(date_text), "%Y-%m-%d" if "-" in str(date_text) else "%m/%d/%Y")
except ValueError: raise Failed(f"Collection Error: {method}: {date_text} must match pattern YYYY-MM-DD (e.g. 2020-12-25) or MM/DD/YYYY (e.g. 12/25/2020)")
return datetime.strftime(date_obg, return_as) if return_as else date_obg
2021-01-20 21:37:59 +00:00
def logger_input(prompt, timeout=60):
if windows: return windows_input(prompt, timeout)
elif hasattr(signal, "SIGALRM"): return unix_input(prompt, timeout)
else: raise SystemError("Input Timeout not supported on this system")
2021-07-14 14:47:20 +00:00
def header(language="en-US,en;q=0.5"):
2021-10-26 15:01:08 +00:00
return {"Accept-Language": "eng" if language == "default" else language, "User-Agent": "Mozilla/5.0 x64"}
2021-07-14 14:47:20 +00:00
2021-01-20 21:37:59 +00:00
def alarm_handler(signum, frame):
raise TimeoutExpired
def unix_input(prompt, timeout=60):
2021-02-24 06:44:06 +00:00
prompt = f"| {prompt}: "
2021-01-20 21:37:59 +00:00
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(timeout)
2021-07-04 04:13:06 +00:00
try: return input(prompt)
except EOFError: raise Failed("Input Failed")
finally: signal.alarm(0)
2021-01-20 21:37:59 +00:00
def windows_input(prompt, timeout=5):
2021-02-24 06:44:06 +00:00
sys.stdout.write(f"| {prompt}: ")
2021-01-20 21:37:59 +00:00
sys.stdout.flush()
result = []
start_time = time.time()
while True:
if msvcrt.kbhit():
2021-02-24 06:42:58 +00:00
char = msvcrt.getwche()
if ord(char) == 13: # enter_key
2021-01-20 21:37:59 +00:00
out = "".join(result)
print("")
2021-02-24 06:44:06 +00:00
logger.debug(f"{prompt}: {out}")
2021-01-20 21:37:59 +00:00
return out
2021-02-24 06:42:58 +00:00
elif ord(char) >= 32: #space_char
result.append(char)
2021-01-20 21:37:59 +00:00
if (time.time() - start_time) > timeout:
print("")
raise TimeoutExpired
def print_multiline(lines, info=False, warning=False, error=False, critical=False):
2021-02-21 17:01:10 +00:00
for i, line in enumerate(str(lines).split("\n")):
2021-01-20 21:37:59 +00:00
if critical: logger.critical(line)
elif error: logger.error(line)
elif warning: logger.warning(line)
elif info: logger.info(line)
else: logger.debug(line)
if i == 0:
logger.handlers[1].setFormatter(logging.Formatter(" " * 65 + "| %(message)s"))
logger.handlers[1].setFormatter(logging.Formatter("[%(asctime)s] %(filename)-27s %(levelname)-10s | %(message)s"))
def print_stacktrace():
print_multiline(traceback.format_exc())
def my_except_hook(exctype, value, tb):
for line in traceback.format_exception(etype=exctype, value=value, tb=tb):
print_multiline(line, critical=True)
def get_id_from_imdb_url(imdb_url):
match = re.search("(tt\\d+)", str(imdb_url))
if match: return match.group(1)
2021-02-24 06:44:06 +00:00
else: raise Failed(f"Regex Error: Failed to parse IMDb ID from IMDb URL: {imdb_url}")
2021-01-20 21:37:59 +00:00
def regex_first_int(data, id_type, default=None):
match = re.search("(\\d+)", str(data))
if match:
return int(match.group(1))
elif default:
2021-02-24 06:44:06 +00:00
logger.warning(f"Regex Warning: Failed to parse {id_type} from {data} using {default} as default")
2021-01-20 21:37:59 +00:00
return int(default)
else:
2021-02-24 06:44:06 +00:00
raise Failed(f"Regex Error: Failed to parse {id_type} from {data}")
2021-01-20 21:37:59 +00:00
2021-12-30 23:05:31 +00:00
def centered(text, sep=" ", side_space=True, left=False):
2021-01-20 21:37:59 +00:00
if len(text) > screen_width - 2:
2021-06-11 14:26:11 +00:00
return text
2021-01-20 21:37:59 +00:00
space = screen_width - len(text) - 2
2021-12-30 23:05:31 +00:00
text = f"{' ' if side_space else sep}{text}{' ' if side_space else sep}"
2021-01-20 21:37:59 +00:00
if space % 2 == 1:
2021-05-24 03:38:46 +00:00
text += sep
2021-01-20 21:37:59 +00:00
space -= 1
2021-05-24 03:38:46 +00:00
side = int(space / 2) - 1
2021-12-30 23:05:31 +00:00
final_text = f"{text}{sep * side}{sep * side}" if left else f"{sep * side}{text}{sep * side}"
2021-03-21 23:00:37 +00:00
return final_text
2021-01-20 21:37:59 +00:00
2021-12-30 23:05:31 +00:00
def separator(text=None, space=True, border=True, debug=False, side_space=True, left=False):
2021-05-24 03:38:46 +00:00
sep = " " if space else separating_character
2021-05-19 17:12:34 +00:00
for handler in logger.handlers:
apply_formatter(handler, border=False)
2021-05-24 03:38:46 +00:00
border_text = f"|{separating_character * screen_width}|"
if border and debug:
logger.debug(border_text)
elif border:
logger.info(border_text)
2021-01-20 21:37:59 +00:00
if text:
2021-02-28 03:56:49 +00:00
text_list = text.split("\n")
for t in text_list:
if debug:
2021-12-30 23:05:31 +00:00
logger.debug(f"|{sep}{centered(t, sep=sep, side_space=side_space, left=left)}{sep}|")
else:
2021-12-30 23:05:31 +00:00
logger.info(f"|{sep}{centered(t, sep=sep, side_space=side_space, left=left)}{sep}|")
2021-05-24 03:38:46 +00:00
if border and debug:
logger.debug(border_text)
elif border:
logger.info(border_text)
2021-05-19 17:12:34 +00:00
for handler in logger.handlers:
apply_formatter(handler)
def apply_formatter(handler, border=True):
text = f"| %(message)-{screen_width - 2}s |" if border else f"%(message)-{screen_width - 2}s"
2021-08-10 15:33:32 +00:00
if isinstance(handler, RotatingFileHandler):
2021-05-19 17:12:34 +00:00
text = f"[%(asctime)s] %(filename)-27s %(levelname)-10s {text}"
handler.setFormatter(logging.Formatter(text))
2021-01-20 21:37:59 +00:00
2021-05-26 13:25:32 +00:00
def adjust_space(display_title):
display_title = str(display_title)
space_length = spacing - len(display_title)
if space_length > 0:
display_title += " " * space_length
return display_title
def print_return(text):
print(adjust_space(f"| {text}"), end="\r")
global spacing
spacing = len(text) + 2
2021-01-20 21:37:59 +00:00
2021-05-26 13:25:32 +00:00
def print_end():
print(adjust_space(" "), end="\r")
global spacing
spacing = 0
2021-05-19 21:30:20 +00:00
def validate_filename(filename):
if is_valid_filename(filename):
2021-05-20 20:38:48 +00:00
return filename, None
2021-05-19 21:30:20 +00:00
else:
mapping_name = sanitize_filename(filename)
2021-05-20 20:38:48 +00:00
return mapping_name, f"Log Folder Name: {filename} is invalid using {mapping_name}"
2021-07-06 15:46:29 +00:00
2021-12-17 14:24:46 +00:00
def item_title(item):
if isinstance(item, Season):
if f"Season {item.index}" == item.title:
return f"{item.parentTitle} {item.title}"
else:
return f"{item.parentTitle} Season {item.index}: {item.title}"
elif isinstance(item, Episode):
text = f"{item.grandparentTitle} S{add_zero(item.parentIndex)}E{add_zero(item.index)}"
if f"Season {item.parentIndex}" == item.parentTitle:
return f"{text}: {item.title}"
else:
return f"{text}: {item.parentTitle}: {item.title}"
elif isinstance(item, Movie) and item.year:
return f"{item.title} ({item.year})"
elif isinstance(item, Album):
return f"{item.parentTitle}: {item.title}"
elif isinstance(item, Track):
return f"{item.grandparentTitle}: {item.parentTitle}: {item.title}"
2021-12-17 14:24:46 +00:00
else:
return item.title
def item_set(item, item_id):
return {"title": item_title(item), "tmdb" if isinstance(item, Movie) else "tvdb": item_id}
2021-07-06 15:46:29 +00:00
def is_locked(filepath):
locked = None
file_object = None
if os.path.exists(filepath):
try:
file_object = open(filepath, 'a', 8)
if file_object:
locked = False
2021-07-30 19:19:43 +00:00
except IOError:
2021-07-06 15:46:29 +00:00
locked = True
finally:
if file_object:
file_object.close()
return locked
2021-07-22 21:00:45 +00:00
2021-12-18 03:02:24 +00:00
def time_window(tw):
2021-11-27 00:30:41 +00:00
today = datetime.now()
2021-12-18 03:02:24 +00:00
if tw == "today":
2021-11-27 00:30:41 +00:00
return f"{today:%Y-%m-%d}"
2021-12-18 03:02:24 +00:00
elif tw == "yesterday":
2021-11-27 00:30:41 +00:00
return f"{today - timedelta(days=1):%Y-%m-%d}"
2021-12-18 03:02:24 +00:00
elif tw == "this_week":
2021-11-27 00:30:41 +00:00
return f"{today:%Y-0%V}"
2021-12-18 03:02:24 +00:00
elif tw == "last_week":
2021-11-27 00:30:41 +00:00
return f"{today - timedelta(weeks=1):%Y-0%V}"
2021-12-18 03:02:24 +00:00
elif tw == "this_month":
2021-11-27 00:30:41 +00:00
return f"{today:%Y-%m}"
2021-12-18 03:02:24 +00:00
elif tw == "last_month":
2021-11-27 00:30:41 +00:00
return f"{today.year}-{today.month - 1 or 12}"
2021-12-18 03:02:24 +00:00
elif tw == "this_year":
2021-11-27 00:30:41 +00:00
return f"{today.year}"
2021-12-18 03:02:24 +00:00
elif tw == "last_year":
2021-11-27 00:30:41 +00:00
return f"{today.year - 1}"
else:
2021-12-18 03:02:24 +00:00
return tw
2021-11-27 00:30:41 +00:00
2021-08-10 15:33:32 +00:00
def glob_filter(filter_in):
filter_in = filter_in.translate({ord("["): "[[]", ord("]"): "[]]"}) if "[" in filter_in else filter_in
2021-12-13 07:30:19 +00:00
return glob.glob(filter_in)
2021-08-10 15:33:32 +00:00
2021-08-07 06:01:21 +00:00
def is_date_filter(value, modifier, data, final, current_time):
if value is None:
return True
2021-08-06 23:02:33 +00:00
if modifier in ["", ".not"]:
threshold_date = current_time - timedelta(days=data)
2021-08-07 06:01:21 +00:00
if (modifier == "" and (value is None or value < threshold_date)) \
or (modifier == ".not" and value and value >= threshold_date):
return True
2021-08-06 23:02:33 +00:00
elif modifier in [".before", ".after"]:
filter_date = validate_date(data, final)
2021-08-07 06:01:21 +00:00
if (modifier == ".before" and value >= filter_date) or (modifier == ".after" and value <= filter_date):
return True
2021-08-06 23:02:33 +00:00
elif modifier == ".regex":
2021-08-07 06:01:21 +00:00
jailbreak = True
2021-08-06 23:02:33 +00:00
for check_data in data:
2021-08-07 06:01:21 +00:00
if re.compile(check_data).match(value.strftime("%m/%d/%Y")):
2021-08-06 23:02:33 +00:00
jailbreak = True
break
if not jailbreak:
2021-08-07 06:01:21 +00:00
return True
return False
def is_number_filter(value, modifier, data):
return value is None or (modifier == ".gt" and value <= data) \
or (modifier == ".gte" and value < data) \
or (modifier == ".lt" and value >= data) \
or (modifier == ".lte" and value > data)
2021-12-10 16:17:50 +00:00
def is_boolean_filter(value, data):
return (data and not value) or (not data and value)
2021-08-07 06:01:21 +00:00
def is_string_filter(values, modifier, data):
jailbreak = False
for value in values:
for check_value in data:
if (modifier in ["", ".not"] and check_value.lower() in value.lower()) \
or (modifier in [".is", ".isnot"] and value.lower() == check_value.lower()) \
2021-08-07 06:01:21 +00:00
or (modifier == ".begins" and value.lower().startswith(check_value.lower())) \
or (modifier == ".ends" and value.lower().endswith(check_value.lower())) \
or (modifier == ".regex" and re.compile(check_value).match(value)):
jailbreak = True
break
if jailbreak: break
return (jailbreak and modifier in [".not", ".isnot"]) or (not jailbreak and modifier in ["", ".is", ".begins", ".ends", ".regex"])
2021-08-06 23:02:33 +00:00
def check_collection_mode(collection_mode):
2021-12-20 01:25:38 +00:00
if collection_mode and str(collection_mode).lower() in collection_mode_options:
return collection_mode_options[str(collection_mode).lower()]
else:
raise Failed(f"Config Error: {collection_mode} collection_mode invalid\n\tdefault (Library default)\n\thide (Hide Collection)\n\thide_items (Hide Items in this Collection)\n\tshow_items (Show this Collection and its Items)")
def check_day(_m, _d):
if _m in [1, 3, 5, 7, 8, 10, 12] and _d > 31:
return _m, 31
elif _m in [4, 6, 9, 11] and _d > 30:
return _m, 30
elif _m == 2 and _d > 28:
return _m, 28
else:
return _m, _d
2021-12-19 05:41:58 +00:00
def schedule_check(attribute, data, current_time, run_hour):
2021-12-18 03:02:24 +00:00
skip_collection = True
schedule_list = get_list(data)
next_month = current_time.replace(day=28) + timedelta(days=4)
last_day = next_month - timedelta(days=next_month.day)
schedule_str = ""
for schedule in schedule_list:
run_time = str(schedule).lower()
if run_time.startswith(("day", "daily")):
skip_collection = False
elif run_time == "never":
schedule_str += f"\nNever scheduled to run"
elif run_time.startswith(("hour", "week", "month", "year", "range")):
match = re.search("\\(([^)]+)\\)", run_time)
if not match:
2021-12-19 05:41:58 +00:00
logger.error(f"Schedule Error: failed to parse {attribute}: {schedule}")
2021-12-18 03:02:24 +00:00
continue
param = match.group(1)
if run_time.startswith("hour"):
try:
if 0 <= int(param) <= 23:
schedule_str += f"\nScheduled to run only on the {make_ordinal(int(param))} hour"
if run_hour == int(param):
skip_collection = False
else:
raise ValueError
except ValueError:
2021-12-19 05:41:58 +00:00
logger.error(f"Schedule Error: hourly {attribute} attribute {schedule} invalid must be an integer between 0 and 23")
2021-12-18 03:02:24 +00:00
elif run_time.startswith("week"):
if param.lower() not in days_alias:
2021-12-19 05:41:58 +00:00
logger.error(f"Schedule Error: weekly {attribute} attribute {schedule} invalid must be a day of the week i.e. weekly(Monday)")
2021-12-18 03:02:24 +00:00
continue
weekday = days_alias[param.lower()]
schedule_str += f"\nScheduled weekly on {pretty_days[weekday]}"
if weekday == current_time.weekday():
skip_collection = False
elif run_time.startswith("month"):
try:
if 1 <= int(param) <= 31:
schedule_str += f"\nScheduled monthly on the {make_ordinal(int(param))}"
if current_time.day == int(param) or (
current_time.day == last_day.day and int(param) > last_day.day):
skip_collection = False
else:
raise ValueError
except ValueError:
2021-12-19 05:41:58 +00:00
logger.error(f"Schedule Error: monthly {attribute} attribute {schedule} invalid must be an integer between 1 and 31")
2021-12-18 03:02:24 +00:00
elif run_time.startswith("year"):
try:
if "/" in param:
opt = param.split("/")
month = int(opt[0])
day = int(opt[1])
schedule_str += f"\nScheduled yearly on {pretty_months[month]} {make_ordinal(day)}"
if current_time.month == month and (current_time.day == day or (
current_time.day == last_day.day and day > last_day.day)):
skip_collection = False
else:
raise ValueError
except ValueError:
logger.error(
2021-12-19 05:41:58 +00:00
f"Schedule Error: yearly {attribute} attribute {schedule} invalid must be in the MM/DD format i.e. yearly(11/22)")
2021-12-18 03:02:24 +00:00
elif run_time.startswith("range"):
match = re.match("^(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])-(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])$", param)
if not match:
2021-12-19 05:41:58 +00:00
logger.error(f"Schedule Error: range {attribute} attribute {schedule} invalid must be in the MM/DD-MM/DD format i.e. range(12/01-12/25)")
2021-12-18 03:02:24 +00:00
continue
month_start, day_start = check_day(int(match.group(1)), int(match.group(2)))
month_end, day_end = check_day(int(match.group(3)), int(match.group(4)))
month_check, day_check = check_day(current_time.month, current_time.day)
check = datetime.strptime(f"{month_check}/{day_check}", "%m/%d")
start = datetime.strptime(f"{month_start}/{day_start}", "%m/%d")
end = datetime.strptime(f"{month_end}/{day_end}", "%m/%d")
schedule_str += f"\nScheduled between {pretty_months[month_start]} {make_ordinal(day_start)} and {pretty_months[month_end]} {make_ordinal(day_end)}"
if start <= check <= end if start < end else (check <= end or check >= start):
2021-12-18 03:02:24 +00:00
skip_collection = False
else:
2021-12-19 05:41:58 +00:00
logger.error(f"Schedule Error: {attribute} attribute {schedule} invalid")
2021-12-18 03:02:24 +00:00
if len(schedule_str) == 0:
skip_collection = False
if skip_collection:
raise NotScheduled(schedule_str)