#341 added validate_builders

This commit is contained in:
meisnate12 2021-07-30 15:19:43 -04:00
parent 711ae08372
commit 91e525e43d
4 changed files with 163 additions and 204 deletions

View file

@ -73,7 +73,7 @@ poster_details = ["url_poster", "tmdb_poster", "tmdb_profile", "tvdb_poster", "f
background_details = ["url_background", "tmdb_background", "tvdb_background", "file_background"]
boolean_details = ["visible_library", "visible_home", "visible_shared", "show_filtered", "show_missing", "save_missing", "item_assets"]
string_details = ["sort_title", "content_rating", "name_mapping"]
ignored_details = ["smart_filter", "smart_label", "smart_url", "run_again", "schedule", "sync_mode", "template", "test", "tmdb_person", "build_collection", "collection_order"]
ignored_details = ["smart_filter", "smart_label", "smart_url", "run_again", "schedule", "sync_mode", "template", "test", "tmdb_person", "build_collection", "collection_order", "validate_builders"]
details = ["collection_mode", "collection_order", "label"] + boolean_details + string_details
collectionless_details = ["collection_order", "plex_collectionless", "label", "label_sync_mode", "test"] + \
poster_details + background_details + summary_details + string_details
@ -350,18 +350,28 @@ class CollectionBuilder:
if skip_collection:
raise Failed(f"{self.schedule}\n\nCollection {self.name} not scheduled to run")
self.run_again = "run_again" in methods
self.collectionless = "plex_collectionless" in methods
self.validate_builders = True
if "validate_builders" in methods:
logger.info("")
logger.info("Validating Method: validate_builders")
logger.info(f"Value: {data[methods['validate_builders']]}")
self.validate_builders = util.parse("validate_builders", self.data, datatype="bool", methods=methods, default=True)
self.run_again = False
if "run_again" in methods:
logger.info("")
logger.info("Validating Method: run_again")
if not self.data[methods["run_again"]]:
logger.warning(f"Collection Warning: run_again attribute is blank defaulting to false")
else:
logger.debug(f"Value: {self.data[methods['run_again']]}")
self.run_again = util.parse_bool("run_again", self.data[methods["run_again"]])
logger.info(f"Value: {data[methods['run_again']]}")
self.run_again = util.parse("run_again", self.data, datatype="bool", methods=methods, default=False)
self.build_collection = True
if "build_collection" in methods:
logger.info("")
logger.info("Validating Method: build_collection")
logger.info(f"Value: {data[methods['build_collection']]}")
self.build_collection = util.parse("build_collection", self.data, datatype="bool", methods=methods, default=True)
self.sync = self.library.sync_mode == "sync"
if "sync_mode" in methods:
@ -376,16 +386,6 @@ class CollectionBuilder:
else:
self.sync = self.data[methods["sync_mode"]].lower() == "sync"
self.build_collection = True
if "build_collection" in methods:
logger.info("")
logger.info("Validating Method: build_collection")
if self.data[methods["build_collection"]] is None:
logger.warning(f"Collection Warning: build_collection attribute is blank defaulting to true")
else:
logger.debug(f"Value: {self.data[methods['build_collection']]}")
self.build_collection = util.parse_bool("build_collection", self.data[methods["build_collection"]])
self.custom_sort = False
if "collection_order" in methods:
logger.info("")
@ -477,6 +477,7 @@ class CollectionBuilder:
logger.debug("")
logger.debug(f"Validating Method: {method_key}")
logger.debug(f"Value: {method_data}")
try:
if method_data is None and method_name in all_builders + plex.searches: raise Failed(f"Collection Error: {method_final} attribute is blank")
elif method_data is None: logger.warning(f"Collection Warning: {method_final} attribute is blank")
elif not self.config.Trakt and "trakt" in method_name: raise Failed(f"Collection Error: {method_final} requires Trakt to be configured")
@ -511,6 +512,11 @@ class CollectionBuilder:
elif method_name in tvdb.builders: self._tvdb(method_name, method_data)
elif method_name == "filters": self._filters(method_name, method_data)
else: raise Failed(f"Collection Error: {method_final} attribute not supported")
except Failed as e:
if self.validate_builders:
raise
else:
logger.error(e)
if self.custom_sort and len(self.builders) > 1:
raise Failed("Collection Error: collection_order: custom can only be used with a single builder per collection")
@ -622,7 +628,7 @@ class CollectionBuilder:
else:
self.details[method_final] = util.get_list(method_data)
elif method_name in boolean_details:
self.details[method_name] = util.parse_bool(method_name, method_data)
self.details[method_name] = util.parse(method_name, method_data, datatype="bool")
elif method_name in string_details:
self.details[method_name] = str(method_data)
@ -664,11 +670,11 @@ class CollectionBuilder:
def _radarr(self, method_name, method_data):
if method_name == "radarr_add":
self.add_to_radarr = util.parse_bool(method_name, method_data)
self.add_to_radarr = util.parse(method_name, method_data, datatype="bool")
elif method_name == "radarr_folder":
self.radarr_options["folder"] = method_data
elif method_name in ["radarr_monitor", "radarr_search"]:
self.radarr_options[method_name[7:]] = util.parse_bool(method_name, method_data)
self.radarr_options[method_name[7:]] = util.parse(method_name, method_data, datatype="bool")
elif method_name == "radarr_availability":
if str(method_data).lower() in radarr.availability_translation:
self.radarr_options["availability"] = str(method_data).lower()
@ -682,7 +688,7 @@ class CollectionBuilder:
def _sonarr(self, method_name, method_data):
if method_name == "sonarr_add":
self.add_to_sonarr = util.parse_bool(method_name, method_data)
self.add_to_sonarr = util.parse(method_name, method_data, datatype="bool")
elif method_name == "sonarr_folder":
self.sonarr_options["folder"] = method_data
elif method_name == "sonarr_monitor":
@ -702,18 +708,18 @@ class CollectionBuilder:
else:
raise Failed(f"Collection Error: {method_name} attribute must be either standard, daily, or anime")
elif method_name in ["sonarr_season", "sonarr_search", "sonarr_cutoff_search"]:
self.sonarr_options[method_name[7:]] = util.parse_bool(method_name, method_data)
self.sonarr_options[method_name[7:]] = util.parse(method_name, method_data, datatype="bool")
elif method_name == "sonarr_tag":
self.sonarr_options["tag"] = util.get_list(method_data)
def _anidb(self, method_name, method_data):
if method_name == "anidb_popular":
self.builders.append((method_name, util.parse_int(method_name, method_data, default=30, maximum=30)))
self.builders.append((method_name, util.parse(method_name, method_data, datatype="int", default=30, maximum=30)))
elif method_name in ["anidb_id", "anidb_relation"]:
for anidb_id in self.config.AniDB.validate_anidb_ids(method_data, self.language):
self.builders.append((method_name, anidb_id))
elif method_name == "anidb_tag":
for dict_data, dict_methods in util.validate_dict_list(method_name, method_data):
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
new_dictionary = {}
if "tag" not in dict_methods:
raise Failed("Collection Error: anidb_tag tag attribute is required")
@ -721,7 +727,7 @@ class CollectionBuilder:
raise Failed("Collection Error: anidb_tag tag attribute is blank")
else:
new_dictionary["tag"] = util.regex_first_int(dict_data[dict_methods["username"]], "AniDB Tag ID")
new_dictionary["limit"] = util.parse_int_from_dict(method_name, "limit", dict_data, dict_methods, 0, minimum=0)
new_dictionary["limit"] = util.parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0)
self.builders.append((method_name, new_dictionary))
def _anilist(self, method_name, method_data):
@ -729,23 +735,23 @@ class CollectionBuilder:
for anilist_id in self.config.AniList.validate_anilist_ids(method_data, studio=method_name == "anilist_studio"):
self.builders.append((method_name, anilist_id))
elif method_name in ["anilist_popular", "anilist_top_rated"]:
self.builders.append((method_name, util.parse_int(method_name, method_data)))
self.builders.append((method_name, util.parse(method_name, method_data, datatype="int", default=10)))
elif method_name in ["anilist_season", "anilist_genre", "anilist_tag"]:
for dict_data, dict_methods in util.validate_dict_list(method_name, method_data):
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
new_dictionary = {}
if method_name == "anilist_season":
if self.current_time.month in [12, 1, 2]: new_dictionary["season"] = "winter"
elif self.current_time.month in [3, 4, 5]: new_dictionary["season"] = "spring"
elif self.current_time.month in [6, 7, 8]: new_dictionary["season"] = "summer"
elif self.current_time.month in [9, 10, 11]: new_dictionary["season"] = "fall"
new_dictionary["season"] = util.parse_from_dict(method_name, "season", dict_data, dict_methods, default=new_dictionary["season"], options=["winter", "spring", "summer", "fall"])
new_dictionary["year"] = util.parse_int_from_dict(method_name, "year", dict_data, dict_methods, self.current_time.year, minimum=1917, maximum=self.current_time.year + 1)
new_dictionary["season"] = util.parse("season", dict_data, methods=dict_methods, parent=method_name, default=new_dictionary["season"], options=["winter", "spring", "summer", "fall"])
new_dictionary["year"] = util.parse("year", dict_data, datatype="int", methods=dict_methods, default=self.current_time.year, parent=method_name, minimum=1917, maximum=self.current_time.year + 1)
elif method_name == "anilist_genre":
new_dictionary["genre"] = self.config.AniList.validate_genre(util.parse_from_dict(method_name, "genre", dict_data, dict_methods))
new_dictionary["genre"] = self.config.AniList.validate_genre(util.parse("genre", dict_data, methods=dict_methods, parent=method_name))
elif method_name == "anilist_tag":
new_dictionary["tag"] = self.config.AniList.validate_tag(util.parse_from_dict(method_name, "tag", dict_data, dict_methods))
new_dictionary["sort_by"] = util.parse_from_dict(method_name, "sort_by", dict_data, dict_methods, default="score", options=["score", "popular"])
new_dictionary["limit"] = util.parse_int_from_dict(method_name, "limit", dict_data, dict_methods, 0, maximum=500)
new_dictionary["tag"] = self.config.AniList.validate_tag(util.parse("tag", dict_data, methods=dict_methods, parent=method_name))
new_dictionary["sort_by"] = util.parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=["score", "popular"])
new_dictionary["limit"] = util.parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, maximum=500)
self.builders.append((method_name, new_dictionary))
def _icheckmovies(self, method_name, method_data):
@ -780,37 +786,37 @@ class CollectionBuilder:
for mal_id in util.get_int_list(method_data, "MyAnimeList ID"):
self.builders.append((method_name, mal_id))
elif method_name in ["mal_all", "mal_airing", "mal_upcoming", "mal_tv", "mal_ova", "mal_movie", "mal_special", "mal_popular", "mal_favorite", "mal_suggested"]:
self.builders.append((method_name, util.parse_int(method_name, method_data)))
self.builders.append((method_name, util.parse(method_name, method_data, datatype="int", default=10)))
elif method_name in ["mal_season", "mal_userlist"]:
for dict_data, dict_methods in util.validate_dict_list(method_name, method_data):
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
new_dictionary = {}
if method_name == "mal_season":
if self.current_time.month in [1, 2, 3]: new_dictionary["season"] = "winter"
elif self.current_time.month in [4, 5, 6]: new_dictionary["season"] = "spring"
elif self.current_time.month in [7, 8, 9]: new_dictionary["season"] = "summer"
elif self.current_time.month in [10, 11, 12]: new_dictionary["season"] = "fall"
new_dictionary["season"] = util.parse_from_dict(method_name, "season", dict_data, dict_methods, default=new_dictionary["season"], options=["winter", "spring", "summer", "fall"])
new_dictionary["sort_by"] = util.parse_from_dict(method_name, "sort_by", dict_data, dict_methods, default="members", options=mal.season_sort_options, translation=mal.season_sort_translation)
new_dictionary["year"] = util.parse_int_from_dict(method_name, "year", dict_data, dict_methods, self.current_time.year, minimum=1917, maximum=self.current_time.year + 1)
new_dictionary["limit"] = util.parse_int_from_dict(method_name, "limit", dict_data, dict_methods, 100, maximum=500)
new_dictionary["season"] = util.parse("season", dict_data, methods=dict_methods, parent=method_name, default=new_dictionary["season"], options=["winter", "spring", "summer", "fall"])
new_dictionary["sort_by"] = util.parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="members", options=mal.season_sort_options, translation=mal.season_sort_translation)
new_dictionary["year"] = util.parse("year", dict_data, datatype="int", methods=dict_methods, default=self.current_time.year, parent=method_name, minimum=1917, maximum=self.current_time.year + 1)
new_dictionary["limit"] = util.parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=500)
elif method_name == "mal_userlist":
new_dictionary["username"] = util.parse_from_dict(method_name, "username", dict_data, dict_methods)
new_dictionary["status"] = util.parse_from_dict(method_name, "status", dict_data, dict_methods, default="all", options=mal.userlist_status)
new_dictionary["sort_by"] = util.parse_from_dict(method_name, "sort_by", dict_data, dict_methods, default="score", options=mal.userlist_sort_options, translation=mal.userlist_sort_translation)
new_dictionary["limit"] = util.parse_int_from_dict(method_name, "limit", dict_data, dict_methods, 100, maximum=1000)
new_dictionary["username"] = util.parse("username", dict_data, methods=dict_methods, parent=method_name)
new_dictionary["status"] = util.parse("status", dict_data, methods=dict_methods, parent=method_name, default="all", options=mal.userlist_status)
new_dictionary["sort_by"] = util.parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=mal.userlist_sort_options, translation=mal.userlist_sort_translation)
new_dictionary["limit"] = util.parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=1000)
self.builders.append((method_name, new_dictionary))
def _plex(self, method_name, method_data):
if method_name == "plex_all":
self.builders.append((method_name, True))
elif method_name in ["plex_search", "plex_collectionless"]:
for dict_data, dict_methods in util.validate_dict_list(method_name, method_data):
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
new_dictionary = {}
if method_name == "plex_search":
new_dictionary = self.build_filter("plex_search", dict_data)
elif method_name == "plex_collectionless":
prefix_list = util.parse_list("exclude_prefix", dict_data, dict_methods)
exact_list = util.parse_list("exclude", dict_data, dict_methods)
prefix_list = util.parse("exclude_prefix", dict_data, datatype="list", methods=dict_methods)
exact_list = util.parse("exclude", dict_data, datatype="list", methods=dict_methods)
if len(prefix_list) == 0 and len(exact_list) == 0:
raise Failed("Collection Error: you must have at least one exclusion")
exact_list.append(self.name)
@ -821,17 +827,17 @@ class CollectionBuilder:
self.builders.append(("plex_search", self.build_filter("plex_search", {"any": {method_name: method_data}})))
def _tautulli(self, method_name, method_data):
for dict_data, dict_methods in util.validate_dict_list(method_name, method_data):
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
self.builders.append((method_name, {
"list_type": "popular" if method_name == "tautulli_popular" else "watched",
"list_days": util.parse_int_from_dict(method_name, "list_days", dict_data, dict_methods, 30),
"list_size": util.parse_int_from_dict(method_name, "list_size", dict_data, dict_methods, 10),
"list_buffer": util.parse_int_from_dict(method_name, "list_buffer", dict_data, dict_methods, 20)
"list_days": util.parse("list_days", dict_data, datatype="int", methods=dict_methods, default=30, parent=method_name),
"list_size": util.parse("list_size", dict_data, datatype="int", methods=dict_methods, default=10, parent=method_name),
"list_buffer": util.parse("list_buffer", dict_data, datatype="int", methods=dict_methods, default=20, parent=method_name)
}))
def _tmdb(self, method_name, method_data):
if method_name == "tmdb_discover":
for dict_data, dict_methods in util.validate_dict_list(method_name, method_data):
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
new_dictionary = {"limit": 100}
for discover_name, discover_data in dict_data.items():
discover_final = discover_name.lower()
@ -868,9 +874,9 @@ class CollectionBuilder:
elif discover_final in tmdb.discover_dates:
new_dictionary[discover_final] = util.validate_date(discover_data, f"{method_name} attribute {discover_final}", return_as="%m/%d/%Y")
elif discover_final in ["primary_release_year", "year", "first_air_date_year"]:
new_dictionary[discover_final] = util.check_number(discover_data, f"{method_name} attribute {discover_final}", minimum=1800, maximum=self.current_year + 1)
new_dictionary[discover_final] = util.parse(discover_final, discover_data, datatype="int", parent=method_name, minimum=1800, maximum=self.current_year + 1)
elif discover_final in ["vote_count.gte", "vote_count.lte", "vote_average.gte", "vote_average.lte", "with_runtime.gte", "with_runtime.lte"]:
new_dictionary[discover_final] = util.check_number(discover_data, f"{method_name} attribute {discover_final}", minimum=1)
new_dictionary[discover_final] = util.parse(discover_final, discover_data, datatype="int", parent=method_name)
elif discover_final in ["with_cast", "with_crew", "with_people", "with_companies", "with_networks", "with_genres", "without_genres", "with_keywords", "without_keywords", "with_original_language", "timezone"]:
new_dictionary[discover_final] = discover_data
else:
@ -889,7 +895,7 @@ class CollectionBuilder:
else:
raise Failed(f"Collection Error: {method_name} had no valid fields")
elif method_name in ["tmdb_popular", "tmdb_top_rated", "tmdb_now_playing", "tmdb_trending_daily", "tmdb_trending_weekly"]:
self.builders.append((method_name, util.parse_int(method_name, method_data)))
self.builders.append((method_name, util.parse(method_name, method_data, datatype="int", default=10)))
else:
values = self.config.TMDb.validate_tmdb_ids(method_data, method_name)
if method_name.endswith("_details"):
@ -922,7 +928,7 @@ class CollectionBuilder:
if method_name.endswith("_details"):
self.summaries[method_name] = self.config.Trakt.list_description(trakt_lists[0])
elif method_name in ["trakt_trending", "trakt_popular", "trakt_recommended", "trakt_watched", "trakt_collected"]:
self.builders.append((method_name, util.parse_int(method_name, method_data)))
self.builders.append((method_name, util.parse(method_name, method_data, datatype="int", default=10)))
elif method_name in ["trakt_watchlist", "trakt_collection"]:
for trakt_list in self.config.Trakt.validate_trakt(method_data, self.library.is_movie, trakt_type=method_name[6:]):
self.builders.append((method_name, trakt_list))
@ -944,7 +950,7 @@ class CollectionBuilder:
self.builders.append((method_name[:-8] if method_name.endswith("_details") else method_name, value))
def _filters(self, method_name, method_data):
for dict_data, dict_methods in util.validate_dict_list(method_name, method_data):
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
validate = True
if "validate" in dict_data:
if dict_data["validate"] is None:
@ -1215,7 +1221,7 @@ class CollectionBuilder:
return util.get_list(data)
elif attribute == "history":
try:
return util.check_number(data, final, minimum=1, maximum=30)
return util.parse(final, data, datatype="int", maximum=30)
except Failed:
if str(data).lower() in ["day", "month"]:
return data.lower()
@ -1247,17 +1253,21 @@ class CollectionBuilder:
logger.error(error)
return valid_list
elif attribute in ["year", "episode_year"] and modifier in [".gt", ".gte", ".lt", ".lte"]:
return util.check_year(data, self.current_year, final)
return util.parse(final, data, datatype="int", minimum=1800, maximum=self.current_year)
elif attribute in plex.date_attributes and modifier in [".before", ".after"]:
return util.validate_date(data, final, return_as="%Y-%m-%d")
elif attribute in plex.number_attributes and modifier in ["", ".not", ".gt", ".gte", ".lt", ".lte"]:
return util.check_number(data, final, minimum=1)
return util.parse(final, data, datatype="int")
elif attribute in plex.float_attributes and modifier in [".gt", ".gte", ".lt", ".lte"]:
return util.check_number(data, final, number_type="float", minimum=0, maximum=10)
return util.parse(final, data, datatype="float", minimum=0, maximum=10)
elif attribute in ["decade", "year", "episode_year"] and modifier in ["", ".not"]:
return smart_pair(util.get_year_list(data, self.current_year, final))
final_years = []
values = util.get_list(data)
for value in values:
final_years.append(util.parse(final, value, datatype="int", minimum=1800, maximum=self.current_year))
return smart_pair(final_years)
elif attribute in plex.boolean_attributes:
return util.parse_bool(attribute, data)
return util.parse(attribute, data, datatype="bool")
else:
raise Failed(f"Collection Error: {final} attribute not supported")
@ -1796,7 +1806,10 @@ class CollectionBuilder:
items = self.library.get_collection_items(self.obj, self.smart_label_collection)
keys = {item.ratingKey: item for item in items}
previous = None
logger.debug(keys)
logger.debug(self.rating_keys)
for ki, key in enumerate(self.rating_keys):
logger.debug(items)
if key != items[ki].ratingKey:
logger.info(f"Moving {keys[key].title} {'after {}'.format(keys[previous].title) if previous else 'to the beginning'}")
self.library.moveItem(self.obj, key, after=previous)

View file

@ -30,11 +30,11 @@ class IMDb:
if not isinstance(imdb_dict, dict):
imdb_dict = {"url": imdb_dict}
dict_methods = {dm.lower(): dm for dm in imdb_dict}
imdb_url = util.parse_from_dict("imdb_list", "url", imdb_dict, dict_methods).strip()
imdb_url = util.parse("url", imdb_dict, methods=dict_methods, parent="imdb_list").strip()
if not imdb_url.startswith((urls["list"], urls["search"], urls["keyword"])):
raise Failed(f"IMDb Error: {imdb_url} must begin with either:\n{urls['list']} (For Lists)\n{urls['search']} (For Searches)\n{urls['keyword']} (For Keyword Searches)")
self._total(imdb_url, language)
list_count = util.parse_int_from_dict("imdb_list", "limit", imdb_dict, dict_methods, 0, minimum=0) if "limit" in dict_methods else 0
list_count = util.parse("limit", imdb_dict, datatype="int", methods=dict_methods, default=0, parent="imdb_list", minimum=0) if "limit" in dict_methods else 0
valid_lists.append({"url": imdb_url, "limit": list_count})
return valid_lists

View file

@ -93,7 +93,7 @@ class Metadata:
final_value = util.validate_date(value, name, return_as="%Y-%m-%d")
current = current[:-9]
elif var_type == "float":
final_value = util.check_number(value, name, number_type="float", minimum=0, maximum=10)
final_value = util.parse(name, value, datatype="float", minimum=0, maximum=10)
else:
final_value = value
if current != str(final_value):
@ -170,7 +170,7 @@ class Metadata:
logger.info("")
year = None
if "year" in methods:
year = util.check_number(meta[methods["year"]], "year", minimum=1800, maximum=datetime.now().year + 1)
year = util.parse("year", meta, datatype="int", methods=methods, minimum=1800, maximum=datetime.now().year + 1)
title = mapping_name
if "title" in methods:

View file

@ -95,18 +95,6 @@ def tab_new_lines(data):
def make_ordinal(n):
return f"{n}{'th' if 11 <= (n % 100) <= 13 else ['th', 'st', 'nd', 'rd', 'th'][min(n % 10, 4)]}"
def parse_bool(method_name, method_data):
if isinstance(method_data, bool):
return method_data
elif isinstance(method_data, int):
return method_data > 0
elif str(method_data).lower() in ["t", "true"]:
return True
elif str(method_data).lower() in ["f", "false"]:
return False
else:
raise Failed(f"Collection Error: {method_name} attribute: {method_data} invalid must be either true or false")
def compile_list(data):
if isinstance(data, list):
text = ""
@ -126,40 +114,12 @@ def get_list(data, lower=False, split=True, int_list=False):
else: return [d.strip() for d in str(data).split(",")]
def get_int_list(data, id_type):
values = get_list(data)
int_values = []
for value in values:
for value in get_list(data):
try: int_values.append(regex_first_int(value, id_type))
except Failed as e: logger.error(e)
return int_values
def get_year_list(data, current_year, method):
final_years = []
values = get_list(data)
for value in values:
final_years.append(check_year(value, current_year, method))
return final_years
def check_year(year, current_year, method):
return check_number(year, method, minimum=1800, maximum=current_year)
def check_number(value, method, number_type="int", minimum=None, maximum=None):
if number_type == "int":
try: num_value = int(str(value))
except ValueError: raise Failed(f"Collection Error: {method}: {value} must be an integer")
elif number_type == "float":
try: num_value = float(str(value))
except ValueError: raise Failed(f"Collection Error: {method}: {value} must be a number")
else: raise Failed(f"Number Type: {number_type} invalid")
if minimum is not None and maximum is not None and (num_value < minimum or num_value > maximum):
raise Failed(f"Collection Error: {method}: {num_value} must be between {minimum} and {maximum}")
elif minimum is not None and num_value < minimum:
raise Failed(f"Collection Error: {method}: {num_value} is less then {minimum}")
elif maximum is not None and num_value > maximum:
raise Failed(f"Collection Error: {method}: {num_value} is greater then {maximum}")
else:
return num_value
def validate_date(date_text, method, return_as=None):
try: date_obg = datetime.strptime(str(date_text), "%Y-%m-%d" if "-" in str(date_text) else "%m/%d/%Y")
except ValueError: raise Failed(f"Collection Error: {method}: {date_text} must match pattern YYYY-MM-DD (e.g. 2020-12-25) or MM/DD/YYYY (e.g. 12/25/2020)")
@ -184,22 +144,6 @@ def unix_input(prompt, timeout=60):
except EOFError: raise Failed("Input Failed")
finally: signal.alarm(0)
def old_windows_input(prompt, timeout=60, timer=time.monotonic):
prompt = f"| {prompt}: "
sys.stdout.write(prompt)
sys.stdout.flush()
endtime = timer() + timeout
result = []
while timer() < endtime:
if msvcrt.kbhit():
result.append(msvcrt.getwche())
if result[-1] == "\n":
out = "".join(result[:-1])
logger.debug(f"{prompt[2:]}{out}")
return out
time.sleep(0.04)
raise TimeoutExpired
def windows_input(prompt, timeout=5):
sys.stdout.write(f"| {prompt}: ")
sys.stdout.flush()
@ -322,64 +266,66 @@ def is_locked(filepath):
file_object = open(filepath, 'a', 8)
if file_object:
locked = False
except IOError as message:
except IOError:
locked = True
finally:
if file_object:
file_object.close()
return locked
def validate_dict_list(method_name, data):
def parse(attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None):
display = f"{parent + ' ' if parent else ''}{attribute} attribute"
if options is None and translation is not None:
options = [o for o in translation]
value = data[methods[attribute]] if methods and attribute in methods else data
if datatype == "list":
if methods and attribute in methods and data[methods[attribute]]:
return [v for v in value if v] if isinstance(value, list) else [str(value)]
return []
elif datatype == "dictlist":
final_list = []
for dict_data in get_list(data):
for dict_data in get_list(value):
if isinstance(dict_data, dict):
final_list.append((dict_data, {dm.lower(): dm for dm in dict_data}))
else:
raise Failed(f"Collection Error: {method_name} attribute is not a dictionary: {dict_data}")
raise Failed(f"Collection Error: {display} {dict_data} is not a dictionary")
return final_list
def parse_int(method, data, default=10, minimum=1, maximum=None):
list_count = regex_first_int(data, "List Size", default=default)
if maximum is None and list_count < minimum:
logger.warning(f"Collection Warning: {method} must an integer >= {minimum} using {default} as default")
elif maximum is not None and (list_count < minimum or list_count > maximum):
logger.warning(f"Collection Warning: {method} must an integer between {minimum} and {maximum} using {default} as default")
elif methods and attribute not in methods:
message = f"{display} not found"
elif value is None:
message = f"{display} is blank"
elif datatype == "bool":
if isinstance(value, bool):
return value
elif isinstance(value, int):
return value > 0
elif str(value).lower() in ["t", "true"]:
return True
elif str(value).lower() in ["f", "false"]:
return False
else:
return list_count
return default
def parse_from_dict(parent, method, data, methods, default=None, options=None, translation=None):
if options is None and translation is not None:
options = [o for o in translation]
if method not in methods:
message = f"{parent} {method} attribute not found"
elif data[methods[method]] is None:
message = f"{parent} {method} attribute is blank"
elif (translation is not None and str(data[methods[method]]).lower() not in translation) or \
(options is not None and translation is None and str(data[methods[method]]).lower() not in options):
message = f"{parent} {method} attribute {data[methods[method]]} must be in {options}"
message = f"{display} must be either true or false"
elif datatype in ["int", "float"]:
try:
value = int(str(value)) if datatype == "int" else float(str(value))
if (maximum is None and minimum <= value) or (maximum is not None and minimum <= value <= maximum):
return value
except ValueError:
pass
pre = f"{display} {value} must {'an integer' if datatype == 'int' else 'a number'}"
if maximum is None:
message = f"{pre} {minimum} or greater"
else:
return translation[data[methods[method]]] if translation is not None else data[methods[method]]
message = f"{pre} between {minimum} and {maximum}"
elif (translation is not None and str(value).lower() not in translation) or \
(options is not None and translation is None and str(value).lower() not in options):
message = f"{display} {value} must be in {options}"
else:
return translation[value] if translation is not None else value
if default is None:
raise Failed(f"Collection Error: {message}")
else:
logger.warning(f"Collection Warning: {message} using {default} as default")
return translation[default] if translation is not None else default
def parse_int_from_dict(parent, method, data, methods, default, minimum=1, maximum=None):
if method not in methods:
logger.warning(f"Collection Warning: {parent} {method} attribute not found using {default} as default")
elif not data[methods[method]]:
logger.warning(f"Collection Warning: {parent} {methods[method]} attribute is blank using {default} as default")
elif maximum is None and (not isinstance(data[methods[method]], int) or data[methods[method]] < minimum):
logger.warning(f"Collection Warning: {parent} {methods[method]} attribute {data[methods[method]]} must an integer >= {minimum} using {default} as default")
elif maximum is not None and (not isinstance(data[methods[method]], int) or data[methods[method]] < minimum or data[methods[method]] > maximum):
logger.warning(f"Collection Warning: {parent} {methods[method]} attribute {data[methods[method]]} must an integer between {minimum} and {maximum} using {default} as default")
else:
return data[methods[method]]
return default
def parse_list(method, data, methods):
if method in methods and data[methods[method]]:
return [i for i in data[methods[method]] if i] if isinstance(data[methods[method]], list) else [str(data[methods[method]])]
return []