discord-rich-presence-plex/core/plex.py

284 lines
12 KiB
Python

# pyright: reportUnknownArgumentType=none,reportUnknownMemberType=none,reportUnknownVariableType=none
from .config import config
from .discord import DiscordIpcService
from .imgur import uploadToImgur
from config.constants import name, plexClientID
from plexapi.alert import AlertListener
from plexapi.base import PlexSession, PlexPartialObject
from plexapi.media import Genre, Guid
from plexapi.myplex import MyPlexAccount, PlexServer
from typing import Optional
from utils.cache import getCacheKey, setCacheKey
from utils.logging import LoggerWithPrefix
from utils.text import formatSeconds
import models.config
import models.discord
import models.plex
import requests
import threading
import time
import urllib.parse
def initiateAuth() -> tuple[str, str, str]:
response = requests.post("https://plex.tv/api/v2/pins.json?strong=true", headers = {
"X-Plex-Product": name,
"X-Plex-Client-Identifier": plexClientID,
}).json()
authUrl = f"https://app.plex.tv/auth#?clientID={plexClientID}&code={response['code']}&context%%5Bdevice%%5D%%5Bproduct%%5D={urllib.parse.quote(name)}"
return response["id"], response["code"], authUrl
def getAuthToken(id: str, code: str) -> Optional[str]:
response = requests.get(f"https://plex.tv/api/v2/pins/{id}.json?code={code}", headers = {
"X-Plex-Client-Identifier": plexClientID,
}).json()
return response["authToken"]
class PlexAlertListener(threading.Thread):
productName = "Plex Media Server"
updateTimeoutTimerInterval = 30
connectionTimeoutTimerInterval = 60
maximumIgnores = 2
def __init__(self, token: str, serverConfig: models.config.Server):
super().__init__()
self.daemon = True
self.token = token
self.serverConfig = serverConfig
self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}] ") # pyright: ignore[reportTypedDictNotRequiredAccess]
self.discordIpcService = DiscordIpcService(self.serverConfig.get("ipcPipeNumber"))
self.updateTimeoutTimer: Optional[threading.Timer] = None
self.connectionTimeoutTimer: Optional[threading.Timer] = None
self.account: Optional[MyPlexAccount] = None
self.server: Optional[PlexServer] = None
self.alertListener: Optional[AlertListener] = None
self.lastState, self.lastSessionKey, self.lastRatingKey = "", 0, 0
self.listenForUser, self.isServerOwner, self.ignoreCount = "", False, 0
self.start()
def run(self) -> None:
connected = False
while not connected:
try:
self.logger.info("Signing into Plex")
self.account = MyPlexAccount(token = self.token)
self.logger.info("Signed in as Plex user '%s'", self.account.username)
self.listenForUser = self.serverConfig.get("listenForUser", "") or self.account.username
self.server = None
for resource in self.account.resources():
if resource.product == self.productName and resource.name.lower() == self.serverConfig["name"].lower():
self.logger.info("Connecting to %s '%s'", self.productName, self.serverConfig["name"])
self.server = resource.connect()
try:
self.server.account()
self.isServerOwner = True
except:
pass
self.logger.info("Connected to %s '%s'", self.productName, resource.name)
self.alertListener = AlertListener(self.server, self.handleAlert, self.reconnect)
self.alertListener.start()
self.logger.info("Listening for alerts from user '%s'", self.listenForUser)
self.connectionTimeoutTimer = threading.Timer(self.connectionTimeoutTimerInterval, self.connectionTimeout)
self.connectionTimeoutTimer.start()
connected = True
break
if not self.server:
raise Exception("Server not found")
except Exception as e:
self.logger.error("Failed to connect to %s '%s': %s", self.productName, self.serverConfig["name"], e) # pyright: ignore[reportTypedDictNotRequiredAccess]
self.logger.error("Reconnecting in 10 seconds")
time.sleep(10)
def disconnect(self) -> None:
if self.alertListener:
try:
self.alertListener.stop()
except:
pass
self.disconnectRpc()
self.account, self.server, self.alertListener, self.listenForUser, self.isServerOwner, self.ignoreCount = None, None, None, "", False, 0
self.logger.info("Stopped listening for alerts")
def reconnect(self, exception: Exception) -> None:
self.logger.error("Connection to Plex lost: %s", exception)
self.disconnect()
self.logger.error("Reconnecting")
self.run()
def disconnectRpc(self) -> None:
self.lastState, self.lastSessionKey, self.lastRatingKey = "", 0, 0
if self.discordIpcService.connected:
self.discordIpcService.disconnect()
self.cancelTimers()
def cancelTimers(self) -> None:
if self.updateTimeoutTimer:
self.updateTimeoutTimer.cancel()
if self.connectionTimeoutTimer:
self.connectionTimeoutTimer.cancel()
self.updateTimeoutTimer, self.connectionTimeoutTimer = None, None
def updateTimeout(self) -> None:
self.logger.debug("No recent updates from session key %s", self.lastSessionKey)
self.disconnectRpc()
def connectionTimeout(self) -> None:
try:
assert self.server
self.logger.debug("Request for list of clients to check connection: %s", self.server.clients())
except Exception as e:
self.reconnect(e)
else:
self.connectionTimeoutTimer = threading.Timer(self.connectionTimeoutTimerInterval, self.connectionTimeout)
self.connectionTimeoutTimer.start()
def handleAlert(self, alert: models.plex.Alert) -> None:
try:
if alert["type"] == "playing" and "PlaySessionStateNotification" in alert:
stateNotification = alert["PlaySessionStateNotification"][0]
state = stateNotification["state"]
sessionKey = int(stateNotification["sessionKey"])
ratingKey = int(stateNotification["ratingKey"])
viewOffset = int(stateNotification["viewOffset"])
self.logger.debug("Received alert: %s", stateNotification)
assert self.server
item: PlexPartialObject = self.server.fetchItem(ratingKey)
libraryName: str = item.section().title
if "blacklistedLibraries" in self.serverConfig and libraryName in self.serverConfig["blacklistedLibraries"]:
self.logger.debug("Library '%s' is blacklisted, ignoring", libraryName)
return
if "whitelistedLibraries" in self.serverConfig and libraryName not in self.serverConfig["whitelistedLibraries"]:
self.logger.debug("Library '%s' is not whitelisted, ignoring", libraryName)
return
if self.lastSessionKey == sessionKey and self.lastRatingKey == ratingKey:
if self.updateTimeoutTimer:
self.updateTimeoutTimer.cancel()
self.updateTimeoutTimer = None
if self.lastState == state and self.ignoreCount < self.maximumIgnores:
self.logger.debug("Nothing changed, ignoring")
self.ignoreCount += 1
self.updateTimeoutTimer = threading.Timer(self.updateTimeoutTimerInterval, self.updateTimeout)
self.updateTimeoutTimer.start()
return
else:
self.ignoreCount = 0
if state == "stopped":
self.disconnectRpc()
return
elif state == "stopped":
self.logger.debug("Received 'stopped' state alert from unknown session, ignoring")
return
if self.isServerOwner:
self.logger.debug("Searching sessions for session key %s", sessionKey)
sessions: list[PlexSession] = self.server.sessions()
if len(sessions) < 1:
self.logger.debug("Empty session list, ignoring")
return
for session in sessions:
self.logger.debug("%s, Session Key: %s, Usernames: %s", session, session.sessionKey, session.usernames)
if session.sessionKey == sessionKey:
self.logger.debug("Session found")
sessionUsername: str = session.usernames[0]
if sessionUsername.lower() == self.listenForUser.lower():
self.logger.debug("Username '%s' matches '%s', continuing", sessionUsername, self.listenForUser)
break
self.logger.debug("Username '%s' doesn't match '%s', ignoring", sessionUsername, self.listenForUser)
return
else:
self.logger.debug("No matching session found, ignoring")
return
if self.updateTimeoutTimer:
self.updateTimeoutTimer.cancel()
self.updateTimeoutTimer = threading.Timer(self.updateTimeoutTimerInterval, self.updateTimeout)
self.updateTimeoutTimer.start()
self.lastState, self.lastSessionKey, self.lastRatingKey = state, sessionKey, ratingKey
mediaType: str = item.type
title: str
thumb: str
if mediaType in ["movie", "episode"]:
stateStrings: list[str] = [] if config["display"]["hideTotalTime"] else [formatSeconds(item.duration / 1000)]
if mediaType == "movie":
title = f"{item.title} ({item.year})"
genres: list[Genre] = item.genres[:3]
stateStrings.append(f"{', '.join(genre.tag for genre in genres)}")
largeText = "Watching a movie"
thumb = item.thumb
else:
title = item.grandparentTitle
stateStrings.append(f"S{item.parentIndex:02}E{item.index:02}")
stateStrings.append(item.title)
largeText = "Watching a TV show"
thumb = item.grandparentThumb
if state != "playing":
if config["display"]["useRemainingTime"]:
stateStrings.append(f"{formatSeconds((item.duration - viewOffset) / 1000, ':')} left")
else:
stateStrings.append(f"{formatSeconds(viewOffset / 1000, ':')} elapsed")
stateText = " · ".join(stateString for stateString in stateStrings if stateString)
elif mediaType == "track":
title = item.title
stateText = f"{item.originalTitle or item.grandparentTitle} - {item.parentTitle} ({self.server.fetchItem(item.parentRatingKey).year})"
largeText = "Listening to music"
thumb = item.thumb
else:
self.logger.debug("Unsupported media type '%s', ignoring", mediaType)
return
thumbUrl = ""
if thumb and config["display"]["posters"]["enabled"]:
thumbUrl = getCacheKey(thumb)
if not thumbUrl:
self.logger.debug("Uploading image to Imgur")
thumbUrl = uploadToImgur(self.server.url(thumb, True), config["display"]["posters"]["maxSize"])
setCacheKey(thumb, thumbUrl)
activity: models.discord.Activity = {
"details": title[:128],
"state": stateText[:128],
"assets": {
"large_text": largeText,
"large_image": thumbUrl or "logo",
"small_text": state.capitalize(),
"small_image": state,
},
}
if config["display"]["buttons"]:
guidTags: list[Guid] = []
if mediaType == "movie":
guidTags = item.guids
elif mediaType == "episode":
guidTags = self.server.fetchItem(item.grandparentRatingKey).guids
guids: list[str] = [guid.id for guid in guidTags]
buttons: list[models.discord.ActivityButton] = []
for button in config["display"]["buttons"]:
if button["url"].startswith("dynamic:"):
if guids:
newUrl = button["url"]
if button["url"] == "dynamic:imdb":
for guid in guids:
if guid.startswith("imdb://"):
newUrl = guid.replace("imdb://", "https://www.imdb.com/title/")
break
elif button["url"] == "dynamic:tmdb":
for guid in guids:
if guid.startswith("tmdb://"):
tmdbPathSegment = "movie" if mediaType == "movie" else "tv"
newUrl = guid.replace("tmdb://", f"https://www.themoviedb.org/{tmdbPathSegment}/")
break
if newUrl:
buttons.append({ "label": button["label"], "url": newUrl })
else:
buttons.append(button)
if buttons:
activity["buttons"] = buttons[:2]
if state == "playing":
currentTimestamp = int(time.time())
if config["display"]["useRemainingTime"]:
activity["timestamps"] = {"end": round(currentTimestamp + ((item.duration - viewOffset) / 1000))}
else:
activity["timestamps"] = {"start": round(currentTimestamp - (viewOffset / 1000))}
if not self.discordIpcService.connected:
self.discordIpcService.connect()
if self.discordIpcService.connected:
self.discordIpcService.setActivity(activity)
except:
self.logger.exception("An unexpected error occured in the Plex alert handler")