diff --git a/.dockerignore b/.dockerignore index 1415f44..f203766 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,7 +3,10 @@ .github .gitignore CONTRIBUTING.md -data Dockerfile +lint.bat pyrightconfig.json + +__pycache__ assets/showcase.psd +data diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 2990f5d..b255ebd 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,7 +18,7 @@ jobs: - name: Create release ZIP file run: |- mv .github/release-notes ../ - rm -rf .git .github .gitignore .dockerignore Dockerfile CONTRIBUTING.md pyrightconfig.json + xargs rm -rf < .dockerignore mkdir ${{ github.event.repository.name }} mv * ${{ github.event.repository.name }} || true zip -r $RELEASE_ZIP_FILENAME ${{ github.event.repository.name }} diff --git a/.gitignore b/.gitignore index 79a86c1..7d71af6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ -data +__pycache__ assets/showcase.psd +data diff --git a/config/constants.py b/config/constants.py index 9e6e198..a55a961 100644 --- a/config/constants.py +++ b/config/constants.py @@ -15,6 +15,7 @@ logFilePath = os.path.join(dataDirectoryPath, "console.log") isUnix = sys.platform in ["linux", "darwin"] processID = os.getpid() isInteractive = sys.stdin and sys.stdin.isatty() +plexServerNameInput = os.environ.get("DRPP_PLEX_SERVER_NAME_INPUT") noPipInstall = os.environ.get("DRPP_NO_PIP_INSTALL", "") == "true" isInContainer = os.environ.get("DRPP_IS_IN_CONTAINER", "") == "true" runtimeDirectory = "/run/app" if isInContainer else os.environ.get("XDG_RUNTIME_DIR", os.environ.get("TMPDIR", os.environ.get("TMP", os.environ.get("TEMP", "/tmp")))) diff --git a/core/config.py b/core/config.py index c83e486..e77fea9 100644 --- a/core/config.py +++ b/core/config.py @@ -49,9 +49,9 @@ def loadConfig() -> None: try: with open(configFilePath, "r", encoding = "UTF-8") as configFile: if configFileType == "yaml": - loadedConfig = yaml.safe_load(configFile) or {} + loadedConfig = yaml.safe_load(configFile) or {} # pyright: ignore[reportUnknownVariableType] else: - loadedConfig = json.load(configFile) or {} + loadedConfig = json.load(configFile) or {} # pyright: ignore[reportUnknownVariableType] except: os.rename(configFilePath, f"{configFilePathBase}-{time.time():.0f}.{configFileExtension}") logger.exception("Failed to parse the config file. A new one will be created.") diff --git a/core/imgur.py b/core/imgur.py index e281541..ae4b4dc 100644 --- a/core/imgur.py +++ b/core/imgur.py @@ -6,12 +6,13 @@ import io import models.imgur import requests -def uploadToImgur(url: str, maxSize: int = 0) -> Optional[str]: +def uploadToImgur(url: str) -> Optional[str]: try: originalImageBytesIO = io.BytesIO(requests.get(url).content) originalImage = Image.open(originalImageBytesIO) newImage = Image.new("RGB", originalImage.size) - newImage.putdata(originalImage.getdata()) # pyright: ignore[reportUnknownMemberType,reportUnknownArgumentType] + newImage.putdata(originalImage.getdata()) # pyright: ignore[reportArgumentType] + maxSize = config["display"]["posters"]["maxSize"] if maxSize: newImage.thumbnail((maxSize, maxSize)) newImageBytesIO = io.BytesIO() diff --git a/core/plex.py b/core/plex.py index 795e7ae..3968b2b 100644 --- a/core/plex.py +++ b/core/plex.py @@ -1,11 +1,10 @@ -# pyright: reportUnknownArgumentType=none,reportUnknownMemberType=none,reportUnknownVariableType=none +# pyright: reportUnknownArgumentType=none,reportUnknownMemberType=none,reportUnknownVariableType=none,reportTypedDictNotRequiredAccess=none,reportOptionalMemberAccess=none,reportMissingTypeStubs=none from .config import config from .discord import DiscordIpcService from .imgur import uploadToImgur from config.constants import name, plexClientID from plexapi.alert import AlertListener -from plexapi.base import PlexSession, PlexPartialObject from plexapi.media import Genre, Guid from plexapi.myplex import MyPlexAccount, PlexServer from typing import Optional @@ -56,7 +55,7 @@ class PlexAlertListener(threading.Thread): self.daemon = True self.token = token self.serverConfig = serverConfig - self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}] ") # pyright: ignore[reportTypedDictNotRequiredAccess] + self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}] ") self.discordIpcService = DiscordIpcService(self.serverConfig.get("ipcPipeNumber")) self.updateTimeoutTimer: Optional[threading.Timer] = None self.connectionCheckTimer: Optional[threading.Timer] = None @@ -94,7 +93,7 @@ class PlexAlertListener(threading.Thread): if not self.server: raise Exception("Server not found") except Exception as e: - self.logger.error("Failed to connect to %s '%s': %s", self.productName, self.serverConfig["name"], e) # pyright: ignore[reportTypedDictNotRequiredAccess] + self.logger.error("Failed to connect to %s '%s': %s", self.productName, self.serverConfig["name"], e) self.logger.error("Reconnecting in 10 seconds") time.sleep(10) @@ -154,11 +153,11 @@ class PlexAlertListener(threading.Thread): self.logger.debug("Received alert: %s", stateNotification) ratingKey = int(stateNotification["ratingKey"]) assert self.server - item: PlexPartialObject = self.server.fetchItem(ratingKey) + item = self.server.fetchItem(ratingKey) if item.key and item.key.startswith("/livetv"): mediaType = "live_episode" else: - mediaType: str = item.type + mediaType = item.type if mediaType not in validMediaTypes: self.logger.debug("Unsupported media type '%s', ignoring", mediaType) return @@ -166,7 +165,7 @@ class PlexAlertListener(threading.Thread): sessionKey = int(stateNotification["sessionKey"]) viewOffset = int(stateNotification["viewOffset"]) try: - libraryName: str = item.section().title + libraryName = item.section().title except: libraryName = "ERROR" if "blacklistedLibraries" in self.serverConfig and libraryName in self.serverConfig["blacklistedLibraries"]: @@ -195,7 +194,7 @@ class PlexAlertListener(threading.Thread): return if self.isServerOwner: self.logger.debug("Searching sessions for session key %s", sessionKey) - sessions: list[PlexSession] = self.server.sessions() + sessions = self.server.sessions() if len(sessions) < 1: self.logger.debug("Empty session list, ignoring") return @@ -203,7 +202,7 @@ class PlexAlertListener(threading.Thread): self.logger.debug("%s, Session Key: %s, Usernames: %s", session, session.sessionKey, session.usernames) if session.sessionKey == sessionKey: self.logger.debug("Session found") - sessionUsername: str = session.usernames[0] + sessionUsername = session.usernames[0] if sessionUsername.lower() == self.listenForUser.lower(): self.logger.debug("Username '%s' matches '%s', continuing", sessionUsername, self.listenForUser) break @@ -268,7 +267,7 @@ class PlexAlertListener(threading.Thread): thumbUrl = getCacheKey(thumb) if not thumbUrl or not isinstance(thumbUrl, str): self.logger.debug("Uploading poster to Imgur") - thumbUrl = uploadToImgur(self.server.url(thumb, True), config["display"]["posters"]["maxSize"]) + thumbUrl = uploadToImgur(self.server.url(thumb, True)) setCacheKey(thumb, thumbUrl) activity: models.discord.Activity = { "details": truncate(title, 128), @@ -326,9 +325,9 @@ class PlexAlertListener(threading.Thread): if state == "playing": currentTimestamp = int(time.time()) if config["display"]["useRemainingTime"]: - activity["timestamps"] = {"end": round(currentTimestamp + ((item.duration - viewOffset) / 1000))} + activity["timestamps"] = { "end": round(currentTimestamp + ((item.duration - viewOffset) / 1000)) } else: - activity["timestamps"] = {"start": round(currentTimestamp - (viewOffset / 1000))} + activity["timestamps"] = { "start": round(currentTimestamp - (viewOffset / 1000)) } if not self.discordIpcService.connected: self.discordIpcService.connect() if self.discordIpcService.connected: diff --git a/lint.bat b/lint.bat new file mode 100644 index 0000000..fd5734f --- /dev/null +++ b/lint.bat @@ -0,0 +1,2 @@ +@echo off +pyright --pythonversion 3.10.0 diff --git a/main.py b/main.py index 958c11e..e06da4c 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,7 @@ if isInContainer: uid, gid = statResult.st_uid, statResult.st_gid else: if noRuntimeDirChown: - logger.warning(f"DRPP_NO_RUNTIME_DIR_CHOWN is set to true. Manually ensure appropriate ownership of {runtimeDirectory}") + logger.warning(f"Environment variable DRPP_NO_RUNTIME_DIR_CHOWN is set to true. Manually ensure appropriate ownership of {runtimeDirectory}") else: os.system(f"chmod 700 {runtimeDirectory}") os.system(f"chown -R {uid}:{gid} {runtimeDirectory}") @@ -21,7 +21,7 @@ if isInContainer: os.setgid(gid) # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType] os.setuid(uid) # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType] else: - logger.warning(f"Not running as the superuser. Manually ensure appropriate ownership of mounted contents") + logger.warning("Not running as the superuser. Manually ensure appropriate ownership of mounted contents") from config.constants import noPipInstall import sys @@ -43,7 +43,7 @@ if not noPipInstall: except Exception as e: logger.exception("An unexpected error occured during automatic installation of dependencies. Install them manually by running the following command: python -m pip install -U -r requirements.txt") -from config.constants import dataDirectoryPath, logFilePath, name, version, isInteractive +from config.constants import dataDirectoryPath, logFilePath, name, version, isInteractive, plexServerNameInput from core.config import config, loadConfig, saveConfig from core.discord import DiscordIpcService from core.plex import PlexAlertListener, initiateAuth, getAuthToken @@ -72,7 +72,6 @@ def init() -> None: loadCache() def main() -> None: - init() if not config["users"]: logger.info("No users found in the config file") user = authNewUser() @@ -104,9 +103,15 @@ def authNewUser() -> Optional[models.config.User]: authToken = getAuthToken(id, code) if authToken: logger.info("Authentication successful") - serverName = os.environ.get("DRPP_PLEX_SERVER_NAME_INPUT") + serverName = plexServerNameInput if not serverName: - serverName = input("Enter the name of the Plex Media Server you wish to connect to: ") if isInteractive else "ServerName" + if isInteractive: + serverName = input("Enter the name of the Plex Media Server to connect to: ") + else: + serverName = "ServerName" + logger.warning("Environment variable DRPP_PLEX_SERVER_NAME_INPUT is not set and the environment is non-interactive") + logger.warning("\"ServerName\" will be used as a placeholder for the name of the Plex Media Server to connect to") + logger.warning("Change this by editing the config file and restarting the script") return { "token": authToken, "servers": [{ "name": serverName }] } time.sleep(5) else: @@ -134,6 +139,7 @@ if __name__ == "__main__": mode = sys.argv[1] if len(sys.argv) > 1 else "" try: if not mode: + init() main() elif mode == "test-ipc": testIpc(int(sys.argv[2]) if len(sys.argv) > 2 else -1) diff --git a/pyrightconfig.json b/pyrightconfig.json index 9b758be..ccc9ce0 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -1,3 +1,3 @@ { - "reportMissingTypeStubs": "information" + "strict": ["**"] } diff --git a/requirements.txt b/requirements.txt index 33f402a..f8e5a25 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -PlexAPI==4.15.9 -requests==2.31.0 -websocket-client==1.7.0 -PyYAML==6.0.1 -Pillow==10.2.0 +PlexAPI==4.15.16 +requests==2.32.3 +websocket-client==1.8.0 +PyYAML==6.0.2 +Pillow==10.4.0 diff --git a/utils/text.py b/utils/text.py index 3b4f726..cd3c66e 100644 --- a/utils/text.py +++ b/utils/text.py @@ -2,7 +2,7 @@ from typing import Optional def formatSeconds(seconds: int | float, joiner: Optional[str] = None) -> str: seconds = round(seconds) - timeValues = {"h": seconds // 3600, "m": seconds // 60 % 60, "s": seconds % 60} + timeValues = { "h": seconds // 3600, "m": seconds // 60 % 60, "s": seconds % 60 } if not joiner: return "".join(str(v) + k for k, v in timeValues.items() if v > 0) if timeValues["h"] == 0: