mirror of
https://github.com/meisnate12/Plex-Meta-Manager
synced 2024-11-25 05:50:21 +00:00
274 lines
11 KiB
Python
274 lines
11 KiB
Python
import base64, os, ruamel.yaml, requests
|
|
from lxml import html
|
|
from modules import util
|
|
from modules.poster import ImageData
|
|
from modules.util import Failed
|
|
from requests.exceptions import ConnectionError
|
|
from tenacity import retry, stop_after_attempt, wait_fixed
|
|
from urllib import parse
|
|
|
|
logger = util.logger
|
|
|
|
image_content_types = ["image/png", "image/jpeg", "image/webp"]
|
|
|
|
def get_header(headers, header, language):
|
|
if headers:
|
|
return headers
|
|
else:
|
|
if header and not language:
|
|
language = "en-US,en;q=0.5"
|
|
if language:
|
|
return {
|
|
"Accept-Language": "eng" if language == "default" else language,
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0"
|
|
}
|
|
|
|
|
|
def quote(data):
|
|
return parse.quote(str(data))
|
|
|
|
|
|
def quote_plus(data):
|
|
return parse.quote_plus(str(data))
|
|
|
|
|
|
def parse_qs(data):
|
|
return parse.parse_qs(data)
|
|
|
|
|
|
def urlparse(data):
|
|
return parse.urlparse(str(data))
|
|
|
|
class Version:
|
|
def __init__(self, version_string="Unknown", part_string=""):
|
|
self.full = version_string.replace("develop", "build")
|
|
version_parts = self.full.split("-build")
|
|
self.main = version_parts[0]
|
|
self.build = 0
|
|
self.part = int(part_string) if part_string else 0
|
|
if len(version_parts) > 1:
|
|
self.build = int(version_parts[1])
|
|
|
|
def __bool__(self):
|
|
return self.full != "Unknown"
|
|
|
|
def __repr__(self):
|
|
return str(self)
|
|
|
|
def __str__(self):
|
|
return f"{self.full}.{self.part}" if self.part else self.full
|
|
|
|
class Requests:
|
|
def __init__(self, local, part, env_branch, git_branch, verify_ssl=True):
|
|
self.local = Version(local, part)
|
|
self.env_branch = env_branch
|
|
self.git_branch = git_branch
|
|
self.image_content_types = ["image/png", "image/jpeg", "image/webp"]
|
|
self._nightly = None
|
|
self._develop = None
|
|
self._master = None
|
|
self._branch = None
|
|
self._latest = None
|
|
self._newest = None
|
|
self.session = self.create_session()
|
|
self.global_ssl = verify_ssl
|
|
if not self.global_ssl:
|
|
self.no_verify_ssl()
|
|
|
|
def create_session(self, verify_ssl=True):
|
|
session = requests.Session()
|
|
if not verify_ssl:
|
|
self.no_verify_ssl(session)
|
|
return session
|
|
|
|
def no_verify_ssl(self, session=None):
|
|
if session is None:
|
|
session = self.session
|
|
session.verify = False
|
|
if session.verify is False:
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
def download_image(self, title, image_url, download_directory, session=None, is_poster=True, filename=None):
|
|
response = self.get_image(image_url, session=session)
|
|
new_image = os.path.join(download_directory, f"{filename}") if filename else download_directory
|
|
if response.headers["Content-Type"] == "image/jpeg":
|
|
new_image += ".jpg"
|
|
elif response.headers["Content-Type"] == "image/webp":
|
|
new_image += ".webp"
|
|
else:
|
|
new_image += ".png"
|
|
with open(new_image, "wb") as handler:
|
|
handler.write(response.content)
|
|
return ImageData("asset_directory", new_image, prefix=f"{title}'s ", is_poster=is_poster, is_url=False)
|
|
|
|
def file_yaml(self, path_to_file, check_empty=False, create=False, start_empty=False):
|
|
return YAML(path=path_to_file, check_empty=check_empty, create=create, start_empty=start_empty)
|
|
|
|
def get_yaml(self, url, headers=None, params=None, check_empty=False):
|
|
response = self.get(url, headers=headers, params=params)
|
|
if response.status_code == 401:
|
|
raise Failed(f"URL Error: Unauthorized - {url}")
|
|
if response.status_code == 404:
|
|
raise Failed(f"URL Error: No file found at {url}")
|
|
if response.status_code == 429:
|
|
raise Failed(f"URL Error: Too many requests - {url}")
|
|
if response.status_code >= 400:
|
|
raise Failed(f"URL Error: {response.status_code} on {url}")
|
|
return YAML(input_data=response.content, check_empty=check_empty)
|
|
|
|
def get_image(self, url, session=None):
|
|
response = self.get(url, header=True) if session is None else session.get(url, headers=get_header(None, True, None))
|
|
if response.status_code == 404:
|
|
raise Failed(f"Image Error: Not Found on Image URL: {url}")
|
|
if response.status_code >= 400:
|
|
raise Failed(f"Image Error: {response.status_code} on Image URL: {url}")
|
|
if "Content-Type" not in response.headers or response.headers["Content-Type"] not in self.image_content_types:
|
|
raise Failed("Image Not PNG, JPG, or WEBP")
|
|
return response
|
|
|
|
def get_stream(self, url, location, info="Item"):
|
|
with self.session.get(url, stream=True) as r:
|
|
r.raise_for_status()
|
|
total_length = r.headers.get('content-length')
|
|
if total_length is not None:
|
|
total_length = int(total_length)
|
|
dl = 0
|
|
with open(location, "wb") as f:
|
|
for chunk in r.iter_content(chunk_size=8192):
|
|
dl += len(chunk)
|
|
f.write(chunk)
|
|
logger.ghost(f"Downloading {info}: {dl / total_length * 100:6.2f}%")
|
|
logger.exorcise()
|
|
|
|
def get_html(self, url, headers=None, params=None, header=None, language=None):
|
|
return html.fromstring(self.get(url, headers=headers, params=params, header=header, language=language).content)
|
|
|
|
def get_json(self, url, json=None, headers=None, params=None, header=None, language=None):
|
|
response = self.get(url, json=json, headers=headers, params=params, header=header, language=language)
|
|
try:
|
|
return response.json()
|
|
except ValueError:
|
|
logger.error(str(response.content))
|
|
raise
|
|
|
|
@retry(stop=stop_after_attempt(6), wait=wait_fixed(10))
|
|
def get(self, url, json=None, headers=None, params=None, header=None, language=None):
|
|
return self.session.get(url, json=json, headers=get_header(headers, header, language), params=params)
|
|
|
|
def get_image_encoded(self, url):
|
|
return base64.b64encode(self.get(url).content).decode('utf-8')
|
|
|
|
def post_html(self, url, data=None, json=None, headers=None, header=None, language=None):
|
|
return html.fromstring(self.post(url, data=data, json=json, headers=headers, header=header, language=language).content)
|
|
|
|
def post_json(self, url, data=None, json=None, headers=None, header=None, language=None):
|
|
response = self.post(url, data=data, json=json, headers=headers, header=header, language=language)
|
|
try:
|
|
return response.json()
|
|
except ValueError:
|
|
logger.error(str(response.content))
|
|
raise
|
|
|
|
@retry(stop=stop_after_attempt(6), wait=wait_fixed(10))
|
|
def post(self, url, data=None, json=None, headers=None, header=None, language=None):
|
|
return self.session.post(url, data=data, json=json, headers=get_header(headers, header, language))
|
|
|
|
def has_new_version(self):
|
|
return self.local and self.latest and self.local.main != self.latest.main or (self.local.build and self.local.build < self.latest.build)
|
|
|
|
@property
|
|
def branch(self):
|
|
if self._branch is None:
|
|
if self.git_branch in ["develop", "nightly"]:
|
|
self._branch = self.git_branch
|
|
elif self.env_branch in ["develop", "nightly"]:
|
|
self._branch = self.env_branch
|
|
elif self.local.build > 0:
|
|
if self.local.main != self.develop.main or self.local.build <= self.develop.build:
|
|
self._branch = "develop"
|
|
else:
|
|
self._branch = "nightly"
|
|
else:
|
|
self._branch = "master"
|
|
return self._branch
|
|
|
|
@property
|
|
def latest(self):
|
|
if self._latest is None:
|
|
if self.branch == "develop":
|
|
self._latest = self.develop
|
|
elif self.branch == "nightly":
|
|
self._latest = self.nightly
|
|
elif self.local.build > 0:
|
|
if self.local.main != self.develop.main or self.develop.build >= self.local.build:
|
|
self._latest = self.develop
|
|
self._latest = self.nightly
|
|
else:
|
|
self._latest = self.master
|
|
return self._latest
|
|
|
|
@property
|
|
def newest(self):
|
|
if self._newest is None:
|
|
self._newest = self.latest if self.latest and (self.local.main != self.latest.main or (self.local.build and self.local.build < self.latest.build)) else None
|
|
return self._newest
|
|
|
|
@property
|
|
def master(self):
|
|
if self._master is None:
|
|
self._master = self._version("master")
|
|
return self._master
|
|
|
|
@property
|
|
def develop(self):
|
|
if self._develop is None:
|
|
self._develop = self._version("develop")
|
|
return self._develop
|
|
|
|
@property
|
|
def nightly(self):
|
|
if self._nightly is None:
|
|
self._nightly = self._version("nightly")
|
|
return self._nightly
|
|
|
|
def _version(self, level):
|
|
try:
|
|
url = f"https://raw.githubusercontent.com/Kometa-Team/Kometa/{level}/VERSION"
|
|
return Version(self.get(url).content.decode().strip())
|
|
except ConnectionError:
|
|
return Version()
|
|
|
|
|
|
class YAML:
|
|
def __init__(self, path=None, input_data=None, check_empty=False, create=False, start_empty=False):
|
|
self.path = path
|
|
self.input_data = input_data
|
|
self.yaml = ruamel.yaml.YAML()
|
|
self.yaml.width = 100000
|
|
self.yaml.indent(mapping=2, sequence=2)
|
|
try:
|
|
if input_data:
|
|
self.data = self.yaml.load(input_data)
|
|
else:
|
|
if start_empty or (create and not os.path.exists(self.path)):
|
|
with open(self.path, 'w'):
|
|
pass
|
|
self.data = {}
|
|
else:
|
|
with open(self.path, encoding="utf-8") as fp:
|
|
self.data = self.yaml.load(fp)
|
|
except ruamel.yaml.error.YAMLError as e:
|
|
e = str(e).replace("\n", "\n ")
|
|
raise Failed(f"YAML Error: {e}")
|
|
except Exception as e:
|
|
raise Failed(f"YAML Error: {e}")
|
|
if not self.data or not isinstance(self.data, dict):
|
|
if check_empty:
|
|
raise Failed("YAML Error: File is empty")
|
|
self.data = {}
|
|
|
|
def save(self):
|
|
if self.path:
|
|
with open(self.path, 'w', encoding="utf-8") as fp:
|
|
self.yaml.dump(self.data, fp)
|