2022-11-15 07:00:59 +00:00
import glob , os , re , requests , ruamel . yaml , signal , sys , time
2021-08-06 23:02:33 +00:00
from datetime import datetime , timedelta
2022-11-15 07:00:59 +00:00
from modules . logs import MyLogger
2022-07-26 18:30:40 +00:00
from num2words import num2words
2021-05-19 21:30:20 +00:00
from pathvalidate import is_valid_filename , sanitize_filename
2022-02-13 16:33:57 +00:00
from plexapi . audio import Album , Track
2021-05-11 01:22:18 +00:00
from plexapi . exceptions import BadRequest , NotFound , Unauthorized
2021-12-17 14:24:46 +00:00
from plexapi . video import Season , Episode , Movie
2021-01-20 21:37:59 +00:00
# Platform probe: msvcrt only exists on Windows, so a successful import
# is used as the "running on Windows" flag for console input handling.
try:
    import msvcrt
except ModuleNotFoundError:
    windows = False
else:
    windows = True
2023-04-28 03:43:26 +00:00
# Module-level logger handle used by the helpers below. It starts out as None
# here; presumably the application assigns a MyLogger instance before any
# helper that logs is called -- TODO confirm against the caller.
logger: MyLogger = None # noqa
2021-01-20 21:37:59 +00:00
class TimeoutExpired(Exception):
    """Raised when a timed console prompt is not answered in time."""


class LimitReached(Exception):
    """Marker exception for a reached limit (raised outside this section)."""


class Failed(Exception):
    """General failure raised by the helpers in this module."""


class FilterFailed(Failed):
    """Specialisation of Failed; presumably used for filter failures."""


class Continue(Exception):
    """Marker exception (raised outside this section)."""


class Deleted(Exception):
    """Marker exception (raised outside this section)."""


class NonExisting(Exception):
    """Marker exception (raised outside this section)."""


class NotScheduled(Exception):
    """Raised by schedule checks when something is not scheduled to run."""


class NotScheduledRange(NotScheduled):
    """NotScheduled variant that load_files honours even when schedules are ignored."""
2021-06-30 15:02:55 +00:00
class ImageData:
    """Describes one candidate poster/background image and where it comes from.

    Attributes set by the constructor:
        attribute: name of the source attribute the image was found under.
        location:  URL or file path of the image.
        prefix:    text placed in front of the generated message.
        is_poster: True for a poster, False for a background.
        is_url:    True when ``location`` is a URL, False when it is a file path.
        compare:   explicit comparison value if given; otherwise the URL itself,
                   or the file size in bytes for a local file.
        message:   human readable summary of the image.
    """

    def __init__(self, attribute, location, prefix="", is_poster=True, is_url=True, compare=None):
        self.attribute = attribute
        self.location = location
        self.prefix = prefix
        self.is_poster = is_poster
        self.is_url = is_url
        if compare:
            self.compare = compare
        elif is_url:
            self.compare = location
        else:
            # Local files are compared by their size on disk.
            self.compare = os.stat(location).st_size
        image_kind = "poster" if is_poster else "background"
        source_kind = "URL" if is_url else "File"
        self.message = f"{prefix}{image_kind} to [{source_kind}] {location}"

    def __str__(self):
        return str(vars(self))
2021-01-20 21:37:59 +00:00
def retry_if_not_failed(exception):
    """Retry predicate: True for any exception that is not a ``Failed``."""
    if isinstance(exception, Failed):
        return False
    return True
2021-05-11 01:22:18 +00:00
def retry_if_not_plex(exception):
    """Retry predicate: False for plexapi BadRequest/NotFound/Unauthorized and for Failed."""
    non_retryable = (BadRequest, NotFound, Unauthorized, Failed)
    if isinstance(exception, non_retryable):
        return False
    return True
2021-05-11 01:22:18 +00:00
2021-01-20 21:37:59 +00:00
# Every accepted spelling of a weekday mapped to its index (Monday = 0).
days_alias = {
    alias: day_index
    for day_index, aliases in enumerate([
        ("monday", "mon", "m"),
        ("tuesday", "tues", "tue", "tu", "t"),
        ("wednesday", "wed", "w"),
        ("thursday", "thurs", "thur", "thu", "th", "r"),
        ("friday", "fri", "f"),
        ("saturday", "sat", "s"),
        ("sunday", "sun", "su", "u"),
    ])
    for alias in aliases
}

# Display text for each filter modifier suffix.
mod_displays = {
    "": "is",
    ".not": "is not",
    ".is": "is",
    ".isnot": "is not",
    ".begins": "begins with",
    ".ends": "ends with",
    ".before": "is before",
    ".after": "is after",
    ".gt": "is greater than",
    ".gte": "is greater than or equal",
    ".lt": "is less than",
    ".lte": "is less than or equal",
    ".regex": "is",
}

# Weekday index -> display name, and the lower-cased reverse lookup.
pretty_days = dict(enumerate(
    ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
))
lower_days = {name.lower(): number for number, name in pretty_days.items()}

# Month number (1-12) -> display name, and the lower-cased reverse lookup.
pretty_months = dict(enumerate(
    [
        "January", "February", "March", "April", "May", "June",
        "July", "August", "September", "October", "November", "December",
    ],
    start=1,
))
lower_months = {name.lower(): number for number, name in pretty_months.items()}

seasons = ["current", "winter", "spring", "summer", "fall"]

# Advanced setting names listed per library item type.
advance_tags_to_edit = {
    "Movie": ["metadata_language", "use_original_title", "credits_detection"],
    "Show": [
        "episode_sorting", "keep_episodes", "delete_episodes", "season_display", "episode_ordering",
        "metadata_language", "use_original_title", "credits_detection",
        "audio_language", "subtitle_language", "subtitle_mode",
    ],
    "Season": ["audio_language", "subtitle_language", "subtitle_mode"],
    "Artist": ["album_sorting"],
}

# Tag names listed per library item type.
tags_to_edit = {
    "Movie": ["genre", "label", "collection", "country", "director", "producer", "writer"],
    "Video": ["genre", "label", "collection", "country", "director", "producer", "writer"],
    "Show": ["genre", "label", "collection"],
    "Artist": ["genre", "label", "style", "mood", "country", "collection", "similar_artist"],
}

# Accepted user spellings of collection_mode -> canonical value.
collection_mode_options = {
    "default": "default",
    "hide": "hide",
    "hide_items": "hideItems",
    "hideitems": "hideItems",
    "show_items": "showItems",
    "showitems": "showItems",
}

# Content-Type values accepted when downloading images.
image_content_types = ["image/png", "image/jpeg", "image/webp"]

parental_types = ["nudity", "violence", "profanity", "alcohol", "frightening"]
parental_values = ["None", "Mild", "Moderate", "Severe"]
# For each level: the severity values that rank below it.
parental_levels = {
    "none": [],
    "mild": ["None"],
    "moderate": ["None", "Mild"],
    "severe": ["None", "Mild", "Moderate"],
}
# Every "<Type>:<Value>" combination, grouped by type.
parental_labels = []
for _parental_type in parental_types:
    for _parental_value in parental_values:
        parental_labels.append(f"{_parental_type.capitalize()}:{_parental_value}")
del _parental_type, _parental_value

# Timing state; initialised empty here and filled in elsewhere.
previous_time = None
start_time = None
2021-01-20 21:37:59 +00:00
2023-01-19 19:38:55 +00:00
def guess_branch(version, env_version, git_branch):
    """Work out which branch the running build belongs to.

    ``version`` is indexed as a sequence: index 1 is compared against the
    develop version's index 1, index 2 is the numeric build counter.
    An explicit ``git_branch`` wins, then an ``env_version`` of "nightly" or
    "develop"; otherwise the build counter decides between "master",
    "develop" and "nightly".
    """
    if git_branch:
        return git_branch
    if env_version in ["nightly", "develop"]:
        return env_version
    if not (version[2] > 0):
        # No build counter: this is a release build.
        return "master"
    dev_version = get_develop()
    on_develop = version[1] != dev_version[1] or version[2] <= dev_version[2]
    return "develop" if on_develop else "nightly"
2024-03-28 13:08:28 +00:00
def current_version(version, branch=None):
    """Return the latest published version to compare the running build against.

    An explicit ``branch`` of "nightly" or "develop" selects that branch.
    Otherwise a build with a positive build counter (``version[2]``) is
    compared against develop, falling back to nightly when develop is behind
    it, and a build without a counter is compared against master.
    """
    if branch == "nightly":
        return get_nightly()
    if branch == "develop":
        return get_develop()
    if not (version[2] > 0):
        return get_master()
    new_version = get_develop()
    if version[1] != new_version[1] or new_version[2] >= version[2]:
        return new_version
    return get_nightly()
# Cached result of the nightly VERSION lookup (None until first requested).
nightly_version = None


def get_nightly():
    """Return the nightly branch version, fetching it only on first use."""
    global nightly_version
    if nightly_version is not None:
        return nightly_version
    nightly_version = get_version("nightly")
    return nightly_version
# Cached result of the develop VERSION lookup (None until first requested).
develop_version = None


def get_develop():
    """Return the develop branch version, fetching it only on first use."""
    global develop_version
    if develop_version is not None:
        return develop_version
    develop_version = get_version("develop")
    return develop_version
# Cached result of the master VERSION lookup (None until first requested).
master_version = None


def get_master():
    """Return the master branch version, fetching it only on first use."""
    global master_version
    if master_version is not None:
        return master_version
    master_version = get_version("master")
    return master_version
2022-04-22 15:24:44 +00:00
def get_version(level):
    """Fetch and parse the VERSION file of the given branch from GitHub.

    Args:
        level: branch name used in the raw.githubusercontent.com URL
            ("nightly", "develop" or "master" in this module).

    Returns:
        The ``(full_version, base_version, build_number)`` tuple produced by
        ``parse_version``, or ``("Unknown", "Unknown", 0)`` when the file
        cannot be retrieved.
    """
    url = f"https://raw.githubusercontent.com/meisnate12/Plex-Meta-Manager/{level}/VERSION"
    try:
        # A timeout keeps an unresponsive network from blocking the caller forever.
        response = requests.get(url, timeout=30)
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
        return "Unknown", "Unknown", 0
    if response.status_code >= 400:
        # An HTTP error body (e.g. "404: Not Found") is not a version string.
        return "Unknown", "Unknown", 0
    return parse_version(response.content.decode().strip(), text=level)
2022-03-18 13:00:42 +00:00
2023-01-24 14:29:25 +00:00
def parse_version(version, text="develop"):
    """Split a version string into ``(full, base, build_number)``.

    Every "develop" in ``version`` is first replaced by ``text``; the string
    is then split on ``-<text>``. The build number is the integer after that
    marker, or 0 when the marker is absent.
    """
    version = version.replace("develop", text)
    pieces = version.split(f"-{text}")
    build_number = int(pieces[1]) if len(pieces) > 1 else 0
    return version, pieces[0], build_number
2023-03-07 18:53:33 +00:00
def quote(data):
    """Return ``data`` converted to text and percent-encoded via ``requests.utils.quote``."""
    text = str(data)
    return requests.utils.quote(text)
2024-04-04 14:26:39 +00:00
def download_image(title, image_url, download_directory, is_poster=True, filename=None):
    """Download an image and save it with an extension matching its content type.

    Args:
        title: item title, used for the returned ImageData's message prefix.
        image_url: URL to download.
        download_directory: directory to save into; when ``filename`` is not
            given this value itself is used as the path stem.
        is_poster: passed through to the returned ImageData.
        filename: optional file name (without extension) inside the directory.

    Returns:
        ImageData pointing at the saved file (attribute "asset_directory").

    Raises:
        Failed: on a 404 or any status >= 400, or when the response's
            Content-Type is missing or not PNG/JPEG/WEBP.
    """
    response = requests.get(image_url, headers=header())
    status = response.status_code
    if status == 404:
        raise Failed(f"Image Error: Not Found on Image URL: {image_url}")
    if status >= 400:
        raise Failed(f"Image Error: {status} on Image URL: {image_url}")
    if "Content-Type" not in response.headers or response.headers["Content-Type"] not in image_content_types:
        raise Failed("Image Not PNG, JPG, or WEBP")

    if filename:
        new_image = os.path.join(download_directory, f"{filename}")
    else:
        new_image = download_directory
    # Anything that is not JPEG or WEBP is saved as PNG.
    extensions = {"image/jpeg": ".jpg", "image/webp": ".webp"}
    new_image += extensions.get(response.headers["Content-Type"], ".png")

    with open(new_image, "wb") as handler:
        handler.write(response.content)
    return ImageData("asset_directory", new_image, prefix=f"{title}'s ", is_poster=is_poster, is_url=False)
2022-05-10 13:25:35 +00:00
def get_image_dicts(group, alias):
    """Collect the poster and background sources defined in ``group``.

    ``alias`` maps the canonical attribute names (url_poster, file_poster,
    url_background, file_background) to the keys actually present in
    ``group``. Blank values are logged as errors and skipped.

    Returns:
        ``(posters, backgrounds)``: two dicts keyed by canonical attribute.
    """
    posters, backgrounds = {}, {}
    for attr in ("url_poster", "file_poster", "url_background", "file_background"):
        if attr not in alias:
            continue
        value = group[alias[attr]]
        if not value:
            logger.error(f"Metadata Error: {attr} attribute is blank")
            continue
        target = posters if "poster" in attr else backgrounds
        target[attr] = value
    return posters, backgrounds
def pick_image(title, images, prioritize_assets, download_url_assets, item_dir, is_poster=True, image_name=None):
    """Choose the image to use from the candidates in ``images``.

    ``images`` maps a source attribute name to its value. The first attribute
    found, in a fixed priority order, wins. When ``prioritize_assets`` is set
    and an "asset_directory" entry exists, that entry is returned outright.
    For "style_data"/"url_<type>" sources with ``download_url_assets`` and
    ``item_dir`` set, an existing asset is preferred, otherwise the URL is
    downloaded into ``item_dir``; a failed download is logged and the URL
    itself is used instead.

    Returns:
        The stored value for "asset_directory"/"pmm_<type>" entries, an
        ImageData for anything else, or None when nothing usable is found.
    """
    image_type = "poster" if is_poster else "background"
    if image_name is None:
        image_name = image_type
    if not images:
        return None

    logger.debug(f"{len(images)} {image_type}{'s' if len(images) > 1 else ''} found:")
    for method in images:
        logger.debug(f"Method: {method} {image_type.capitalize()}: {images[method]}")

    if prioritize_assets and "asset_directory" in images:
        return images["asset_directory"]

    priority = [
        "style_data", f"url_{image_type}", f"file_{image_type}", f"tmdb_{image_type}", "tmdb_profile",
        "tmdb_list_poster", "tvdb_list_poster", f"tvdb_{image_type}", "asset_directory", f"pmm_{image_type}",
        "tmdb_person", "tmdb_collection_details", "tmdb_actor_details", "tmdb_crew_details", "tmdb_director_details",
        "tmdb_producer_details", "tmdb_writer_details", "tmdb_movie_details", "tmdb_list_details",
        "tvdb_list_details", "tvdb_movie_details", "tvdb_show_details", "tmdb_show_details",
    ]
    # Only the highest-priority attribute that is present is ever considered.
    chosen = next((name for name in priority if name in images), None)
    if chosen is None:
        return None

    if chosen in ["style_data", f"url_{image_type}"] and download_url_assets and item_dir:
        if "asset_directory" in images:
            return images["asset_directory"]
        try:
            return download_image(title, images[chosen], item_dir, is_poster=is_poster, filename=image_name)
        except Failed as e:
            # Fall through and use the URL directly.
            logger.error(e)
    if chosen in ["asset_directory", f"pmm_{image_type}"]:
        return images[chosen]
    return ImageData(chosen, images[chosen], is_poster=is_poster, is_url=chosen != f"file_{image_type}")
2022-05-10 13:25:35 +00:00
2021-08-04 14:20:52 +00:00
def add_dict_list(keys, value, dict_map):
    """Append ``int(value)`` to the list stored under each key of ``dict_map``.

    Keys that are not present yet get a new single-element list.
    """
    for key in keys:
        number = int(value)
        bucket = dict_map.get(key) if key in dict_map else None
        if key in dict_map:
            bucket.append(number)
        else:
            dict_map[key] = [number]
2021-08-04 14:20:52 +00:00
2022-05-09 15:22:41 +00:00
def get_list(data, lower=False, upper=False, split=True, int_list=False, trim=True):
    """Normalise ``data`` into a list.

    Args:
        data: None, a list, a dict, or any value convertible to str.
        lower: return every entry as a lower-cased string.
        upper: return every entry as an upper-cased string.
        split: True splits strings on ",", False disables splitting, any
            other value is used as the separator.
        int_list: convert every entry to int; an empty list is returned if
            any entry is not an integer.
        trim: strip surrounding whitespace from entries.

    Returns:
        None when ``data`` is None, ``[data]`` for a dict, otherwise the
        processed list (dict entries are kept untouched in the default mode).
    """
    if split is True:
        split = ","
    if data is None:
        return None
    if isinstance(data, dict):
        return [data]
    if isinstance(data, list):
        entries = data
    elif split is False:
        entries = [str(data)]
    else:
        entries = [piece.strip() for piece in str(data).split(split)]

    def as_text(entry):
        text = str(entry)
        return text.strip() if trim else text

    if lower is True:
        return [as_text(entry).lower() for entry in entries]
    if upper is True:
        return [as_text(entry).upper() for entry in entries]
    if int_list is True:
        try:
            return [int(as_text(entry)) for entry in entries]
        except ValueError:
            return []
    return [entry if isinstance(entry, dict) else as_text(entry) for entry in entries]
2021-01-20 21:37:59 +00:00
def get_int_list(data, id_type):
    """Extract the first integer from every entry of ``data``.

    ``data`` is normalised with ``get_list``; entries without an integer are
    logged (using ``id_type`` in the message) and skipped.
    """
    int_values = []
    for entry in get_list(data):
        try:
            number = regex_first_int(entry, id_type)
        except Failed as e:
            logger.error(e)
        else:
            int_values.append(number)
    return int_values
2023-12-07 19:49:32 +00:00
def validate_date(date_text, return_as=None):
    """Parse a date given as YYYY-MM-DD or MM/DD/YYYY.

    Args:
        date_text: a datetime (used as-is) or a value whose string form is a
            date; the presence of "-" selects the YYYY-MM-DD format.
        return_as: optional strftime format; when given the formatted string
            is returned instead of the datetime.

    Raises:
        Failed: when the text matches neither format.
    """
    if isinstance(date_text, datetime):
        parsed = date_text
    else:
        text = str(date_text)
        pattern = "%Y-%m-%d" if "-" in text else "%m/%d/%Y"
        try:
            parsed = datetime.strptime(text, pattern)
        except ValueError:
            raise Failed(f"{date_text} must match pattern YYYY-MM-DD (e.g. 2020-12-25) or MM/DD/YYYY (e.g. 12/25/2020)")
    if return_as:
        return parsed.strftime(return_as)
    return parsed
2021-02-25 21:59:56 +00:00
2022-04-20 16:03:08 +00:00
def validate_regex(data, col_type, validate=True):
    """Return the entries of ``data`` that compile as regular expressions.

    Args:
        data: one pattern or a list of patterns (strings are not split).
        col_type: prefix used in the error message.
        validate: when True the first invalid pattern raises ``Failed``;
            when False invalid patterns are logged and skipped.
    """
    accepted = []
    for pattern in get_list(data, split=False):
        try:
            re.compile(pattern)
        except re.error:
            message = f"{col_type} Error: Regular Expression Invalid: {pattern}"
            if validate:
                raise Failed(message)
            logger.error(message)
        else:
            accepted.append(pattern)
    return accepted
2021-01-20 21:37:59 +00:00
def logger_input(prompt, timeout=60):
    """Prompt on the console with a timeout, using the platform's mechanism.

    Raises:
        SystemError: when neither the Windows console API nor SIGALRM is
            available.
    """
    if windows:
        return windows_input(prompt, timeout)
    if hasattr(signal, "SIGALRM"):
        return unix_input(prompt, timeout)
    raise SystemError("Input Timeout not supported on this system")
2021-07-14 14:47:20 +00:00
def header(language="en-US,en;q=0.5"):
    """Build the HTTP request headers (Accept-Language and a browser User-Agent).

    The special value "default" is sent as "eng".
    """
    accept_language = "eng" if language == "default" else language
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0"
    return {"Accept-Language": accept_language, "User-Agent": user_agent}
2021-07-14 14:47:20 +00:00
2021-01-20 21:37:59 +00:00
def alarm_handler(signum, frame):
    """SIGALRM handler installed by unix_input: abort the pending prompt."""
    raise TimeoutExpired
def unix_input(prompt, timeout=60):
    """Read a line from stdin, giving up after ``timeout`` seconds (SIGALRM based).

    Raises:
        TimeoutExpired: via ``alarm_handler`` when the alarm fires first.
        Failed: when stdin is closed (EOF).
    """
    prompt = f"| {prompt}: "
    signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(timeout)
    try:
        answer = input(prompt)
    except EOFError:
        raise Failed("Input Failed")
    else:
        return answer
    finally:
        # Always cancel the pending alarm, whatever happened above.
        signal.alarm(0)
2021-01-20 21:37:59 +00:00
def windows_input(prompt, timeout=5):
    """Read a line from the Windows console, giving up after ``timeout`` seconds.

    Characters are polled through msvcrt; Enter (code 13) ends the input and
    only characters with code >= 32 are kept.

    Raises:
        TimeoutExpired: when more than ``timeout`` seconds pass before Enter.
    """
    sys.stdout.write(f"| {prompt}: ")
    sys.stdout.flush()
    typed = []
    s_time = time.time()
    while True:
        if msvcrt.kbhit():
            char = msvcrt.getwche()
            if ord(char) == 13:  # Enter key
                out = "".join(typed)
                print("")
                logger.debug(f"{prompt}: {out}")
                return out
            if ord(char) >= 32:  # space and above: printable
                typed.append(char)
        elapsed = time.time() - s_time
        if elapsed > timeout:
            print("")
            raise TimeoutExpired
def get_id_from_imdb_url(imdb_url):
    """Return the first ``tt<digits>`` IMDb ID found in ``imdb_url``.

    Raises:
        Failed: when no IMDb ID is present.
    """
    found = re.search(r"(tt\d+)", str(imdb_url))
    if not found:
        raise Failed(f"Regex Error: Failed to parse IMDb ID from IMDb URL: {imdb_url}")
    return found.group(1)
2021-01-20 21:37:59 +00:00
def regex_first_int(data, id_type, default=None):
    """Return the first run of digits in ``data`` as an int.

    Args:
        data: value to search (converted with ``str``).
        id_type: description of the value, used in log and error messages.
        default: fallback used when no digits are found. ``None`` means
            "no fallback"; any other value, including 0, is returned as int.

    Raises:
        Failed: when no digits are found and no default was supplied.
    """
    match = re.search(r"(\d+)", str(data))
    if match:
        return int(match.group(1))
    # Compare against None so that a legitimate default of 0 is honoured
    # instead of being mistaken for "no default".
    if default is not None:
        logger.warning(f"Regex Warning: Failed to parse {id_type} from {data} using {default} as default")
        return int(default)
    raise Failed(f"Regex Error: Failed to parse {id_type} from {data}")
2021-01-20 21:37:59 +00:00
2021-05-19 21:30:20 +00:00
def validate_filename(filename):
    """Check ``filename`` and sanitise it when it is not a valid file name.

    Returns:
        ``(filename, None)`` when valid, otherwise
        ``(sanitised_name, message)`` describing the substitution.
    """
    if not is_valid_filename(str(filename)):
        mapping_name = sanitize_filename(str(filename))
        return mapping_name, f"Log Folder Name: {filename} is invalid using {mapping_name}"
    return filename, None
2021-07-06 15:46:29 +00:00
2021-12-17 14:24:46 +00:00
def item_title(item):
    """Build a display title for a Plex item.

    Seasons, episodes, albums and tracks are prefixed with their parent
    titles; movies get their year appended when one is set; everything else
    uses its plain title.
    """
    if isinstance(item, Season):
        if f"Season {item.index}" == item.title:
            return f"{item.parentTitle} {item.title}"
        return f"{item.parentTitle} Season {item.index}: {item.title}"

    if isinstance(item, Episode):
        season = item.parentIndex or 0
        episode = item.index or 0
        show_title = item.grandparentTitle or ""
        season_title = ""
        # The season title is included only when it equals "Season <n>".
        if item.parentTitle and f"Season {season}" == item.parentTitle:
            season_title = f"{item.parentTitle}: "
        episode_title = item.title if item.title else ''
        return f"{show_title} S{season:02}E{episode:02}: {season_title}{episode_title}"

    if isinstance(item, Movie) and item.year:
        return f"{item.title} ({item.year})"
    if isinstance(item, Album):
        return f"{item.parentTitle}: {item.title}"
    if isinstance(item, Track):
        return f"{item.grandparentTitle}: {item.parentTitle}: {item.title}"
    return item.title
2021-12-17 23:18:34 +00:00
def item_set(item, item_id):
    """Return ``{"title": ..., "tmdb" | "tvdb": item_id}`` for an item.

    Movies are keyed with "tmdb", everything else with "tvdb".
    """
    id_key = "tmdb" if isinstance(item, Movie) else "tvdb"
    return {"title": item_title(item), id_key: item_id}
2021-07-06 15:46:29 +00:00
def is_locked(filepath):
    """Report whether ``filepath`` can currently be opened for appending.

    Returns:
        None when the path does not exist, False when it could be opened,
        True when opening raised an IOError.
    """
    if not os.path.exists(filepath):
        return None
    handle = None
    locked = None
    try:
        handle = open(filepath, "a", 8)
        if handle:
            locked = False
    except IOError:
        locked = True
    finally:
        if handle:
            handle.close()
    return locked
2021-07-22 21:00:45 +00:00
2021-12-18 03:02:24 +00:00
def time_window(tw):
    """Translate a relative time window keyword into a date string.

    Recognised keywords: today, yesterday, this_week, last_week, this_month,
    last_month, this_year, last_year. Any other value is returned unchanged.
    """
    today = datetime.now()
    if tw == "today":
        return today.strftime("%Y-%m-%d")
    if tw == "yesterday":
        return (today - timedelta(days=1)).strftime("%Y-%m-%d")
    if tw == "this_week":
        return today.strftime("%Y-0%V")
    if tw == "last_week":
        return (today - timedelta(weeks=1)).strftime("%Y-0%V")
    if tw == "this_month":
        return today.strftime("%Y-%m")
    if tw == "last_month":
        if today.month == 1:
            # January rolls back to December of the previous year.
            return f"{today.year - 1}-12"
        return f"{today.year}-{today.month - 1:02}"
    if tw == "this_year":
        return f"{today.year}"
    if tw == "last_year":
        return f"{today.year - 1}"
    return tw
2021-11-27 00:30:41 +00:00
2023-02-24 15:52:23 +00:00
def load_files(files_to_load, method, err_type="Config", schedule=None, lib_vars=None, single=False):
    """Resolve a list of file definitions into concrete file entries.

    Args:
        files_to_load: one entry or a list of entries; each entry is either a
            dict (keys read here: url, git, pmm, repo, file, folder,
            template_variables, asset_directory, schedule) or a plain path.
        method: name of the config attribute being processed, used in
            messages; "pmm" keys are skipped when it is "metadata_files".
        err_type: prefix for error messages.
        schedule: optional ``(current_time, run_hour, ignore_schedules)``
            tuple; when given, per-entry "schedule" values are checked.
        lib_vars: template variables merged into each dict entry's
            template_variables without overriding keys already set there.
        single: when True only one entry is allowed and "folder" is ignored.

    Returns:
        ``(files, had_scheduled)`` where ``files`` is a list of
        ``(type, path, template_variables, asset_directory)`` tuples and
        ``had_scheduled`` is True if any entry was skipped by its schedule.

    Raises:
        Failed: when ``single`` is set and more than one entry is given.
    """
    files = []
    had_scheduled = False
    if not lib_vars:
        lib_vars = {}
    files_to_load = get_list(files_to_load, split=False)
    if single and len(files_to_load) > 1:
        raise Failed(f"{err_type} Error: {method} can only have one entry")
    for file in files_to_load:
        logger.info("")
        if isinstance(file, dict):
            # (type, path) pairs collected for this one dict entry.
            current = []
            def check_dict(attr, name):
                # Record file[attr] under the display name `name`, logging
                # blank values; appends to `current` from the enclosing loop.
                if attr in file and (method != "metadata_files" or attr != "pmm"):
                    logger.info(f"Reading {attr}: {file[attr]}")
                    if file[attr]:
                        if attr == "pmm" and file[attr] == "other_award":
                            logger.error(f"{err_type} Error: The PMM Default other_award has been deprecated. Please visit the wiki for the full list of available award files")
                        elif attr == "git" and file[attr].startswith("PMM/"):
                            # Git paths under "PMM/" are recorded as PMM Defaults without the prefix.
                            current.append(("PMM Default", file[attr][4:]))
                        else:
                            current.append((name, file[attr]))
                    else:
                        logger.error(f"{err_type} Error: {method} {attr} is blank")
                    return ""
            check_dict("url", "URL")
            check_dict("git", "Git")
            check_dict("pmm", "PMM Default")
            check_dict("repo", "Repo")
            check_dict("file", "File")
            if not single and "folder" in file:
                logger.info(f"Reading folder: {file['folder']}")
                if file["folder"] is None:
                    logger.error(f"{err_type} Error: {method} folder is blank")
                elif not os.path.isdir(file["folder"]):
                    logger.error(f"{err_type} Error: Folder not found: {file['folder']}")
                else:
                    # Every .yml and .yaml file directly inside the folder becomes a "File" entry.
                    yml_files = glob_filter(os.path.join(file["folder"], "*.yml"))
                    yml_files.extend(glob_filter(os.path.join(file["folder"], "*.yaml")))
                    if yml_files:
                        current.extend([("File", yml) for yml in yml_files])
                    else:
                        logger.error(f"{err_type} Error: No YAML (.yml|.yaml) files found in {file['folder']}")

            temp_vars = {}
            if "template_variables" in file and file["template_variables"] and isinstance(file["template_variables"], dict):
                # NOTE: this is the entry's own dict, so the merge below mutates it in place.
                temp_vars = file["template_variables"]
            for k, v in lib_vars.items():
                # Library-level variables only fill in keys the entry did not set.
                if k not in temp_vars:
                    temp_vars[k] = v
            if temp_vars:
                logger.info(f"Template Variables: {temp_vars}")
            asset_directory = []
            if "asset_directory" in file and file["asset_directory"]:
                logger.info(f"Asset Directory: {file['asset_directory']}")
                for asset_path in get_list(file["asset_directory"], split=False):
                    if os.path.exists(asset_path):
                        asset_directory.append(asset_path)
                    else:
                        logger.error(f"{err_type} Error: Asset Directory Does Not Exist: {asset_path}")
            if schedule and "schedule" in file and file["schedule"]:
                current_time, run_hour, ignore_schedules = schedule
                logger.info(f"Schedule: {file['schedule']}")
                err = None
                schedule_str = None
                try:
                    schedule_str = schedule_check("schedule", file["schedule"], current_time, run_hour)
                except NotScheduledRange as e:
                    # Range schedules are enforced even when ignore_schedules is set.
                    err = e
                    schedule_str = e
                except NotScheduled as e:
                    if not ignore_schedules:
                        err = e
                        schedule_str = e
                if schedule_str:
                    logger.info(f"Schedule Read:{schedule_str}\n")
                if err:
                    had_scheduled = True
                    logger.warning(f"This {'set of files' if len(current) > 1 else 'file'} not scheduled to run")
                    # Skip this entry entirely: nothing from `current` is added.
                    continue
            files.extend([(ft, fp, temp_vars, asset_directory) for ft, fp in current])
        else:
            # Plain path entry: no template variables, asset_directory is None.
            logger.info(f"Reading file: {file}")
            if os.path.exists(file):
                files.append(("File", file, {}, None))
            else:
                logger.error(f"{err_type} Error: Path not found: {file}")
    return files, had_scheduled
2022-04-10 02:28:05 +00:00
2022-02-06 07:33:09 +00:00
def check_num(num, is_int=True):
    """Convert ``num`` to int (or float when ``is_int`` is False) via its string form.

    Returns None when the conversion fails.
    """
    convert = int if is_int else float
    try:
        return convert(str(num))
    except (ValueError, TypeError):
        return None
2022-04-13 04:30:59 +00:00
def check_collection_mode(collection_mode):
    """Map a user supplied collection_mode to its canonical value.

    The lookup is case-insensitive against ``collection_mode_options``.

    Raises:
        Failed: when the value is empty or not a known option.
    """
    if collection_mode:
        key = str(collection_mode).lower()
        if key in collection_mode_options:
            return collection_mode_options[key]
    raise Failed(f"Config Error: {collection_mode} collection_mode invalid\n\tdefault (Library default)\n\thide (Hide Collection)\n\thide_items (Hide Items in this Collection)\n\tshow_items (Show this Collection and its Items)")
2021-08-10 15:33:32 +00:00
def glob_filter(filter_in):
    """Run ``glob.glob`` on a pattern, treating square brackets as literal characters.

    Brackets are escaped only when the pattern contains "[".
    """
    if "[" in filter_in:
        bracket_escapes = str.maketrans({"[": "[[]", "]": "[]]"})
        filter_in = filter_in.translate(bracket_escapes)
    return glob.glob(filter_in)
2021-08-10 15:33:32 +00:00
2021-08-07 06:01:21 +00:00
def is_date_filter(value, modifier, data, final, current_time):
    """Evaluate a date filter and return True when one of the checks below holds.

    A ``value`` of None always returns True. Otherwise, per modifier:
        "":        True when value is earlier than ``current_time - data`` days.
        ".not":    True when value is on or after that threshold.
        ".before": True when value is on or after the date in ``data``.
        ".after":  True when value is on or before the date in ``data``.
        ".regex":  True when none of the patterns in ``data`` match value
                   formatted as MM/DD/YYYY.
    Any other modifier returns False.

    Raises:
        Failed: for ".before"/".after" when ``data`` is not a valid date
            (``final`` names the filter in the message).
    """
    if value is None:
        return True

    if modifier in ["", ".not"]:
        threshold_date = current_time - timedelta(days=data)
        if modifier == "":
            older = value < threshold_date
            if older:
                return True
        elif value and value >= threshold_date:
            return True
        return False

    if modifier in [".before", ".after"]:
        try:
            filter_date = validate_date(data)
        except Failed as e:
            raise Failed(f"Collection Error: {final}: {e}")
        if modifier == ".before":
            hit = value >= filter_date
        else:
            hit = value <= filter_date
        return True if hit else False

    if modifier == ".regex":
        matched = any(re.compile(pattern).match(value.strftime("%m/%d/%Y")) for pattern in data)
        return not matched

    return False
def is_number_filter(value, modifier, data):
    """Evaluate a numeric filter.

    Returns True when ``value`` is None, or when the comparison tied to the
    modifier holds: "" -> value == data, ".not" -> value != data,
    ".gt" -> value <= data, ".gte" -> value < data, ".lt" -> value >= data,
    ".lte" -> value > data. Unknown modifiers return False.
    """
    if value is None:
        return True
    if modifier == "":
        return value == data
    if modifier == ".not":
        return value != data
    if modifier == ".gt":
        return value <= data
    if modifier == ".gte":
        return value < data
    if modifier == ".lt":
        return value >= data
    if modifier == ".lte":
        return value > data
    return False
2021-12-10 16:17:50 +00:00
def is_boolean_filter(value, data):
    """Return a truthy result when ``value`` and ``data`` differ in truthiness.

    When ``data`` is truthy the result is a bool; when ``data`` is falsy the
    result is ``value`` itself.
    """
    if data:
        return True if not value else False
    return value
2021-08-07 06:01:21 +00:00
def is_string_filter(values, modifier, data):
    """Evaluate a string filter over every (value, check_value) pair.

    A pair matches according to the modifier: ""/".not" -> case-insensitive
    substring, ".is"/".isnot" -> case-insensitive equality, ".begins" /
    ".ends" -> case-insensitive prefix / suffix, ".regex" -> ``re.search``.

    Returns:
        For ".not"/".isnot": True when some pair matched. For "", ".is",
        ".begins", ".ends", ".regex": True when no pair matched. False for
        any other modifier.
    """
    if modifier == ".regex":
        logger.trace(f"Regex Values: {values}")

    def pair_matches(value, check_value):
        if modifier in ["", ".not"]:
            return check_value.lower() in value.lower()
        if modifier in [".is", ".isnot"]:
            return value.lower() == check_value.lower()
        if modifier == ".begins":
            return value.lower().startswith(check_value.lower())
        if modifier == ".ends":
            return value.lower().endswith(check_value.lower())
        if modifier == ".regex":
            return bool(re.compile(check_value).search(value))
        return False

    found = any(
        any(pair_matches(value, check_value) for check_value in data)
        for value in values
    )
    if found:
        return modifier in [".not", ".isnot"]
    return modifier in ["", ".is", ".begins", ".ends", ".regex"]
2021-08-06 23:02:33 +00:00
2021-12-19 04:11:23 +00:00
def check_day(_m, _d):
    """Clamp day ``_d`` to the last day of month ``_m``.

    February is always clamped to 28. Months outside 1-12 are returned
    unchanged.

    Returns:
        ``(_m, clamped_day)``.
    """
    if _m == 2:
        last_day = 28
    elif _m in [4, 6, 9, 11]:
        last_day = 30
    elif _m in [1, 3, 5, 7, 8, 10, 12]:
        last_day = 31
    else:
        return _m, _d
    if _d > last_day:
        return _m, last_day
    return _m, _d
2022-05-12 14:09:08 +00:00
def schedule_check(attribute, data, current_time, run_hour, is_all=False):
    """Decide whether a schedule definition matches the current run.

    Args:
        attribute: Name of the attribute being checked; only used in messages.
        data: Schedule definition(s); passed through ``get_list``. A string
            containing ``all`` must be a single ``all[...]`` entry.
        current_time: Object providing ``replace``, ``day``, ``month`` and
            ``weekday()`` (presumably a ``datetime`` -- confirm with callers).
        run_hour: Hour compared against ``hourly(...)`` entries.
        is_all: True when evaluating the inside of an ``all[...]`` entry, in
            which case every entry must match instead of at least one.

    Returns:
        A human-readable description of the parsed schedules.

    Raises:
        Failed: an ``all`` entry is not on its own line.
        NonExisting: not scheduled and a ``non_existing`` entry was present.
        NotScheduledRange: not scheduled and a valid ``range`` entry was present.
        NotScheduled: not scheduled otherwise.
    """
    range_collection = False
    non_existing = False
    all_check = 0
    schedules_run = 0
    # Jumping past the 28th always lands in the next month; stepping back by
    # that date's day number gives the last day of the current month.
    next_month = current_time.replace(day=28) + timedelta(days=4)
    last_day = next_month - timedelta(days=next_month.day)
    schedule_str = ""
    if isinstance(data, str) and (("all" in data and not data.endswith("]")) or data.count("all") > 1):
        raise Failed("Schedule Error: each all schedule must be on its own line")
    elif isinstance(data, str) and "all" in data:
        # Keep the whole all[...] entry intact rather than letting it be split.
        data = [data]
    for schedule in get_list(data):
        run_time = str(schedule).lower()
        display = f"{attribute} attribute {schedule} invalid"
        schedules_run += 1
        if run_time.startswith("all"):
            match = re.search(r"\[([^\]]+)\]", run_time)
            if not match:
                logger.error(f"Schedule Error: failed to parse {attribute}: {schedule}")
                continue
            try:
                schedule_str += "\nScheduled to meet all of these:\n\t"
                schedule_str += schedule_check(attribute, match.group(1), current_time, run_hour, is_all=True)
                all_check += 1
            except NotScheduled as e:
                # The nested description is still reported even when unmet.
                schedule_str += str(e)
                continue
        elif run_time.startswith(("day", "daily")):
            all_check += 1
        elif run_time.startswith("non_existing"):
            all_check += 1
            non_existing = True
        elif run_time == "never":
            schedule_str += "\nNever scheduled to run"
        elif run_time.startswith(("hour", "week", "month", "year", "range")):
            match = re.search(r"\(([^)]+)\)", run_time)
            if not match:
                logger.error(f"Schedule Error: failed to parse {attribute}: {schedule}")
                continue
            param = match.group(1)
            if run_time.startswith("hour"):
                if "-" in param:
                    try:
                        # Unpacking lives inside the try so "1-2-3" is reported
                        # as a schedule error instead of crashing the run.
                        start_text, end_text = param.split("-")
                        start = int(start_text)
                        end = int(end_text)
                        if start == end or not (0 <= start <= 23 and 0 <= end <= 23):
                            raise ValueError
                        schedule_str += f"\nScheduled to run between the {num2words(start, to='ordinal_num')} hour and the {num2words(end, to='ordinal_num')} hour"
                        if end > start and start <= run_hour <= end:
                            all_check += 1
                        elif start > end and (start <= run_hour or run_hour <= end):
                            # Window wraps past midnight.
                            all_check += 1
                    except ValueError:
                        logger.error(f"Schedule Error: hourly {param} each must be a different integer between 0 and 23")
                else:
                    try:
                        if 0 <= int(param) <= 23:
                            schedule_str += f"\nScheduled to run on the {num2words(param, to='ordinal_num')} hour"
                            if run_hour == int(param):
                                all_check += 1
                        else:
                            raise ValueError
                    except ValueError:
                        logger.error(f"Schedule Error: hourly {display} must be an integer between 0 and 23")
            elif run_time.startswith("week"):
                ok_days = param.lower().split("|")
                if any(ok_day not in days_alias for ok_day in ok_days):
                    logger.error(f"Schedule Error: weekly {display} must be a day of the week i.e. weekly(Monday)")
                    continue
                pass_day = False
                for ok_day in ok_days:
                    weekday = days_alias[ok_day]
                    schedule_str += f"\nScheduled weekly on {pretty_days[weekday]}"
                    if weekday == current_time.weekday():
                        pass_day = True
                if pass_day:
                    all_check += 1
            elif run_time.startswith("month"):
                try:
                    if 1 <= int(param) <= 31:
                        schedule_str += f"\nScheduled monthly on the {num2words(param, to='ordinal_num')}"
                        # A day past the end of a short month runs on its last day.
                        if current_time.day == int(param) or (current_time.day == last_day.day and int(param) > last_day.day):
                            all_check += 1
                    else:
                        raise ValueError
                except ValueError:
                    logger.error(f"Schedule Error: monthly {display} must be an integer between 1 and 31")
            elif run_time.startswith("year"):
                try:
                    if "/" in param:
                        opt = param.split("/")
                        month = int(opt[0])
                        day = int(opt[1])
                        schedule_str += f"\nScheduled yearly on {pretty_months[month]} {num2words(day, to='ordinal_num')}"
                        if current_time.month == month and (current_time.day == day or (current_time.day == last_day.day and day > last_day.day)):
                            all_check += 1
                    else:
                        raise ValueError
                except ValueError:
                    logger.error(f"Schedule Error: yearly {display} must be in the MM/DD format i.e. yearly(11/22)")
            elif run_time.startswith("range"):
                ranges = []
                range_pass = False
                for ok_range in param.lower().split("|"):
                    match = re.match(r"^(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])-(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])$", ok_range)
                    if not match:
                        logger.error(f"Schedule Error: range {display} must be in the MM/DD-MM/DD format i.e. range(12/01-12/25)")
                        continue
                    month_start, day_start = check_day(int(match.group(1)), int(match.group(2)))
                    month_end, day_end = check_day(int(match.group(3)), int(match.group(4)))
                    month_check, day_check = check_day(current_time.month, current_time.day)
                    # Year-less datetimes so only month/day take part in the comparison.
                    check = datetime.strptime(f"{month_check}/{day_check}", "%m/%d")
                    start = datetime.strptime(f"{month_start}/{day_start}", "%m/%d")
                    end = datetime.strptime(f"{month_end}/{day_end}", "%m/%d")
                    range_collection = True
                    ranges.append(f"{pretty_months[month_start]} {num2words(day_start, to='ordinal_num')} and {pretty_months[month_end]} {num2words(day_end, to='ordinal_num')}")
                    # start >= end means the range wraps around the new year.
                    if start <= check <= end if start < end else (check <= end or check >= start):
                        range_pass = True
                if ranges:
                    schedule_str += f"\nScheduled {' or '.join(ranges)}"
                if range_pass:
                    all_check += 1
        else:
            logger.error(f"Schedule Error: {display}")
    if is_all:
        # str.replace returns a new string; the result must be kept for the
        # nested description to be indented under its "all" heading.
        schedule_str = schedule_str.replace("\n", "\n\t")
    if (all_check == 0 and not is_all) or (is_all and schedules_run != all_check):
        if non_existing:
            raise NonExisting(schedule_str)
        elif range_collection:
            raise NotScheduledRange(schedule_str)
        else:
            raise NotScheduled(schedule_str)
    return schedule_str
2022-01-28 18:36:21 +00:00
2024-01-26 05:10:35 +00:00
def check_int(value, datatype="int", minimum=1, maximum=None, throw=False):
    """Convert ``value`` to a number and check it against the allowed range.

    Args:
        value: Anything whose ``str()`` form should parse as a number.
        datatype: ``"int"`` parses with ``int``; any other value parses with ``float``.
        minimum: Inclusive lower bound.
        maximum: Inclusive upper bound, or None for no upper bound.
        throw: When True, raise instead of returning None on invalid input.

    Returns:
        The converted number when it parses and lies within range, else None
        (only when ``throw`` is False).

    Raises:
        Failed: ``throw`` is True and the value is unparsable or out of range.
    """
    try:
        number = int(str(value)) if datatype == "int" else float(str(value))
    except ValueError:
        number = None
    if number is not None and minimum <= number and (maximum is None or number <= maximum):
        return number
    if throw:
        # Raised for out-of-range values as well as unparsable ones, matching
        # what the message itself promises.
        message = f"{value} must be {'an integer' if datatype == 'int' else 'a number'}"
        raise Failed(f"{message} {minimum} or greater" if maximum is None else f"{message} between {minimum} and {maximum}")
    return None
2022-03-28 16:15:34 +00:00
2023-01-24 19:07:42 +00:00
def parse_and_or(error, attribute, data, test_list):
    """Parse a ``,`` (and) / ``|`` (or) expression against ``test_list``.

    ``test_list`` is looked up twice: first with the item text to get a key,
    then with that key to get the display text.

    Returns:
        ``(out, final)`` where ``out`` is the readable form (``and`` / ``or``,
        with or-groups parenthesised when there are several and-groups) and
        ``final`` is the same expression with each item replaced by
        ``str(test_list[item])``, keeping the ``,`` / ``|`` separators.

    Raises:
        Failed: an item is blank or is not a key of ``test_list``.
    """
    and_groups = [group.strip() for group in data.split(",")]
    has_many_groups = len(and_groups) > 1
    out = ""
    final = ""
    for group in and_groups:
        choices = [choice.strip() for choice in group.split("|")]
        mapped = []
        for choice in choices:
            if not choice:
                raise Failed(f"{error} Error: Cannot have a blank {attribute}")
            if str(choice) not in test_list:
                raise Failed(f"{error} Error: {attribute} {choice} is invalid")
            mapped.append(str(test_list[str(choice)]))
        if final:
            final += ","
        final += "|".join(mapped)

        if len(choices) > 1:
            text = " or ".join([test_list[test_list[str(c)]] if test_list else c for c in choices])
        else:
            text = test_list[test_list[str(choices[0])]] if test_list else choices[0]
        if out:
            out += " and "
        # Parentheses are only needed to separate an or-group from its siblings.
        out += f"({text})" if has_many_groups and len(choices) > 1 else text
    return out, final
2022-12-13 17:15:50 +00:00
2023-12-07 19:49:32 +00:00
def parse(error, attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None, regex=None, range_split=None, date_return=None):
    """Validate and normalise one attribute value according to ``datatype``.

    Args:
        error: Prefix used in error/warning messages.
        attribute: Attribute name; also the key looked up in ``methods``.
        data: The raw value, or a mapping indexed via ``methods[attribute]``
            when ``methods`` contains ``attribute``.
        datatype: One of the list types (``list``, ``commalist``, ``strlist``,
            ``lowerlist``, ``upperlist``), ``intlist``, ``listdict``, the dict
            types (``dict``, ``dictlist``, ``dictdict``, ``strdict``,
            ``dictliststr``), ``bool``, ``int``, ``float``, ``date`` or None.
        methods: Optional mapping of attribute name to the key used in ``data``.
        parent: Optional name prefixed to the attribute in messages.
        default: Value returned (with a warning) when validation fails;
            when None a failure raises instead.
        options: Allowed values; derived from ``translation`` keys if omitted.
        translation: Mapping from lower-cased input to the returned value.
        minimum, maximum: Bounds forwarded to ``check_int``.
        regex: ``(pattern, example)`` pair the value must match.
        range_split: Separator for ``start<sep>end`` numeric ranges.
        date_return: Forwarded to ``validate_date`` as ``return_as``.

    Returns:
        The normalised value, or the (translated) default on failure.

    Raises:
        Failed: validation fails and no default is available, or a
            list/dict datatype receives a value of the wrong shape.
    """
    display = f"{parent + ' ' if parent else ''}{attribute} attribute"
    if options is None and translation is not None:
        options = [o for o in translation]
    value = data[methods[attribute]] if methods and attribute in methods else data

    if datatype in ["list", "commalist", "strlist", "lowerlist", "upperlist"]:
        final_list = []
        if value:
            if isinstance(value, dict):
                raise Failed(f"{error} Error: {display} {value} must be a list or string")
            if datatype == "commalist":
                value = get_list(value)
            if datatype == "lowerlist":
                value = get_list(value, lower=True)
            if datatype == "upperlist":
                value = get_list(value, upper=True)
            if not isinstance(value, list):
                value = [value]
            for v in value:
                # Blank entries are dropped, but a literal 0 is kept.
                if v or v == 0:
                    if options is None or (options and (v in options or (datatype == "strlist" and str(v) in options))):
                        final_list.append(str(v) if datatype == "strlist" else v)
                    elif options:
                        raise Failed(f"{error} Error: {display} {v} is invalid; Options include: {', '.join([o for o in options])}")
        return final_list
    elif datatype == "intlist":
        if value:
            try:
                return [int(v) for v in value if v] if isinstance(value, list) else [int(value)]
            except ValueError:
                # Best effort: a non-numeric entry yields an empty list.
                pass
        return []
    elif datatype == "listdict":
        final_list = []
        for dict_data in get_list(value, split=False):
            if isinstance(dict_data, dict):
                final_list.append(dict_data)
            else:
                raise Failed(f"{error} Error: {display} {dict_data} is not a dictionary")
        return final_list
    elif datatype in ["dict", "dictlist", "dictdict", "strdict", "dictliststr"]:
        if isinstance(value, dict):
            if datatype == "dict":
                return value
            elif datatype == "dictlist":
                return {k: v if isinstance(v, list) else [v] if v else [] for k, v in value.items()}
            elif datatype == "dictliststr":
                return {str(k): [str(y) for y in v] if isinstance(v, list) else [str(v)] for k, v in value.items()}
            elif datatype == "strdict":
                return {str(k): str(v) for k, v in value.items()}
            else:
                # dictdict: every entry must be a non-empty dict; inner keys become strings.
                final_dict = {}
                for dict_key, dict_data in value.items():
                    if isinstance(dict_data, dict) and dict_data:
                        new_data = {}
                        for dict_data_key, dict_data_data in dict_data.items():
                            new_data[str(dict_data_key)] = dict_data_data
                        final_dict[dict_key] = new_data
                    else:
                        raise Failed(f"{error} Warning: {display} {dict_key} is not a dictionary")
                return final_dict
        else:
            raise Failed(f"{error} Error: {display} {value} is not a dictionary")
    elif methods and attribute not in methods:
        message = f"{display} not found"
    elif value is None:
        message = f"{display} is blank"
    elif regex is not None:
        regex_str, example = regex
        if re.compile(regex_str).match(str(value)):
            return str(value)
        else:
            message = f"{display}: {value} must match pattern {regex_str} e.g. {example}"
    elif datatype == "bool":
        if isinstance(value, bool):
            return value
        elif isinstance(value, (int, float)):
            return value > 0
        elif str(value).lower() in ["t", "true", "y", "yes"]:
            return True
        elif str(value).lower() in ["f", "false", "n", "no"]:
            return False
        else:
            message = f"{display} must be either true or false"
    elif datatype in ["int", "float"]:
        if range_split:
            range_values = str(value).split(range_split)
            if len(range_values) == 2:
                start = check_int(range_values[0], datatype=datatype, minimum=minimum, maximum=maximum)
                end = check_int(range_values[1], datatype=datatype, minimum=minimum, maximum=maximum)
                # Compare against None explicitly: 0 is a valid bound when minimum <= 0.
                if start is not None and end is not None and start < end:
                    return f"{start}{range_split}{end}"
        else:
            new_value = check_int(value, datatype=datatype, minimum=minimum, maximum=maximum)
            if new_value is not None:
                return new_value
        message = f"{display} {value} must {'each ' if range_split else ''}be {'an integer' if datatype == 'int' else 'a number'}"
        message = f"{message} {minimum} or greater" if maximum is None else f"{message} between {minimum} and {maximum}"
        if range_split:
            message = f"{message} separated by a {range_split}"
    elif datatype == "date":
        try:
            if default in ["today", "current"]:
                default = validate_date(datetime.now(), return_as=date_return)
            # Use the resolved value so lookups through ``methods`` are honoured.
            return validate_date(datetime.now() if value in ["today", "current"] else value, return_as=date_return)
        except Failed as e:
            message = f"{e}"
    elif (translation is not None and str(value).lower() not in translation) or \
            (options is not None and translation is None and str(value).lower() not in options):
        message = f"{display} {value} must be in [{', '.join([str(o) for o in options])}]"
    else:
        return translation[str(value).lower()] if translation is not None else value

    if default is None:
        raise Failed(f"{error} Error: {message}")
    else:
        logger.warning(f"{error} Warning: {message} using {default} as default")
        return translation[default] if translation is not None else default
2022-02-13 18:28:53 +00:00
2023-04-05 15:58:46 +00:00
def parse_cords(data, parent, required=False, err_type="Overlay", default=None):
    """Read alignment and offset coordinates from ``data``.

    Args:
        data: Mapping that may hold ``horizontal_align``, ``vertical_align``,
            ``horizontal_offset`` and ``vertical_offset``.
        parent: Name used in error messages.
        required: When True, any missing coordinate raises instead of
            falling back to ``default``.
        err_type: Prefix for error messages.
        default: Optional 4-tuple of fallbacks in the order
            (horizontal_offset, horizontal_align, vertical_offset, vertical_align).

    Returns:
        ``(horizontal_offset, horizontal_align, vertical_offset, vertical_align)``.
        Percentage offsets are returned as ``"<number>%"`` strings.

    Raises:
        Failed: a required coordinate is missing, or an offset is not a
            number or lies outside the range allowed for its alignment.
    """
    dho, dha, dvo, dva = default if default else (None, None, None, None)

    def _read_align(key, choices, fallback):
        # Validate the alignment when present, otherwise fall back or fail.
        found = parse(err_type, key, data[key], parent=parent, options=choices) if key in data else None
        if found is not None:
            return found
        if required:
            raise Failed(f"{err_type} Error: {parent} {key} is required")
        return fallback

    def _read_offset(key, align, fallback):
        # Validate the offset when present, otherwise fall back or fail.
        raw = data[key] if key in data else None
        if raw is None:
            if required:
                raise Failed(f"{err_type} Error: {parent} {key} is required")
            return fallback
        is_percent = str(raw).endswith("%")
        number = check_num(raw[:-1] if is_percent else raw)
        error = f"{err_type} Error: {parent} {key}: {raw} must be a number"
        if number is None:
            raise Failed(error)
        centered = align == "center"
        if not centered and not is_percent and number < 0:
            raise Failed(f"{error} 0 or greater")
        elif not centered and is_percent and (number > 100 or number < 0):
            raise Failed(f"{error} between 0% and 100%")
        elif centered and is_percent and (number > 50 or number < -50):
            raise Failed(f"{error} between -50% and 50%")
        return f"{number}%" if is_percent else number

    # Alignments are resolved first because the offset limits depend on them.
    horizontal_align = _read_align("horizontal_align", ["left", "center", "right"], dha)
    vertical_align = _read_align("vertical_align", ["top", "center", "bottom"], dva)
    horizontal_offset = _read_offset("horizontal_offset", horizontal_align, dho)
    vertical_offset = _read_offset("vertical_offset", vertical_align, dvo)

    return horizontal_offset, horizontal_align, vertical_offset, vertical_align
2022-10-28 23:32:48 +00:00
2022-02-13 18:28:53 +00:00
def replace_label(_label, _data):
    """Recursively substitute ``<<smart_label>>`` with ``_label`` in ``_data``.

    Dicts and lists are rebuilt with each element processed recursively; an
    element whose processing raises ``Failed`` is left out. Any other value
    whose string form contains the placeholder is returned as that string
    with the placeholder replaced; everything else is returned unchanged.

    Returns:
        ``(new_data, replaced)`` where ``replaced`` is True if at least one
        substitution happened anywhere in the structure.
    """
    if isinstance(_data, dict):
        rebuilt = {}
        any_hit = False
        for key, item in _data.items():
            try:
                new_item, hit = replace_label(_label, item)
            except Failed:
                continue
            rebuilt[key] = new_item
            any_hit = any_hit or hit
        return rebuilt, any_hit

    if isinstance(_data, list):
        rebuilt = []
        any_hit = False
        for item in _data:
            try:
                new_item, hit = replace_label(_label, item)
            except Failed:
                continue
            rebuilt.append(new_item)
            any_hit = any_hit or hit
        return rebuilt, any_hit

    text = str(_data)
    if "<<smart_label>>" in text:
        return text.replace("<<smart_label>>", _label), True
    return _data, False
2022-04-21 18:24:56 +00:00
def check_time(message, end=False):
    """Log a timing checkpoint at debug level.

    Logs the raw current timestamp when there is no previous checkpoint
    (and records it as the start time); otherwise logs the seconds elapsed
    since the previous checkpoint. With ``end=True`` the elapsed time is
    measured from the recorded start time and the checkpoint state is reset.
    """
    global previous_time
    global start_time
    now = time.time()
    if end:
        # Measure the final reading from the very first checkpoint.
        previous_time = start_time
    first_reading = previous_time is None
    logger.debug(f"{message}: {now if first_reading else now - previous_time}")
    if first_reading:
        start_time = now
    previous_time = None if end else now
2022-05-12 19:10:03 +00:00
2022-05-31 13:29:37 +00:00
system_fonts = []


def get_system_fonts():
    """Return the file names found under the platform's font directories.

    The result is stored in the module-level ``system_fonts`` list and reused
    on later calls once it is non-empty. On a platform other than Windows,
    Linux or macOS an empty list is returned.
    """
    global system_fonts
    if system_fonts:
        return system_fonts

    platform = sys.platform
    if platform == "win32":
        windows_root = os.environ.get("WINDIR")
        font_dirs = [os.path.join(windows_root, "fonts")] if windows_root else []
    elif platform in ("linux", "linux2"):
        # Fall back to /usr/share when XDG_DATA_DIRS is unset or empty.
        data_dirs = os.environ.get("XDG_DATA_DIRS", "") or "/usr/share"
        font_dirs = [os.path.join(data_dir, "fonts") for data_dir in data_dirs.split(":")]
    elif platform == "darwin":
        font_dirs = ["/Library/Fonts", "/System/Library/Fonts", os.path.expanduser("~/Library/Fonts")]
    else:
        return []

    found = []
    for font_dir in font_dirs:
        for _, _, names in os.walk(font_dir):
            found.extend(names)
    system_fonts = found
    return system_fonts
2022-05-15 03:06:32 +00:00
2022-05-12 19:10:03 +00:00
class YAML:
    """Wrapper around ``ruamel.yaml`` that loads a mapping and can save it back."""

    def __init__(self, path=None, input_data=None, check_empty=False, create=False, start_empty=False):
        """Load YAML from ``input_data`` or from the file at ``path``.

        Args:
            path: File to read from (and later save to).
            input_data: YAML content to load instead of reading ``path``.
            check_empty: Raise when the loaded content is empty or not a dict.
            create: Create an empty file when ``path`` does not exist.
            start_empty: Truncate/create ``path`` and start with empty data.

        Raises:
            Failed: on any load error, or on empty content with ``check_empty``.
        """
        self.path = path
        self.input_data = input_data
        self.yaml = ruamel.yaml.YAML()
        self.yaml.width = 100000
        self.yaml.indent(mapping=2, sequence=2)
        try:
            if input_data:
                self.data = self.yaml.load(input_data)
            elif start_empty or (create and not os.path.exists(self.path)):
                # Opening for write creates (or truncates) the file.
                with open(self.path, 'w'):
                    pass
                self.data = {}
            else:
                with open(self.path, encoding="utf-8") as fp:
                    self.data = self.yaml.load(fp)
        except ruamel.yaml.error.YAMLError as e:
            # Indent continuation lines of the parser message.
            e = str(e).replace("\n", "\n      ")
            raise Failed(f"YAML Error: {e}")
        except Exception as e:
            raise Failed(f"YAML Error: {e}")
        if not (self.data and isinstance(self.data, dict)):
            if check_empty:
                raise Failed("YAML Error: File is empty")
            self.data = {}

    def save(self):
        """Write ``self.data`` back to ``self.path``; does nothing without a path."""
        if not self.path:
            return
        with open(self.path, 'w', encoding="utf-8") as fp:
            self.yaml.dump(self.data, fp)