Minor refactor and automatic installation of required packages

This commit is contained in:
Phin 2023-11-04 23:43:42 +05:30
parent d77558eaed
commit a6fdb596db
18 changed files with 200 additions and 178 deletions

View file

@ -3,5 +3,7 @@
.github .github
.gitignore .gitignore
CONTRIBUTING.md CONTRIBUTING.md
data
Dockerfile Dockerfile
pyrightconfig.json pyrightconfig.json
Showcase.psd

View file

@ -55,7 +55,7 @@ jobs:
uses: docker/build-push-action@v4 uses: docker/build-push-action@v4
with: with:
context: . context: .
platforms: linux/amd64,linux/arm64 platforms: linux/amd64,linux/arm64,linux/arm/v7
push: true push: true
provenance: false provenance: false
tags: ${{ env.DOCKER_IMAGE_NAME }}:${{ env.VERSION }},${{ env.DOCKER_IMAGE_NAME }}:latest tags: ${{ env.DOCKER_IMAGE_NAME }}:${{ env.VERSION }},${{ env.DOCKER_IMAGE_NAME }}:latest

View file

@ -7,5 +7,5 @@ WORKDIR /app
COPY requirements.txt . COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir RUN pip install -r requirements.txt --no-cache-dir
COPY . . COPY . .
ENV IN_CONTAINER true ENV DRPP_CONTAINER_DEMOTION_UID=$USER_UID
CMD ["python", "main.py"] CMD ["python", "main.py"]

18
config/constants.py Normal file
View file

@ -0,0 +1,18 @@
import os
import sys
name = "Discord Rich Presence for Plex"
version = "2.3.5"
plexClientID = "discord-rich-presence-plex"
discordClientID = "413407336082833418"
dataDirectoryPath = "data"
configFilePath = os.path.join(dataDirectoryPath, "config.json")
cacheFilePath = os.path.join(dataDirectoryPath, "cache.json")
logFilePath = os.path.join(dataDirectoryPath, "console.log")
isUnix = sys.platform in ["linux", "darwin"]
processID = os.getpid()
isInteractive = sys.stdin and sys.stdin.isatty()
containerDemotionUid = os.environ.get("DRPP_CONTAINER_DEMOTION_UID", "")

0
core/__init__.py Normal file
View file

View file

@ -1,5 +1,5 @@
from store.constants import configFilePath from config.constants import configFilePath
from utils.dict import merge from utils.dict import copyDict
from utils.logging import logger from utils.logging import logger
import json import json
import models.config import models.config
@ -32,7 +32,7 @@ def loadConfig() -> None:
os.rename(configFilePath, configFilePath.replace(".json", f"-{time.time():.0f}.json")) os.rename(configFilePath, configFilePath.replace(".json", f"-{time.time():.0f}.json"))
logger.exception("Failed to parse the application's config file. A new one will be created.") logger.exception("Failed to parse the application's config file. A new one will be created.")
else: else:
merge(loadedConfig, config) copyDict(loadedConfig, config)
saveConfig() saveConfig()
def saveConfig() -> None: def saveConfig() -> None:

View file

@ -1,6 +1,4 @@
# pyright: reportOptionalMemberAccess=none from config.constants import discordClientID, isUnix, processID
from store.constants import discordClientID, isUnix, processID
from typing import Any, Optional from typing import Any, Optional
from utils.logging import logger from utils.logging import logger
import asyncio import asyncio
@ -10,36 +8,27 @@ import os
import struct import struct
import time import time
class DiscordRpcService: class DiscordIpcService:
ipcPipe = (((os.path.isdir("/run/app") and "/run/app") or os.environ.get("XDG_RUNTIME_DIR", None) or os.environ.get("TMPDIR", None) or os.environ.get("TMP", None) or os.environ.get("TEMP", None) or "/tmp") + "/discord-ipc-0") if isUnix else r"\\?\pipe\discord-ipc-0" def __init__(self):
self.ipcPipe = ("/run/app" if os.path.isdir("/run/app") else os.environ.get("XDG_RUNTIME_DIR", os.environ.get("TMPDIR", os.environ.get("TMP", os.environ.get("TEMP", "/tmp"))))) if isUnix else r"\\?\pipe\discord-ipc-0"
def __init__(self) -> None: self.loop: asyncio.AbstractEventLoop = None # pyright: ignore[reportGeneralTypeIssues]
self.loop: Optional[asyncio.AbstractEventLoop] = None self.pipeReader: asyncio.StreamReader = None # pyright: ignore[reportGeneralTypeIssues]
self.pipeReader: Optional[asyncio.StreamReader] = None self.pipeWriter: asyncio.StreamWriter = None # pyright: ignore[reportGeneralTypeIssues]
self.pipeWriter: Optional[Any] = None
self.connected = False self.connected = False
def connect(self) -> None:
if self.connected:
logger.debug("Attempt to connect Discord IPC Pipe while already connected")
return
logger.info("Connecting Discord IPC Pipe")
self.loop = asyncio.new_event_loop()
self.loop.run_until_complete(self.handshake())
async def handshake(self) -> None: async def handshake(self) -> None:
try: try:
if isUnix: if isUnix:
self.pipeReader, self.pipeWriter = await asyncio.open_unix_connection(self.ipcPipe) # type: ignore self.pipeReader, self.pipeWriter = await asyncio.open_unix_connection(self.ipcPipe) # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
else: else:
self.pipeReader = asyncio.StreamReader() self.pipeReader = asyncio.StreamReader()
self.pipeWriter, _ = await self.loop.create_pipe_connection(lambda: asyncio.StreamReaderProtocol(self.pipeReader), self.ipcPipe) # type: ignore self.pipeWriter = (await self.loop.create_pipe_connection(lambda: asyncio.StreamReaderProtocol(self.pipeReader), self.ipcPipe))[0] # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
self.write(0, { "v": 1, "client_id": discordClientID }) self.write(0, { "v": 1, "client_id": discordClientID })
if await self.read(): if await self.read():
self.connected = True self.connected = True
except: except:
logger.exception("An unexpected error occured during a RPC handshake operation") logger.exception("An unexpected error occured during an IPC handshake operation")
async def read(self) -> Optional[Any]: async def read(self) -> Optional[Any]:
try: try:
@ -48,7 +37,7 @@ class DiscordRpcService:
logger.debug("[READ] %s", data) logger.debug("[READ] %s", data)
return data return data
except: except:
logger.exception("An unexpected error occured during a RPC read operation") logger.exception("An unexpected error occured during an IPC read operation")
self.connected = False self.connected = False
def write(self, op: int, payload: Any) -> None: def write(self, op: int, payload: Any) -> None:
@ -57,26 +46,34 @@ class DiscordRpcService:
payload = json.dumps(payload) payload = json.dumps(payload)
self.pipeWriter.write(struct.pack("<ii", op, len(payload)) + payload.encode("utf-8")) self.pipeWriter.write(struct.pack("<ii", op, len(payload)) + payload.encode("utf-8"))
except: except:
logger.exception("An unexpected error occured during a RPC write operation") logger.exception("An unexpected error occured during an IPC write operation")
self.connected = False self.connected = False
def connect(self) -> None:
if self.connected:
logger.debug("Attempt to connect Discord IPC pipe while already connected")
return
logger.info("Connecting Discord IPC pipe")
self.loop = asyncio.new_event_loop()
self.loop.run_until_complete(self.handshake())
def disconnect(self) -> None: def disconnect(self) -> None:
if not self.connected: if not self.connected:
logger.debug("Attempt to disconnect Discord IPC Pipe while not connected") logger.debug("Attempt to disconnect Discord IPC pipe while not connected")
return return
logger.info("Disconnecting Discord IPC Pipe") logger.info("Disconnecting Discord IPC pipe")
try: try:
self.pipeWriter.close() self.pipeWriter.close()
except: except:
logger.exception("An unexpected error occured while closing an IPC pipe writer") logger.exception("An unexpected error occured while closing the IPC pipe writer")
try: try:
self.loop.run_until_complete(self.pipeReader.read()) self.loop.run_until_complete(self.pipeReader.read())
except: except:
logger.exception("An unexpected error occured while closing an IPC pipe reader") logger.exception("An unexpected error occured while closing the IPC pipe reader")
try: try:
self.loop.close() self.loop.close()
except: except:
logger.exception("An unexpected error occured while closing an asyncio event loop") logger.exception("An unexpected error occured while closing the asyncio event loop")
self.connected = False self.connected = False
def setActivity(self, activity: models.discord.Activity) -> None: def setActivity(self, activity: models.discord.Activity) -> None:

View file

@ -1,10 +1,10 @@
from services.config import config from .config import config
from typing import Optional from typing import Optional
from utils.logging import logger from utils.logging import logger
import models.imgur import models.imgur
import requests import requests
def uploadImage(url: str) -> Optional[str]: def uploadToImgur(url: str) -> Optional[str]:
try: try:
data: models.imgur.UploadResponse = requests.post( data: models.imgur.UploadResponse = requests.post(
"https://api.imgur.com/3/image", "https://api.imgur.com/3/image",
@ -15,4 +15,4 @@ def uploadImage(url: str) -> Optional[str]:
raise Exception(data["data"]["error"]) raise Exception(data["data"]["error"])
return data["data"]["link"] return data["data"]["link"]
except: except:
logger.exception("An unexpected error occured while uploading an image") logger.exception("An unexpected error occured while uploading an image to Imgur")

View file

@ -1,17 +1,16 @@
# pyright: reportTypedDictNotRequiredAccess=none,reportUnknownArgumentType=none,reportUnknownMemberType=none # pyright: reportUnknownArgumentType=none,reportUnknownMemberType=none,reportUnknownVariableType=none
from .DiscordRpcService import DiscordRpcService
from .cache import getKey, setKey
from .config import config from .config import config
from .imgur import uploadImage from .discord import DiscordIpcService
from .imgur import uploadToImgur
from plexapi.alert import AlertListener from plexapi.alert import AlertListener
from plexapi.base import Playable, PlexPartialObject from plexapi.base import Playable, PlexPartialObject
from plexapi.media import Genre, GuidTag from plexapi.media import Genre, GuidTag
from plexapi.myplex import MyPlexAccount, PlexServer from plexapi.myplex import MyPlexAccount, PlexServer
from typing import Optional from typing import Optional
from utils.cache import getCacheKey, setCacheKey
from utils.logging import LoggerWithPrefix from utils.logging import LoggerWithPrefix
from utils.text import formatSeconds from utils.text import formatSeconds
import hashlib
import models.config import models.config
import models.discord import models.discord
import models.plex import models.plex
@ -30,8 +29,8 @@ class PlexAlertListener(threading.Thread):
self.daemon = True self.daemon = True
self.token = token self.token = token
self.serverConfig = serverConfig self.serverConfig = serverConfig
self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}/{hashlib.md5(str(id(self)).encode('UTF-8')).hexdigest()[:5].upper()}] ") self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}] ") # pyright: ignore[reportTypedDictNotRequiredAccess]
self.discordRpcService = DiscordRpcService() self.discordIpcService = DiscordIpcService()
self.updateTimeoutTimer: Optional[threading.Timer] = None self.updateTimeoutTimer: Optional[threading.Timer] = None
self.connectionTimeoutTimer: Optional[threading.Timer] = None self.connectionTimeoutTimer: Optional[threading.Timer] = None
self.account: Optional[MyPlexAccount] = None self.account: Optional[MyPlexAccount] = None
@ -48,7 +47,7 @@ class PlexAlertListener(threading.Thread):
self.logger.info("Signing into Plex") self.logger.info("Signing into Plex")
self.account = MyPlexAccount(token = self.token) self.account = MyPlexAccount(token = self.token)
self.logger.info("Signed in as Plex user \"%s\"", self.account.username) self.logger.info("Signed in as Plex user \"%s\"", self.account.username)
self.listenForUser = self.serverConfig.get("listenForUser", self.account.username) self.listenForUser = self.serverConfig.get("listenForUser", "") or self.account.username
self.server = None self.server = None
for resource in self.account.resources(): for resource in self.account.resources():
if resource.product == self.productName and resource.name.lower() == self.serverConfig["name"].lower(): if resource.product == self.productName and resource.name.lower() == self.serverConfig["name"].lower():
@ -71,7 +70,7 @@ class PlexAlertListener(threading.Thread):
self.logger.error("%s \"%s\" not found", self.productName, self.serverConfig["name"]) self.logger.error("%s \"%s\" not found", self.productName, self.serverConfig["name"])
break break
except Exception as e: except Exception as e:
self.logger.error("Failed to connect to %s \"%s\": %s", self.productName, self.serverConfig["name"], e) self.logger.error("Failed to connect to %s \"%s\": %s", self.productName, self.serverConfig["name"], e) # pyright: ignore[reportTypedDictNotRequiredAccess]
self.logger.error("Reconnecting in 10 seconds") self.logger.error("Reconnecting in 10 seconds")
time.sleep(10) time.sleep(10)
@ -93,7 +92,7 @@ class PlexAlertListener(threading.Thread):
def disconnectRpc(self) -> None: def disconnectRpc(self) -> None:
self.lastState, self.lastSessionKey, self.lastRatingKey = "", 0, 0 self.lastState, self.lastSessionKey, self.lastRatingKey = "", 0, 0
self.discordRpcService.disconnect() self.discordIpcService.disconnect()
self.cancelTimers() self.cancelTimers()
def cancelTimers(self) -> None: def cancelTimers(self) -> None:
@ -210,11 +209,11 @@ class PlexAlertListener(threading.Thread):
return return
thumbUrl = "" thumbUrl = ""
if thumb and config["display"]["posters"]["enabled"]: if thumb and config["display"]["posters"]["enabled"]:
thumbUrl = getKey(thumb) thumbUrl = getCacheKey(thumb)
if not thumbUrl: if not thumbUrl:
self.logger.debug("Uploading image") self.logger.debug("Uploading image to Imgur")
thumbUrl = uploadImage(self.server.url(thumb, True)) thumbUrl = uploadToImgur(self.server.url(thumb, True))
setKey(thumb, thumbUrl) setCacheKey(thumb, thumbUrl)
activity: models.discord.Activity = { activity: models.discord.Activity = {
"details": title[:128], "details": title[:128],
"state": stateText[:128], "state": stateText[:128],
@ -260,9 +259,9 @@ class PlexAlertListener(threading.Thread):
activity["timestamps"] = {"end": round(currentTimestamp + ((item.duration - viewOffset) / 1000))} activity["timestamps"] = {"end": round(currentTimestamp + ((item.duration - viewOffset) / 1000))}
else: else:
activity["timestamps"] = {"start": round(currentTimestamp - (viewOffset / 1000))} activity["timestamps"] = {"start": round(currentTimestamp - (viewOffset / 1000))}
if not self.discordRpcService.connected: if not self.discordIpcService.connected:
self.discordRpcService.connect() self.discordIpcService.connect()
if self.discordRpcService.connected: if self.discordIpcService.connected:
self.discordRpcService.setActivity(activity) self.discordIpcService.setActivity(activity)
except: except:
self.logger.exception("An unexpected error occured in the alert handler") self.logger.exception("An unexpected error occured in the Plex alert handler")

152
main.py
View file

@ -1,31 +1,44 @@
from store.constants import isUnix from config.constants import isUnix, containerDemotionUid
import os import os
import subprocess
if isUnix and os.environ.get("IN_CONTAINER", "") == "true":
uid = 10000
os.system(f"chown -R {uid}:{uid} /app")
os.setuid(uid)
from services import PlexAlertListener
from services.cache import loadCache
from services.config import config, loadConfig, saveConfig
from store.constants import dataFolderPath, logFilePath, name, plexClientID, version
from utils.logging import formatter, logger
import logging
import requests
import sys import sys
if isUnix and containerDemotionUid:
uid = int(containerDemotionUid)
os.system(f"chown -R {uid}:{uid} /app")
os.setuid(uid) # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
else:
def parsePipPackages(packagesStr: str) -> dict[str, str]:
return { packageSplit[0]: packageSplit[1] for packageSplit in [package.split("==") for package in packagesStr.splitlines()] }
pipFreezeResult = subprocess.run([sys.executable, "-m", "pip", "freeze"], capture_output = True, text = True)
installedPackages = parsePipPackages(pipFreezeResult.stdout)
with open("requirements.txt", "r", encoding = "UTF-8") as requirementsFile:
requiredPackages = parsePipPackages(requirementsFile.read())
for packageName, packageVersion in requiredPackages.items():
if packageName not in installedPackages:
print(f"Required package '{packageName}' not found, installing...")
subprocess.run([sys.executable, "-m", "pip", "install", f"{packageName}=={packageVersion}"], check = True)
from config.constants import dataDirectoryPath, logFilePath, name, plexClientID, version, isInteractive
from core.config import config, loadConfig, saveConfig
from core.discord import DiscordIpcService
from core.plex import PlexAlertListener
from typing import Optional
from utils.cache import loadCache
from utils.logging import formatter, logger
from utils.text import formatSeconds
import logging
import models.config
import requests
import time import time
import urllib.parse import urllib.parse
isInteractive = sys.stdin and sys.stdin.isatty() def main() -> None:
plexAlertListeners: list[PlexAlertListener] = [] if not os.path.exists(dataDirectoryPath):
os.mkdir(dataDirectoryPath)
try:
if not os.path.exists(dataFolderPath):
os.mkdir(dataFolderPath)
for oldFilePath in ["config.json", "cache.json", "console.log"]: for oldFilePath in ["config.json", "cache.json", "console.log"]:
if os.path.isfile(oldFilePath): if os.path.isfile(oldFilePath):
os.rename(oldFilePath, os.path.join(dataFolderPath, oldFilePath)) os.rename(oldFilePath, os.path.join(dataDirectoryPath, oldFilePath))
loadConfig() loadConfig()
if config["logging"]["debug"]: if config["logging"]["debug"]:
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -33,46 +46,71 @@ try:
fileHandler = logging.FileHandler(logFilePath) fileHandler = logging.FileHandler(logFilePath)
fileHandler.setFormatter(formatter) fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler) logger.addHandler(fileHandler)
os.system("clear" if isUnix else "cls")
logger.info("%s - v%s", name, version) logger.info("%s - v%s", name, version)
loadCache() loadCache()
if len(config["users"]) == 0: if not config["users"]:
logger.info("No users found in the config file. Initiating authentication flow.") logger.info("No users found in the config file. Initiating authentication flow.")
response = requests.post("https://plex.tv/api/v2/pins.json?strong=true", headers = { user = authUser()
"X-Plex-Product": name, if not user:
exit()
config["users"].append(user)
saveConfig()
plexAlertListeners = [PlexAlertListener(user["token"], server) for user in config["users"] for server in user["servers"]]
try:
if isInteractive:
while True:
userInput = input()
if userInput in ["exit", "quit"]:
raise KeyboardInterrupt
else:
while True:
time.sleep(3600)
except KeyboardInterrupt:
for plexAlertListener in plexAlertListeners:
plexAlertListener.disconnect()
def authUser() -> Optional[models.config.User]:
response = requests.post("https://plex.tv/api/v2/pins.json?strong=true", headers = {
"X-Plex-Product": name,
"X-Plex-Client-Identifier": plexClientID,
}).json()
logger.info("Open the below URL in your web browser and sign in:")
logger.info("https://app.plex.tv/auth#?clientID=%s&code=%s&context%%5Bdevice%%5D%%5Bproduct%%5D=%s", plexClientID, response["code"], urllib.parse.quote(name))
time.sleep(5)
for i in range(35):
logger.info(f"Checking whether authentication is successful... ({formatSeconds((i + 1) * 5)})")
authCheckResponse = requests.get(f"https://plex.tv/api/v2/pins/{response['id']}.json?code={response['code']}", headers = {
"X-Plex-Client-Identifier": plexClientID, "X-Plex-Client-Identifier": plexClientID,
}).json() }).json()
logger.info("Open the below URL in your web browser and sign in:") if authCheckResponse["authToken"]:
logger.info("https://app.plex.tv/auth#?clientID=%s&code=%s&context%%5Bdevice%%5D%%5Bproduct%%5D=%s", plexClientID, response["code"], urllib.parse.quote(name)) logger.info("Authentication successful")
serverName = os.environ.get("PLEX_SERVER_NAME")
if not serverName:
serverName = input("Enter the name of the Plex Media Server you wish to connect to: ") if isInteractive else "ServerName"
return { "token": authCheckResponse["authToken"], "servers": [{ "name": serverName }] }
time.sleep(5) time.sleep(5)
logger.info("Checking whether authentication is successful...")
for _ in range(120):
authCheckResponse = requests.get(f"https://plex.tv/api/v2/pins/{response['id']}.json?code={response['code']}", headers = {
"X-Plex-Client-Identifier": plexClientID,
}).json()
if authCheckResponse["authToken"]:
logger.info("Authentication successful.")
serverName = os.environ.get("PLEX_SERVER_NAME")
if not serverName:
serverName = input("Enter the name of the Plex Media Server you wish to connect to: ") if isInteractive else "ServerName"
config["users"].append({ "token": authCheckResponse["authToken"], "servers": [{ "name": serverName }] })
saveConfig()
break
time.sleep(5)
else:
logger.info("Authentication failed.")
exit()
plexAlertListeners = [PlexAlertListener(user["token"], server) for user in config["users"] for server in user["servers"]]
if isInteractive:
while True:
userInput = input()
if userInput in ["exit", "quit"]:
raise KeyboardInterrupt
else: else:
while True: logger.info(f"Authentication timed out ({formatSeconds(180)})")
time.sleep(3600)
except KeyboardInterrupt: def testIpc() -> None:
for plexAlertListener in plexAlertListeners: logger.info("Testing Discord IPC connection")
plexAlertListener.disconnect() discordIpcService = DiscordIpcService()
except: discordIpcService.connect()
logger.exception("An unexpected error occured") discordIpcService.setActivity({
"details": "details",
"state": "state",
"assets": {
"large_text": "large_text",
"large_image": "logo",
"small_text": "small_text",
"small_image": "playing",
},
})
time.sleep(15)
discordIpcService.disconnect()
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "test-ipc":
testIpc()
else:
main()

View file

@ -1,2 +0,0 @@
from .DiscordRpcService import DiscordRpcService as DiscordRpcService
from .PlexAlertListener import PlexAlertListener as PlexAlertListener

View file

@ -1,28 +0,0 @@
from store.constants import cacheFilePath
from typing import Any
from utils.logging import logger
import json
import os
import time
cache: dict[str, Any] = {}
def loadCache() -> None:
if os.path.isfile(cacheFilePath):
try:
with open(cacheFilePath, "r", encoding = "UTF-8") as cacheFile:
cache.update(json.load(cacheFile))
except:
os.rename(cacheFilePath, cacheFilePath.replace(".json", f"-{time.time():.0f}.json"))
logger.exception("Failed to parse the application's cache file. A new one will be created.")
def getKey(key: str) -> Any:
return cache.get(key)
def setKey(key: str, value: Any) -> None:
cache[key] = value
try:
with open(cacheFilePath, "w", encoding = "UTF-8") as cacheFile:
json.dump(cache, cacheFile, separators = (",", ":"))
except:
logger.exception("Failed to write to the application's cache file.")

View file

@ -1,16 +0,0 @@
import os
import sys
name = "Discord Rich Presence for Plex"
version = "2.3.5"
plexClientID = "discord-rich-presence-plex"
discordClientID = "413407336082833418"
dataFolderPath = "data"
configFilePath = os.path.join(dataFolderPath, "config.json")
cacheFilePath = os.path.join(dataFolderPath, "cache.json")
logFilePath = os.path.join(dataFolderPath, "console.log")
isUnix = sys.platform in ["linux", "darwin"]
processID = os.getpid()

16
test.py
View file

@ -1,16 +0,0 @@
from services import DiscordRpcService
import time
discordRpcService = DiscordRpcService()
discordRpcService.connect()
discordRpcService.setActivity({
"details": "details",
"state": "state",
"assets": {
"large_text": "large_text",
"large_image": "logo",
"small_text": "small_text",
"small_image": "playing",
},
})
time.sleep(60)

29
utils/cache.py Normal file
View file

@ -0,0 +1,29 @@
from .logging import logger
from config.constants import cacheFilePath
from typing import Any
import json
import os
import time
cache: dict[str, Any] = {}
def loadCache() -> None:
if not os.path.isfile(cacheFilePath):
return
try:
with open(cacheFilePath, "r", encoding = "UTF-8") as cacheFile:
cache.update(json.load(cacheFile))
except:
os.rename(cacheFilePath, cacheFilePath.replace(".json", f"-{time.time():.0f}.json"))
logger.exception("Failed to parse the application's cache file. A new one will be created.")
def getCacheKey(key: str) -> Any:
return cache.get(key)
def setCacheKey(key: str, value: Any) -> None:
cache[key] = value
try:
with open(cacheFilePath, "w", encoding = "UTF-8") as cacheFile:
json.dump(cache, cacheFile, separators = (",", ":"))
except:
logger.exception("Failed to write to the application's cache file.")

View file

@ -1,8 +1,8 @@
from typing import Any from typing import Any
def merge(source: Any, target: Any) -> None: def copyDict(source: Any, target: Any) -> None:
for key, value in source.items(): for key, value in source.items():
if isinstance(value, dict): if isinstance(value, dict):
merge(value, target.setdefault(key, {})) copyDict(value, target.setdefault(key, {}))
else: else:
target[key] = source[key] target[key] = value

View file

@ -1,7 +1,8 @@
from config.constants import name
from typing import Any, Callable from typing import Any, Callable
import logging import logging
logger = logging.getLogger("discord-rich-presence-plex") logger = logging.getLogger(name)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s", datefmt = "%d-%m-%Y %I:%M:%S %p") formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s", datefmt = "%d-%m-%Y %I:%M:%S %p")
streamHandler = logging.StreamHandler() streamHandler = logging.StreamHandler()