Ability to display posters in Rich Presence

More refactoring
This commit is contained in:
Phin 2022-05-12 12:44:23 +05:30
parent 7ee73b8dd6
commit a54521ec9e
15 changed files with 170 additions and 71 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
config.json config.json
cache.json

View file

@ -2,7 +2,7 @@
A Python script that displays your [Plex](https://www.plex.tv) status on [Discord](https://discord.com) using [Rich Presence](https://discord.com/developers/docs/rich-presence/how-to). A Python script that displays your [Plex](https://www.plex.tv) status on [Discord](https://discord.com) using [Rich Presence](https://discord.com/developers/docs/rich-presence/how-to).
Current Version: 2.1.1 Current Version: 2.2.0
## Getting Started ## Getting Started
@ -25,6 +25,9 @@ The script must be running on the same machine as your Discord client.
* `debug` (boolean, default: `true`) - Outputs additional debug-helpful information to the console if enabled. * `debug` (boolean, default: `true`) - Outputs additional debug-helpful information to the console if enabled.
* `display` * `display`
* `useRemainingTime` (boolean, default: `false`) - Displays your media's remaining time instead of elapsed time in your Rich Presence if enabled. * `useRemainingTime` (boolean, default: `false`) - Displays your media's remaining time instead of elapsed time in your Rich Presence if enabled.
* `posters`
* `enabled` (boolean, default: `false`)
* `imgurClientID` (string, default: `""`)
* `users` (list) * `users` (list)
* `token` (string) - An access token associated with your Plex account. ([X-Plex-Token](https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token), [Authenticating with Plex](https://forums.plex.tv/t/authenticating-with-plex/609370)) * `token` (string) - An access token associated with your Plex account. ([X-Plex-Token](https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token), [Authenticating with Plex](https://forums.plex.tv/t/authenticating-with-plex/609370))
* `servers` (list) * `servers` (list)
@ -33,6 +36,12 @@ The script must be running on the same machine as your Discord client.
* `blacklistedLibraries` (list, optional) - Alerts originating from libraries in this list are ignored. * `blacklistedLibraries` (list, optional) - Alerts originating from libraries in this list are ignored.
* `whitelistedLibraries` (list, optional) - If set, alerts originating from libraries that are not in this list are ignored. * `whitelistedLibraries` (list, optional) - If set, alerts originating from libraries that are not in this list are ignored.
### Obtaining an Imgur client ID
1. Go to Imgur's [application registration page](https://api.imgur.com/oauth2/addclient)
2. Enter any name for the application and pick OAuth2 without a callback URL as the authorisation type
3. Submit the form to obtain your application's client ID
### Example ### Example
```json ```json
@ -41,7 +50,11 @@ The script must be running on the same machine as your Discord client.
"debug": true "debug": true
}, },
"display": { "display": {
"useRemainingTime": false "useRemainingTime": false,
"posters": {
"enabled": true,
"imgurClientID": "9e9sf637S8bRp4z"
}
}, },
"users": [ "users": [
{ {

17
main.py
View file

@ -1,6 +1,8 @@
from services import ConfigService, PlexAlertListener from services import PlexAlertListener
from services.cache import loadCache
from services.config import config, loadConfig, saveConfig
from store.constants import isUnix, name, plexClientID, version from store.constants import isUnix, name, plexClientID, version
from utils.logs import logger from utils.logging import logger
import logging import logging
import os import os
import requests import requests
@ -9,12 +11,11 @@ import urllib.parse
os.system("clear" if isUnix else "cls") os.system("clear" if isUnix else "cls")
logger.info("%s - v%s", name, version) logger.info("%s - v%s", name, version)
configService = ConfigService("config.json") loadConfig()
config = configService.config loadCache()
if config["logging"]["debug"]: if config["logging"]["debug"]:
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
PlexAlertListener.useRemainingTime = config["display"]["useRemainingTime"]
if len(config["users"]) == 0: if len(config["users"]) == 0:
logger.info("No users found in the config file. Initiating authentication flow.") logger.info("No users found in the config file. Initiating authentication flow.")
response = requests.post("https://plex.tv/api/v2/pins.json?strong=true", headers = { response = requests.post("https://plex.tv/api/v2/pins.json?strong=true", headers = {
@ -33,7 +34,7 @@ if len(config["users"]) == 0:
logger.info("Authentication successful.") logger.info("Authentication successful.")
serverName = input("Enter the name of the Plex Media Server you wish to connect to: ") serverName = input("Enter the name of the Plex Media Server you wish to connect to: ")
config["users"].append({ "token": authCheckResponse["authToken"], "servers": [{ "name": serverName }] }) config["users"].append({ "token": authCheckResponse["authToken"], "servers": [{ "name": serverName }] })
configService.saveConfig() saveConfig()
break break
time.sleep(5) time.sleep(5)
else: else:
@ -48,7 +49,7 @@ try:
while True: while True:
userInput = input() userInput = input()
if userInput in ["exit", "quit"]: if userInput in ["exit", "quit"]:
break raise KeyboardInterrupt
except KeyboardInterrupt: except KeyboardInterrupt:
for plexAlertListener in plexAlertListeners: for plexAlertListener in plexAlertListeners:
plexAlertListener.disconnect() plexAlertListener.disconnect()

View file

@ -3,8 +3,13 @@ from typing import TypedDict
class Logging(TypedDict): class Logging(TypedDict):
debug: bool debug: bool
class Posters(TypedDict):
enabled: bool
imgurClientID: str
class Display(TypedDict): class Display(TypedDict):
useRemainingTime: bool useRemainingTime: bool
posters: Posters
class Server(TypedDict, total = False): class Server(TypedDict, total = False):
name: str name: str

12
models/imgur.py Normal file
View file

@ -0,0 +1,12 @@
from typing import TypedDict
class ImgurResponse(TypedDict):
success: bool
status: int
class ImgurUploadResponseData(TypedDict):
error: str
link: str
class ImgurUploadResponse(ImgurResponse):
data: ImgurUploadResponseData

View file

@ -1,41 +0,0 @@
from models.config import Config
from utils.logs import logger
import json
import os
import time
class ConfigService:
config: Config
def __init__(self, configFilePath: str) -> None:
self.configFilePath = configFilePath
if os.path.isfile(self.configFilePath):
try:
with open(self.configFilePath, "r", encoding = "UTF-8") as configFile:
self.config = json.load(configFile)
except:
os.rename(configFilePath, configFilePath.replace(".json", f"-{time.time():.0f}.json"))
logger.exception("Failed to parse the application's config file. A new one will be created.")
self.resetConfig()
else:
self.resetConfig()
def resetConfig(self) -> None:
self.config = {
"logging": {
"debug": True,
},
"display": {
"useRemainingTime": False,
},
"users": [],
}
self.saveConfig()
def saveConfig(self) -> None:
try:
with open(self.configFilePath, "w", encoding = "UTF-8") as configFile:
json.dump(self.config, configFile, indent = "\t")
except:
logger.exception("Failed to write to the application's config file.\n%s")

View file

@ -1,7 +1,7 @@
# type: ignore # type: ignore
from store.constants import isUnix, processID from store.constants import discordClientID, isUnix, processID
from utils.logs import logger from utils.logging import logger
import asyncio import asyncio
import json import json
import os import os
@ -10,7 +10,6 @@ import time
class DiscordRpcService: class DiscordRpcService:
clientID = "413407336082833418"
ipcPipe = ((os.environ.get("XDG_RUNTIME_DIR", None) or os.environ.get("TMPDIR", None) or os.environ.get("TMP", None) or os.environ.get("TEMP", None) or "/tmp") + "/discord-ipc-0") if isUnix else r"\\?\pipe\discord-ipc-0" ipcPipe = ((os.environ.get("XDG_RUNTIME_DIR", None) or os.environ.get("TMPDIR", None) or os.environ.get("TMP", None) or os.environ.get("TEMP", None) or "/tmp") + "/discord-ipc-0") if isUnix else r"\\?\pipe\discord-ipc-0"
def __init__(self): def __init__(self):
@ -34,7 +33,7 @@ class DiscordRpcService:
else: else:
self.pipeReader = asyncio.StreamReader() self.pipeReader = asyncio.StreamReader()
self.pipeWriter, _ = await self.loop.create_pipe_connection(lambda: asyncio.StreamReaderProtocol(self.pipeReader), self.ipcPipe) self.pipeWriter, _ = await self.loop.create_pipe_connection(lambda: asyncio.StreamReaderProtocol(self.pipeReader), self.ipcPipe)
self.write(0, { "v": 1, "client_id": self.clientID }) self.write(0, { "v": 1, "client_id": discordClientID })
if await self.read(): if await self.read():
self.connected = True self.connected = True
except: except:

View file

@ -1,23 +1,27 @@
# type: ignore # type: ignore
from .DiscordRpcService import DiscordRpcService
from .cache import getKey, setKey
from .config import config
from .imgur import uploadImage
from plexapi.alert import AlertListener from plexapi.alert import AlertListener
from plexapi.myplex import MyPlexAccount from plexapi.myplex import MyPlexAccount
from services import DiscordRpcService from utils.logging import LoggerWithPrefix
from utils.logs import LoggerWithPrefix
from utils.text import formatSeconds from utils.text import formatSeconds
import hashlib import hashlib
import threading import threading
import time import time
class PlexAlertListener: class PlexAlertListener(threading.Thread):
productName = "Plex Media Server" productName = "Plex Media Server"
updateTimeoutTimerInterval = 30 updateTimeoutTimerInterval = 30
connectionTimeoutTimerInterval = 60 connectionTimeoutTimerInterval = 60
maximumIgnores = 2 maximumIgnores = 2
useRemainingTime = False
def __init__(self, token, serverConfig): def __init__(self, token, serverConfig):
super().__init__()
self.daemon = True
self.token = token self.token = token
self.serverConfig = serverConfig self.serverConfig = serverConfig
self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}/{hashlib.md5(str(id(self)).encode('UTF-8')).hexdigest()[:5].upper()}] ") self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}/{hashlib.md5(str(id(self)).encode('UTF-8')).hexdigest()[:5].upper()}] ")
@ -25,7 +29,7 @@ class PlexAlertListener:
self.updateTimeoutTimer = None self.updateTimeoutTimer = None
self.connectionTimeoutTimer = None self.connectionTimeoutTimer = None
self.reset() self.reset()
self.connect() self.start()
def reset(self): def reset(self):
self.plexAccount = None self.plexAccount = None
@ -38,7 +42,7 @@ class PlexAlertListener:
self.lastRatingKey = 0 self.lastRatingKey = 0
self.ignoreCount = 0 self.ignoreCount = 0
def connect(self): def run(self):
connected = False connected = False
while not connected: while not connected:
try: try:
@ -85,7 +89,7 @@ class PlexAlertListener:
self.logger.error("Connection to Plex lost: %s", exception) self.logger.error("Connection to Plex lost: %s", exception)
self.disconnect() self.disconnect()
self.logger.error("Reconnecting") self.logger.error("Reconnecting")
self.connect() self.run()
def cancelTimers(self): def cancelTimers(self):
if self.updateTimeoutTimer: if self.updateTimeoutTimer:
@ -180,12 +184,12 @@ class PlexAlertListener:
if len(item.genres) > 0: if len(item.genres) > 0:
stateText += f" · {', '.join(genre.tag for genre in item.genres[:3])}" stateText += f" · {', '.join(genre.tag for genre in item.genres[:3])}"
largeText = "Watching a movie" largeText = "Watching a movie"
# self.logger.debug("Poster: %s", item.thumbUrl) plexThumb = item.thumb
elif mediaType == "episode": elif mediaType == "episode":
title = item.grandparentTitle title = item.grandparentTitle
stateText += f" · S{item.parentIndex:02}E{item.index:02} - {item.title}" stateText += f" · S{item.parentIndex:02}E{item.index:02} - {item.title}"
largeText = "Watching a TV show" largeText = "Watching a TV show"
# self.logger.debug("Poster: %s", self.plexServer.url(item.grandparentThumb, True)) plexThumb = item.grandparentThumb
elif mediaType == "track": elif mediaType == "track":
title = item.title title = item.title
artist = item.originalTitle artist = item.originalTitle
@ -193,23 +197,29 @@ class PlexAlertListener:
artist = item.grandparentTitle artist = item.grandparentTitle
stateText = f"{artist} - {item.parentTitle}" stateText = f"{artist} - {item.parentTitle}"
largeText = "Listening to music" largeText = "Listening to music"
# self.logger.debug("Album Art: %s", item.thumbUrl) plexThumb = item.thumb
else: else:
self.logger.debug("Unsupported media type \"%s\", ignoring", mediaType) self.logger.debug("Unsupported media type \"%s\", ignoring", mediaType)
return return
thumbUrl = None
if config["display"]["posters"]["enabled"]:
if not (thumbUrl := getKey(plexThumb)):
self.logger.debug("Uploading image")
thumbUrl = uploadImage(self.plexServer.url(plexThumb, True))
setKey(plexThumb, thumbUrl)
activity = { activity = {
"details": title[:128], "details": title[:128],
"state": stateText[:128], "state": stateText[:128],
"assets": { "assets": {
"large_text": largeText, "large_text": largeText,
"large_image": "logo", "large_image": thumbUrl or "logo",
"small_text": state.capitalize(), "small_text": state.capitalize(),
"small_image": state, "small_image": state,
}, },
} }
if state == "playing": if state == "playing":
currentTimestamp = int(time.time()) currentTimestamp = int(time.time())
if self.useRemainingTime: if config["display"]["useRemainingTime"]:
activity["timestamps"] = {"end": round(currentTimestamp + ((item.duration - viewOffset) / 1000))} activity["timestamps"] = {"end": round(currentTimestamp + ((item.duration - viewOffset) / 1000))}
else: else:
activity["timestamps"] = {"start": round(currentTimestamp - (viewOffset / 1000))} activity["timestamps"] = {"start": round(currentTimestamp - (viewOffset / 1000))}

View file

@ -1,3 +1,2 @@
from .ConfigService import ConfigService as ConfigService
from .DiscordRpcService import DiscordRpcService as DiscordRpcService from .DiscordRpcService import DiscordRpcService as DiscordRpcService
from .PlexAlertListener import PlexAlertListener as PlexAlertListener from .PlexAlertListener import PlexAlertListener as PlexAlertListener

28
services/cache.py Normal file
View file

@ -0,0 +1,28 @@
from typing import Any
from store.constants import cacheFilePath
from utils.logging import logger
import json
import os
cache: dict[str, Any] = {}
def loadCache() -> None:
global cache
if os.path.isfile(cacheFilePath):
try:
with open(cacheFilePath, "r", encoding = "UTF-8") as cacheFile:
cache = json.load(cacheFile)
except:
logger.exception("Failed to parse the application's cache file.")
def getKey(key: str) -> Any:
return cache.get(key)
def setKey(key: str, value: Any) -> None:
cache[key] = value
try:
with open(cacheFilePath, "w", encoding = "UTF-8") as cacheFile:
json.dump(cache, cacheFile, indent = "\t")
cacheFile.write("\n")
except:
logger.exception("Failed to write to the application's cache file.")

40
services/config.py Normal file
View file

@ -0,0 +1,40 @@
from models.config import Config
from store.constants import configFilePath
from utils.logging import logger
from utils.dict import merge
import json
import os
import time
config: Config = {
"logging": {
"debug": True,
},
"display": {
"useRemainingTime": False,
"posters": {
"enabled": False,
"imgurClientID": "",
},
},
"users": [],
}
def loadConfig() -> None:
if os.path.isfile(configFilePath):
try:
with open(configFilePath, "r", encoding = "UTF-8") as configFile:
loadedConfig = json.load(configFile)
merge(loadedConfig, config)
except:
os.rename(configFilePath, configFilePath.replace(".json", f"-{time.time():.0f}.json"))
logger.exception("Failed to parse the application's config file. A new one will be created.")
saveConfig()
def saveConfig() -> None:
try:
with open(configFilePath, "w", encoding = "UTF-8") as configFile:
json.dump(config, configFile, indent = "\t")
configFile.write("\n")
except:
logger.exception("Failed to write to the application's config file.")

19
services/imgur.py Normal file
View file

@ -0,0 +1,19 @@
from models.imgur import ImgurUploadResponse
from services.config import config
from typing import Optional
from utils.logging import logger
import requests
def uploadImage(url: str) -> Optional[str]:
try:
data: ImgurUploadResponse = requests.post(
"https://api.imgur.com/3/image",
headers = { "Authorization": f"Client-ID {config['display']['posters']['imgurClientID']}" },
files = { "image": requests.get(url).content }
).json()
if not data["success"]:
raise Exception(data["data"]["error"])
return data["data"]["link"]
except:
logger.exception("An unexpected error occured while uploading an image")
return None

View file

@ -1,8 +1,13 @@
import sys
import os import os
import sys
name = "Discord Rich Presence for Plex" name = "Discord Rich Presence for Plex"
version = "2.1.1" version = "2.2.0"
plexClientID = "discord-rich-presence-plex" plexClientID = "discord-rich-presence-plex"
discordClientID = "413407336082833418"
configFilePath = "config.json"
cacheFilePath = "cache.json"
isUnix = sys.platform in ["linux", "darwin"] isUnix = sys.platform in ["linux", "darwin"]
processID = os.getpid() processID = os.getpid()

8
utils/dict.py Normal file
View file

@ -0,0 +1,8 @@
from typing import Any
def merge(source: Any, target: Any) -> None:
for key, value in source.items():
if isinstance(value, dict):
merge(value, target.setdefault(key, {}))
else:
target[key] = source[key]