discord-rich-presence-plex/main.py

141 lines
5.4 KiB
Python
Raw Normal View History

from config.constants import isInContainer, runtimeDirectory, uid, gid, containerCwd, noRuntimeDirChown
2024-02-06 20:10:41 +00:00
from utils.logging import logger
2023-10-01 13:41:42 +00:00
import os
import sys
2023-10-01 13:41:42 +00:00
if isInContainer:
if not os.path.isdir(runtimeDirectory):
2024-02-06 20:10:41 +00:00
logger.error(f"Runtime directory does not exist. Ensure that it is mounted into the container at {runtimeDirectory}")
exit(1)
if os.geteuid() == 0: # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
if uid == -1 or gid == -1:
logger.warning(f"Environment variable(s) DRPP_UID and/or DRPP_GID are/is not set. Manually ensure appropriate ownership of {runtimeDirectory}")
statResult = os.stat(runtimeDirectory)
uid, gid = statResult.st_uid, statResult.st_gid
else:
if noRuntimeDirChown:
logger.warning(f"DRPP_NO_RUNTIME_DIR_CHOWN is set to true. Manually ensure appropriate ownership of {runtimeDirectory}")
else:
os.system(f"chmod 700 {runtimeDirectory}")
os.system(f"chown -R {uid}:{gid} {runtimeDirectory}")
os.system(f"chown -R {uid}:{gid} {containerCwd}")
os.setgid(gid) # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
os.setuid(uid) # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
else:
logger.warning(f"Not running as the superuser. Manually ensure appropriate ownership of mounted contents")
else:
2023-11-05 05:24:45 +00:00
try:
import subprocess
def parsePipPackages(packagesStr: str) -> dict[str, str]:
return { packageSplit[0]: packageSplit[1] if len(packageSplit) > 1 else "" for packageSplit in [package.split("==") for package in packagesStr.splitlines()] }
pipFreezeResult = subprocess.run([sys.executable, "-m", "pip", "freeze"], stdout = subprocess.PIPE, text = True, check = True)
installedPackages = parsePipPackages(pipFreezeResult.stdout)
with open("requirements.txt", "r", encoding = "UTF-8") as requirementsFile:
requiredPackages = parsePipPackages(requirementsFile.read())
for packageName, packageVersion in requiredPackages.items():
if packageName not in installedPackages:
package = f"{packageName}{f'=={packageVersion}' if packageVersion else ''}"
2024-02-06 20:10:41 +00:00
logger.info(f"Installing missing dependency: {package}")
2023-11-05 05:24:45 +00:00
subprocess.run([sys.executable, "-m", "pip", "install", "-U", package], check = True)
except Exception as e:
2024-02-06 20:10:41 +00:00
logger.exception("An unexpected error occured during automatic installation of dependencies. Install them manually by running the following command: python -m pip install -U -r requirements.txt")
2023-10-01 13:41:42 +00:00
2023-11-05 05:24:45 +00:00
from config.constants import dataDirectoryPath, logFilePath, name, version, isInteractive
from core.config import config, loadConfig, saveConfig
from core.discord import DiscordIpcService
2023-11-05 05:24:45 +00:00
from core.plex import PlexAlertListener, initiateAuth, getAuthToken
from typing import Optional
from utils.cache import loadCache
2024-02-06 20:10:41 +00:00
from utils.logging import formatter
from utils.text import formatSeconds
2022-05-10 20:23:12 +00:00
import logging
import models.config
import time
2022-05-10 20:23:12 +00:00
def init() -> None:
2024-02-06 20:10:41 +00:00
if not os.path.isdir(dataDirectoryPath):
os.makedirs(dataDirectoryPath)
2023-10-01 13:41:42 +00:00
for oldFilePath in ["config.json", "cache.json", "console.log"]:
if os.path.isfile(oldFilePath):
os.rename(oldFilePath, os.path.join(dataDirectoryPath, oldFilePath))
2022-05-22 17:03:17 +00:00
loadConfig()
if config["logging"]["debug"]:
logger.setLevel(logging.DEBUG)
if config["logging"]["writeToFile"]:
fileHandler = logging.FileHandler(logFilePath)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
logger.info("%s - v%s", name, version)
loadCache()
def main() -> None:
init()
if not config["users"]:
2023-11-04 19:39:30 +00:00
logger.info("No users found in the config file")
2023-11-05 05:24:45 +00:00
user = authNewUser()
if not user:
exit(1)
config["users"].append(user)
saveConfig()
plexAlertListeners = [PlexAlertListener(user["token"], server) for user in config["users"] for server in user["servers"]]
try:
if isInteractive:
while True:
userInput = input()
if userInput in ["exit", "quit"]:
raise KeyboardInterrupt
else:
while True:
time.sleep(3600)
except KeyboardInterrupt:
for plexAlertListener in plexAlertListeners:
plexAlertListener.disconnect()
2023-11-05 05:24:45 +00:00
def authNewUser() -> Optional[models.config.User]:
id, code, url = initiateAuth()
logger.info("Open the below URL in your web browser and sign in:")
2023-11-05 05:24:45 +00:00
logger.info(url)
time.sleep(5)
for i in range(35):
2023-11-05 05:24:45 +00:00
logger.info(f"Checking whether authentication is successful ({formatSeconds((i + 1) * 5)})")
authToken = getAuthToken(id, code)
if authToken:
logger.info("Authentication successful")
2024-02-06 20:10:41 +00:00
serverName = os.environ.get("DRPP_PLEX_SERVER_NAME_INPUT")
if not serverName:
serverName = input("Enter the name of the Plex Media Server you wish to connect to: ") if isInteractive else "ServerName"
2023-11-05 05:24:45 +00:00
return { "token": authToken, "servers": [{ "name": serverName }] }
2022-05-11 02:19:49 +00:00
time.sleep(5)
2022-05-22 17:03:17 +00:00
else:
logger.info(f"Authentication timed out ({formatSeconds(180)})")
2024-02-06 20:10:41 +00:00
def testIpc(pipeNumber: int) -> None:
init()
logger.info("Testing Discord IPC connection")
2024-02-06 20:10:41 +00:00
discordIpcService = DiscordIpcService(pipeNumber)
discordIpcService.connect()
discordIpcService.setActivity({
"details": "details",
"state": "state",
"assets": {
"large_text": "large_text",
"large_image": "logo",
"small_text": "small_text",
"small_image": "playing",
},
})
time.sleep(15)
discordIpcService.disconnect()
if __name__ == "__main__":
mode = sys.argv[1] if len(sys.argv) > 1 else ""
try:
if not mode:
main()
elif mode == "test-ipc":
testIpc(int(sys.argv[2]) if len(sys.argv) > 2 else -1)
else:
2024-02-06 20:10:41 +00:00
logger.error(f"Invalid mode: {mode}")
except KeyboardInterrupt:
pass