Plex-Meta-Manager/modules/request.py
2024-05-29 08:40:00 -04:00

242 lines
9.8 KiB
Python

import base64, os, ruamel.yaml, requests
from lxml import html
from modules import util
from modules.poster import ImageData
from modules.util import Failed
from requests.exceptions import ConnectionError
from retrying import retry
from urllib import parse
# Module-wide logger, taken from the project's util module.
logger = util.logger
# Content-Type values for PNG, JPEG and WEBP images.
# NOTE(review): the Requests class builds its own identical list in __init__;
# confirm which of the two external callers actually rely on.
image_content_types = ["image/png", "image/jpeg", "image/webp"]
def get_header(headers, header, language):
    """Pick the HTTP headers to send with a request.

    Args:
        headers: Explicit header mapping; returned untouched when truthy.
        header: Flag requesting the default browser-like headers.
        language: Value for ``Accept-Language``; the special value
            ``"default"`` is sent as ``"eng"``.

    Returns:
        ``headers`` when given; otherwise a dict with ``Accept-Language`` and
        ``User-Agent`` when a language is known (or ``header`` asked for the
        default one); otherwise ``None``.
    """
    if headers:
        return headers

    accept_language = language
    if header and not accept_language:
        # Caller wants generated headers but did not name a language.
        accept_language = "en-US,en;q=0.5"

    if not accept_language:
        return None

    if accept_language == "default":
        accept_language = "eng"

    return {
        "Accept-Language": accept_language,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0"
    }
def parse_version(version, text="develop"):
    """Split a version string into its full, base and build-number parts.

    Every occurrence of ``"develop"`` in ``version`` is first replaced by
    ``text``; the result is then split on ``"-<text>"``.

    Args:
        version: Raw version string, e.g. ``"1.20.0-develop12"``.
        text: Branch label to substitute for ``"develop"`` and to split on.

    Returns:
        A tuple ``(full_version, base_version, build)`` where ``build`` is the
        integer following ``"-<text>"`` or ``0`` when that marker is absent.
    """
    full_version = version.replace("develop", text)
    pieces = full_version.split(f"-{text}")
    if len(pieces) > 1:
        build = int(pieces[1])
    else:
        build = 0
    return full_version, pieces[0], build
def quote(data):
    """Return ``str(data)`` percent-encoded via ``urllib.parse.quote``."""
    as_text = str(data)
    return parse.quote(as_text)
def quote_plus(data):
    """Return ``str(data)`` encoded via ``urllib.parse.quote_plus`` (spaces become ``+``)."""
    as_text = str(data)
    return parse.quote_plus(as_text)
def parse_qs(data):
    """Parse a query string with ``urllib.parse.parse_qs`` and return its result."""
    parsed_query = parse.parse_qs(data)
    return parsed_query
def urlparse(data):
    """Return ``urllib.parse.urlparse`` applied to ``str(data)``."""
    as_text = str(data)
    return parse.urlparse(as_text)
class Requests:
    """Shared HTTP client plus release-version bookkeeping.

    Wraps a single ``requests.Session`` with retrying GET/POST helpers
    (HTML, JSON, YAML, image and streaming variants) and works out which
    release branch the running build belongs to and whether a newer build
    has been published.
    """

    def __init__(self, file_version, env_version, git_branch, verify_ssl=True):
        """Create the session and resolve branch / version information.

        Args:
            file_version: Indexable triple ``(full, base, build)``; presumably
                the tuple returned by ``parse_version`` for the local VERSION
                file.
            env_version: Branch hint; only ``"nightly"`` and ``"develop"``
                are honoured by ``guess_branch``.
            git_branch: Branch name that, when truthy, overrides every other
                branch guess.
            verify_ssl: When false, certificate verification is disabled on
                the shared session.

        Note:
            Construction performs network requests, because resolving the
            latest version fetches a remote VERSION file.
        """
        self.file_version = file_version
        self.env_version = env_version
        self.git_branch = git_branch
        self.image_content_types = ["image/png", "image/jpeg", "image/webp"]
        # Per-branch caches filled lazily by get_nightly/get_develop/get_master.
        self.nightly_version = None
        self.develop_version = None
        self.master_version = None
        self.session = self.create_session()
        self.global_ssl = verify_ssl
        if not self.global_ssl:
            self.no_verify_ssl()
        self.branch = self.guess_branch()
        self.version = (
            self.file_version[0].replace("develop", self.branch),
            self.file_version[1].replace("develop", self.branch),
            self.file_version[2],
        )
        self.latest_version = self.current_version(self.version, branch=self.branch)
        latest = self.latest_version
        is_newer = latest and (
            self.version[1] != latest[1]
            or (self.version[2] and self.version[2] < latest[2])
        )
        self.new_version = latest[0] if is_newer else None

    def create_session(self, verify_ssl=True):
        """Return a new ``requests.Session``; verification is disabled when ``verify_ssl`` is false."""
        session = requests.Session()
        if not verify_ssl:
            self.no_verify_ssl(session)
        return session

    def no_verify_ssl(self, session=None):
        """Disable certificate verification on ``session`` (default: the shared session).

        Also silences urllib3's ``InsecureRequestWarning``.
        """
        if session is None:
            session = self.session
        session.verify = False
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def has_new_version(self):
        """Return ``True`` when a newer release than the running one is known.

        Always ``False`` when either the running or the latest version is
        ``"Unknown"``. Otherwise a differing base version, or a non-zero
        local build number lower than the latest build number, counts as new.
        """
        # The "Unknown" guards must cover BOTH comparisons; written as explicit
        # early returns so and/or operator precedence cannot exclude the
        # build-number check from the guard.
        if self.version[0] == "Unknown" or self.latest_version[0] == "Unknown":
            return False
        if self.version[1] != self.latest_version[1]:
            return True
        return bool(self.version[2] and self.version[2] < self.latest_version[2])

    def download_image(self, title, image_url, download_directory, is_poster=True, filename=None):
        """Download an image to disk and describe it as ``ImageData``.

        Args:
            title: Used to build the ``"<title>'s "`` prefix of the result.
            image_url: URL to fetch; validated by ``get_image``.
            download_directory: Target directory, or the extension-less target
                path itself when ``filename`` is not given.
            is_poster: Forwarded to ``ImageData``.
            filename: Optional file name (without extension) inside
                ``download_directory``.

        Returns:
            ``ImageData`` pointing at the written file.

        Raises:
            Failed: Propagated from ``get_image`` for bad status codes or
                unsupported content types.
        """
        response = self.get_image(image_url)
        new_image = os.path.join(download_directory, f"{filename}") if filename else download_directory
        # get_image only lets PNG/JPEG/WEBP through; anything else falls back to .png.
        extensions = {"image/jpeg": ".jpg", "image/webp": ".webp"}
        new_image += extensions.get(response.headers["Content-Type"], ".png")
        with open(new_image, "wb") as handler:
            handler.write(response.content)
        return ImageData("asset_directory", new_image, prefix=f"{title}'s ", is_poster=is_poster, is_url=False)

    def file_yaml(self, path_to_file, check_empty=False, create=False, start_empty=False):
        """Build a ``YAML`` object for a file on disk, forwarding all options."""
        return YAML(path=path_to_file, check_empty=check_empty, create=create, start_empty=start_empty)

    def get_yaml(self, url, headers=None, check_empty=False):
        """Fetch ``url`` and wrap the response body in a ``YAML`` object.

        Raises:
            Failed: When the response status code is 400 or above.
        """
        response = self.get(url, headers=headers)
        if response.status_code >= 400:
            raise Failed(f"URL Error: No file found at {url}")
        return YAML(input_data=response.content, check_empty=check_empty)

    def get_image(self, url):
        """Fetch ``url`` and make sure the response is a supported image.

        Returns:
            The response object, whose ``Content-Type`` header is one of
            ``self.image_content_types``.

        Raises:
            Failed: On a 404, any other status >= 400, or a missing or
                unsupported ``Content-Type`` header.
        """
        response = self.get(url, header=True)
        if response.status_code == 404:
            raise Failed(f"Image Error: Not Found on Image URL: {url}")
        if response.status_code >= 400:
            raise Failed(f"Image Error: {response.status_code} on Image URL: {url}")
        if "Content-Type" not in response.headers or response.headers["Content-Type"] not in self.image_content_types:
            raise Failed("Image Not PNG, JPG, or WEBP")
        # download_image reads .headers and .content from this return value.
        return response

    def get_stream(self, url, location, info="Item"):
        """Stream ``url`` to the file ``location`` in 8192-byte chunks, logging progress.

        Progress is logged as a percentage when the server sends a non-zero
        ``content-length`` header, and as a running byte count otherwise.

        Raises:
            requests.HTTPError: From ``raise_for_status`` on an error response.
        """
        with self.session.get(url, stream=True) as r:
            r.raise_for_status()
            total_length = r.headers.get('content-length')
            if total_length is not None:
                total_length = int(total_length)
            dl = 0
            with open(location, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    dl += len(chunk)
                    f.write(chunk)
                    if total_length:
                        logger.ghost(f"Downloading {info}: {dl / total_length * 100:6.2f}%")
                    else:
                        # No usable content-length: a percentage cannot be computed.
                        logger.ghost(f"Downloading {info}: {dl} bytes")
            logger.exorcise()

    def get_html(self, url, headers=None, params=None, header=None, language=None):
        """GET ``url`` and parse the response body with ``lxml.html.fromstring``."""
        return html.fromstring(self.get(url, headers=headers, params=params, header=header, language=language).content)

    @staticmethod
    def _decode_json(response):
        """Return ``response.json()``; log the raw body and re-raise on ``ValueError``."""
        try:
            return response.json()
        except ValueError:
            logger.error(str(response.content))
            raise

    def get_json(self, url, json=None, headers=None, params=None, header=None, language=None):
        """GET ``url`` and decode the response body as JSON.

        Raises:
            ValueError: When the body is not valid JSON (the body is logged first).
        """
        response = self.get(url, json=json, headers=headers, params=params, header=header, language=language)
        return self._decode_json(response)

    @retry(stop_max_attempt_number=6, wait_fixed=10000)
    def get(self, url, json=None, headers=None, params=None, header=None, language=None):
        """Issue a GET on the shared session with headers chosen by ``get_header``.

        Wrapped by ``retry`` with ``stop_max_attempt_number=6`` and
        ``wait_fixed=10000``.
        """
        return self.session.get(url, json=json, headers=get_header(headers, header, language), params=params)

    def get_image_encoded(self, url):
        """GET ``url`` and return its body base64-encoded as a UTF-8 string."""
        return base64.b64encode(self.get(url).content).decode('utf-8')

    def post_html(self, url, data=None, json=None, headers=None, header=None, language=None):
        """POST to ``url`` and parse the response body with ``lxml.html.fromstring``."""
        return html.fromstring(self.post(url, data=data, json=json, headers=headers, header=header, language=language).content)

    def post_json(self, url, data=None, json=None, headers=None, header=None, language=None):
        """POST to ``url`` and decode the response body as JSON.

        Raises:
            ValueError: When the body is not valid JSON (the body is logged first).
        """
        response = self.post(url, data=data, json=json, headers=headers, header=header, language=language)
        return self._decode_json(response)

    @retry(stop_max_attempt_number=6, wait_fixed=10000)
    def post(self, url, data=None, json=None, headers=None, header=None, language=None):
        """Issue a POST on the shared session with headers chosen by ``get_header``.

        Wrapped by ``retry`` with ``stop_max_attempt_number=6`` and
        ``wait_fixed=10000``.
        """
        return self.session.post(url, data=data, json=json, headers=get_header(headers, header, language))

    def guess_branch(self):
        """Work out which release branch the running build belongs to.

        Precedence: an explicit ``git_branch``; then an ``env_version`` of
        ``"nightly"`` or ``"develop"``; then, for builds with a positive build
        number, a comparison against the published develop version; otherwise
        ``"master"``.
        """
        if self.git_branch:
            return self.git_branch
        elif self.env_version in ["nightly", "develop"]:
            return self.env_version
        elif self.file_version[2] > 0:
            dev_version = self.get_develop()
            # A build number above the published develop build is treated as nightly.
            if self.file_version[1] != dev_version[1] or self.file_version[2] <= dev_version[2]:
                return "develop"
            else:
                return "nightly"
        else:
            return "master"

    def current_version(self, version, branch=None):
        """Return the latest published version tuple relevant to ``version``/``branch``.

        Args:
            version: Triple ``(full, base, build)`` of the running build.
            branch: ``"nightly"`` or ``"develop"`` selects that branch directly;
                any other value falls back to inspecting ``version``.
        """
        if branch == "nightly":
            return self.get_nightly()
        elif branch == "develop":
            return self.get_develop()
        elif version[2] > 0:
            new_version = self.get_develop()
            if version[1] != new_version[1] or new_version[2] >= version[2]:
                return new_version
            return self.get_nightly()
        else:
            return self.get_master()

    def get_nightly(self):
        """Return the nightly branch version, fetching it on first use."""
        if self.nightly_version is None:
            self.nightly_version = self.get_version("nightly")
        return self.nightly_version

    def get_develop(self):
        """Return the develop branch version, fetching it on first use."""
        if self.develop_version is None:
            self.develop_version = self.get_version("develop")
        return self.develop_version

    def get_master(self):
        """Return the master branch version, fetching it on first use."""
        if self.master_version is None:
            self.master_version = self.get_version("master")
        return self.master_version

    def get_version(self, level):
        """Fetch and parse the VERSION file of branch ``level`` from GitHub.

        Returns:
            The ``parse_version`` tuple for that branch, or
            ``("Unknown", "Unknown", 0)`` when the connection fails.
        """
        try:
            url = f"https://raw.githubusercontent.com/Kometa-Team/Kometa/{level}/VERSION"
            return parse_version(self.get(url).content.decode().strip(), text=level)
        except ConnectionError:
            return "Unknown", "Unknown", 0
class YAML:
    """Thin wrapper around a ``ruamel.yaml.YAML`` loader/dumper and its loaded data."""

    def __init__(self, path=None, input_data=None, check_empty=False, create=False, start_empty=False):
        """Load YAML from ``input_data`` or from the file at ``path``.

        Args:
            path: File to read from, and the file ``save`` writes to.
            input_data: Raw YAML content; takes precedence over ``path`` when truthy.
            check_empty: Raise instead of falling back to ``{}`` when the
                loaded document is empty or not a mapping.
            create: Create an empty file at ``path`` when it does not exist.
            start_empty: Truncate the file at ``path`` and start from ``{}``.

        Raises:
            Failed: On any load error, or on an empty / non-mapping document
                when ``check_empty`` is set.
        """
        self.path = path
        self.input_data = input_data
        self.yaml = ruamel.yaml.YAML()
        self.yaml.width = 100000
        self.yaml.indent(mapping=2, sequence=2)
        try:
            if input_data:
                self.data = self.yaml.load(input_data)
            elif start_empty or (create and not os.path.exists(self.path)):
                # Opening in write mode creates or truncates the file.
                with open(self.path, 'w'):
                    pass
                self.data = {}
            else:
                with open(self.path, encoding="utf-8") as yaml_file:
                    self.data = self.yaml.load(yaml_file)
        except ruamel.yaml.error.YAMLError as yaml_error:
            message = str(yaml_error).replace("\n", "\n ")
            raise Failed(f"YAML Error: {message}")
        except Exception as error:
            raise Failed(f"YAML Error: {error}")

        if self.data and isinstance(self.data, dict):
            return
        if check_empty:
            raise Failed("YAML Error: File is empty")
        self.data = {}

    def save(self):
        """Dump ``self.data`` to ``self.path``; does nothing when no path is set."""
        if not self.path:
            return
        with open(self.path, 'w', encoding="utf-8") as yaml_file:
            self.yaml.dump(self.data, yaml_file)