mirror of
https://github.com/phin05/discord-rich-presence-plex
synced 2024-11-24 18:43:02 +00:00
More static typing and other tweaks
This commit is contained in:
parent
a73e34a8bf
commit
2952870e1f
14 changed files with 159 additions and 129 deletions
|
@ -4,7 +4,7 @@
|
|||
|
||||
A Python script that displays your [Plex](https://www.plex.tv) status on [Discord](https://discord.com) using [Rich Presence](https://discord.com/developers/docs/rich-presence/how-to).
|
||||
|
||||
Current Version: 2.2.1
|
||||
Current Version: 2.2.2
|
||||
|
||||
## Getting Started
|
||||
|
||||
|
|
4
main.py
4
main.py
|
@ -43,9 +43,7 @@ if len(config["users"]) == 0:
|
|||
|
||||
plexAlertListeners: list[PlexAlertListener] = []
|
||||
try:
|
||||
for user in config["users"]:
|
||||
for server in user["servers"]:
|
||||
plexAlertListeners.append(PlexAlertListener(user["token"], server))
|
||||
plexAlertListeners = [PlexAlertListener(user["token"], server) for user in config["users"] for server in user["servers"]]
|
||||
while True:
|
||||
userInput = input()
|
||||
if userInput in ["exit", "quit"]:
|
||||
|
|
17
models/discord.py
Normal file
17
models/discord.py
Normal file
|
@ -0,0 +1,17 @@
|
|||
from typing import TypedDict
|
||||
|
||||
class ActivityAssets(TypedDict):
|
||||
large_text: str
|
||||
large_image: str
|
||||
small_text: str
|
||||
small_image: str
|
||||
|
||||
class ActivityTimestamps(TypedDict, total = False):
|
||||
start: int
|
||||
end: int
|
||||
|
||||
class Activity(TypedDict, total = False):
|
||||
details: str
|
||||
state: str
|
||||
assets: ActivityAssets
|
||||
timestamps: ActivityTimestamps
|
|
@ -1,12 +1,12 @@
|
|||
from typing import TypedDict
|
||||
|
||||
class ImgurResponse(TypedDict):
|
||||
class Response(TypedDict):
|
||||
success: bool
|
||||
status: int
|
||||
|
||||
class ImgurUploadResponseData(TypedDict):
|
||||
class UploadResponseData(TypedDict):
|
||||
error: str
|
||||
link: str
|
||||
|
||||
class ImgurUploadResponse(ImgurResponse):
|
||||
data: ImgurUploadResponseData
|
||||
class UploadResponse(Response):
|
||||
data: UploadResponseData
|
||||
|
|
11
models/plex.py
Normal file
11
models/plex.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from typing import TypedDict
|
||||
|
||||
class StateNotification(TypedDict):
|
||||
state: str
|
||||
sessionKey: int
|
||||
ratingKey: int
|
||||
viewOffset: int
|
||||
|
||||
class Alert(TypedDict):
|
||||
type: str
|
||||
PlaySessionStateNotification: list[StateNotification]
|
5
pyrightconfig.json
Normal file
5
pyrightconfig.json
Normal file
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"reportMissingTypeStubs": "information",
|
||||
"reportUnknownArgumentType": "none",
|
||||
"reportUnknownMemberType": "none"
|
||||
}
|
|
@ -1,9 +1,11 @@
|
|||
# type: ignore
|
||||
# pyright: reportOptionalMemberAccess=none
|
||||
|
||||
from store.constants import discordClientID, isUnix, processID
|
||||
from typing import Any, Optional
|
||||
from utils.logging import logger
|
||||
import asyncio
|
||||
import json
|
||||
import models.discord
|
||||
import os
|
||||
import struct
|
||||
import time
|
||||
|
@ -12,34 +14,34 @@ class DiscordRpcService:
|
|||
|
||||
ipcPipe = ((os.environ.get("XDG_RUNTIME_DIR", None) or os.environ.get("TMPDIR", None) or os.environ.get("TMP", None) or os.environ.get("TEMP", None) or "/tmp") + "/discord-ipc-0") if isUnix else r"\\?\pipe\discord-ipc-0"
|
||||
|
||||
def __init__(self):
|
||||
self.loop = None
|
||||
self.pipeReader = None
|
||||
self.pipeWriter = None
|
||||
def __init__(self) -> None:
|
||||
self.loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
self.pipeReader: Optional[asyncio.StreamReader] = None
|
||||
self.pipeWriter: Optional[Any] = None
|
||||
self.connected = False
|
||||
|
||||
def connect(self):
|
||||
def connect(self) -> None:
|
||||
if self.connected:
|
||||
logger.debug("Attempt to connect Discord IPC Pipe while already connected")
|
||||
return
|
||||
logger.info("Connecting Discord IPC Pipe")
|
||||
self.loop = asyncio.new_event_loop() if isUnix else asyncio.ProactorEventLoop()
|
||||
self.loop = asyncio.new_event_loop()
|
||||
self.loop.run_until_complete(self.handshake())
|
||||
|
||||
async def handshake(self):
|
||||
async def handshake(self) -> None:
|
||||
try:
|
||||
if isUnix:
|
||||
self.pipeReader, self.pipeWriter = await asyncio.open_unix_connection(self.ipcPipe)
|
||||
self.pipeReader, self.pipeWriter = await asyncio.open_unix_connection(self.ipcPipe) # type: ignore
|
||||
else:
|
||||
self.pipeReader = asyncio.StreamReader()
|
||||
self.pipeWriter, _ = await self.loop.create_pipe_connection(lambda: asyncio.StreamReaderProtocol(self.pipeReader), self.ipcPipe)
|
||||
self.pipeWriter, _ = await self.loop.create_pipe_connection(lambda: asyncio.StreamReaderProtocol(self.pipeReader), self.ipcPipe) # type: ignore
|
||||
self.write(0, { "v": 1, "client_id": discordClientID })
|
||||
if await self.read():
|
||||
self.connected = True
|
||||
except:
|
||||
logger.exception("An unexpected error occured during a RPC handshake operation")
|
||||
|
||||
async def read(self):
|
||||
async def read(self) -> Optional[Any]:
|
||||
try:
|
||||
dataBytes = await self.pipeReader.read(1024)
|
||||
data = json.loads(dataBytes[8:].decode("utf-8"))
|
||||
|
@ -49,7 +51,7 @@ class DiscordRpcService:
|
|||
logger.exception("An unexpected error occured during a RPC read operation")
|
||||
self.connected = False
|
||||
|
||||
def write(self, op, payload):
|
||||
def write(self, op: int, payload: Any) -> None:
|
||||
try:
|
||||
logger.debug("[WRITE] %s", payload)
|
||||
payload = json.dumps(payload)
|
||||
|
@ -58,7 +60,7 @@ class DiscordRpcService:
|
|||
logger.exception("An unexpected error occured during a RPC write operation")
|
||||
self.connected = False
|
||||
|
||||
def disconnect(self):
|
||||
def disconnect(self) -> None:
|
||||
if not self.connected:
|
||||
logger.debug("Attempt to disconnect Discord IPC Pipe while not connected")
|
||||
return
|
||||
|
@ -77,7 +79,7 @@ class DiscordRpcService:
|
|||
logger.exception("An unexpected error occured while closing an asyncio event loop")
|
||||
self.connected = False
|
||||
|
||||
def sendActivity(self, activity):
|
||||
def setActivity(self, activity: models.discord.Activity) -> None:
|
||||
logger.info("Activity update: %s", activity)
|
||||
payload = {
|
||||
"cmd": "SET_ACTIVITY",
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
# type: ignore
|
||||
# pyright: reportTypedDictNotRequiredAccess=none
|
||||
|
||||
from .DiscordRpcService import DiscordRpcService
|
||||
from .cache import getKey, setKey
|
||||
from .config import config
|
||||
from .imgur import uploadImage
|
||||
from plexapi.alert import AlertListener
|
||||
from plexapi.myplex import MyPlexAccount
|
||||
from plexapi.base import Playable, PlexPartialObject
|
||||
from plexapi.myplex import MyPlexAccount, PlexServer
|
||||
from typing import Optional
|
||||
from utils.logging import LoggerWithPrefix
|
||||
from utils.text import formatSeconds
|
||||
import hashlib
|
||||
import models.config
|
||||
import models.discord
|
||||
import models.plex
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
@ -19,55 +24,48 @@ class PlexAlertListener(threading.Thread):
|
|||
connectionTimeoutTimerInterval = 60
|
||||
maximumIgnores = 2
|
||||
|
||||
def __init__(self, token, serverConfig):
|
||||
def __init__(self, token: str, serverConfig: models.config.Server):
|
||||
super().__init__()
|
||||
self.daemon = True
|
||||
self.token = token
|
||||
self.serverConfig = serverConfig
|
||||
self.logger = LoggerWithPrefix(f"[{self.serverConfig['name']}/{hashlib.md5(str(id(self)).encode('UTF-8')).hexdigest()[:5].upper()}] ")
|
||||
self.discordRpcService = DiscordRpcService()
|
||||
self.updateTimeoutTimer = None
|
||||
self.connectionTimeoutTimer = None
|
||||
self.lastState = ""
|
||||
self.lastSessionKey = 0
|
||||
self.lastRatingKey = 0
|
||||
self.reset()
|
||||
self.updateTimeoutTimer: Optional[threading.Timer] = None
|
||||
self.connectionTimeoutTimer: Optional[threading.Timer] = None
|
||||
self.account: Optional[MyPlexAccount] = None
|
||||
self.server: Optional[PlexServer] = None
|
||||
self.alertListener: Optional[AlertListener] = None
|
||||
self.lastState, self.lastSessionKey, self.lastRatingKey = "", 0, 0
|
||||
self.listenForUser, self.isServerOwner, self.ignoreCount = "", False, 0
|
||||
self.start()
|
||||
|
||||
def reset(self):
|
||||
self.plexAccount = None
|
||||
self.listenForUser = ""
|
||||
self.plexServer = None
|
||||
self.isServerOwner = False
|
||||
self.plexAlertListener = None
|
||||
self.ignoreCount = 0
|
||||
|
||||
def run(self):
|
||||
def run(self) -> None:
|
||||
connected = False
|
||||
while not connected:
|
||||
try:
|
||||
self.plexAccount = MyPlexAccount(token = self.token)
|
||||
self.logger.info("Signed in as Plex User \"%s\"", self.plexAccount.username)
|
||||
self.listenForUser = self.serverConfig.get("listenForUser", self.plexAccount.username)
|
||||
self.plexServer = None
|
||||
for resource in self.plexAccount.resources():
|
||||
self.account = MyPlexAccount(token = self.token)
|
||||
self.logger.info("Signed in as Plex User \"%s\"", self.account.username)
|
||||
self.listenForUser = self.serverConfig.get("listenForUser", self.account.username)
|
||||
self.server = None
|
||||
for resource in self.account.resources():
|
||||
if resource.product == self.productName and resource.name.lower() == self.serverConfig["name"].lower():
|
||||
self.logger.info("Connecting to %s \"%s\"", self.productName, self.serverConfig["name"])
|
||||
self.plexServer = resource.connect()
|
||||
self.server = resource.connect()
|
||||
try:
|
||||
self.plexServer.account()
|
||||
self.server.account()
|
||||
self.isServerOwner = True
|
||||
except:
|
||||
pass
|
||||
self.logger.info("Connected to %s \"%s\"", self.productName, resource.name)
|
||||
self.plexAlertListener = AlertListener(self.plexServer, self.handlePlexAlert, self.reconnect)
|
||||
self.plexAlertListener.start()
|
||||
self.alertListener = AlertListener(self.server, self.handlePlexAlert, self.reconnect)
|
||||
self.alertListener.start()
|
||||
self.logger.info("Listening for alerts from user \"%s\"", self.listenForUser)
|
||||
self.connectionTimeoutTimer = threading.Timer(self.connectionTimeoutTimerInterval, self.connectionTimeout)
|
||||
self.connectionTimeoutTimer.start()
|
||||
connected = True
|
||||
break
|
||||
if not self.plexServer:
|
||||
if not self.server:
|
||||
self.logger.error("%s \"%s\" not found", self.productName, self.serverConfig["name"])
|
||||
break
|
||||
except Exception as e:
|
||||
|
@ -75,58 +73,60 @@ class PlexAlertListener(threading.Thread):
|
|||
self.logger.error("Reconnecting in 10 seconds")
|
||||
time.sleep(10)
|
||||
|
||||
def disconnect(self):
|
||||
try:
|
||||
self.plexAlertListener.stop()
|
||||
except:
|
||||
pass
|
||||
def disconnect(self) -> None:
|
||||
if self.alertListener:
|
||||
try:
|
||||
self.alertListener.stop()
|
||||
except:
|
||||
pass
|
||||
self.disconnectRpc()
|
||||
self.reset()
|
||||
self.account, self.server, self.alertListener, self.listenForUser, self.isServerOwner, self.ignoreCount = None, None, None, "", False, 0
|
||||
self.logger.info("Stopped listening for alerts")
|
||||
|
||||
def reconnect(self, exception):
|
||||
def reconnect(self, exception: Exception) -> None:
|
||||
self.logger.error("Connection to Plex lost: %s", exception)
|
||||
self.disconnect()
|
||||
self.logger.error("Reconnecting")
|
||||
self.run()
|
||||
|
||||
def disconnectRpc(self):
|
||||
def disconnectRpc(self) -> None:
|
||||
self.lastState, self.lastSessionKey, self.lastRatingKey = "", 0, 0
|
||||
self.discordRpcService.disconnect()
|
||||
self.cancelTimers()
|
||||
|
||||
def cancelTimers(self):
|
||||
def cancelTimers(self) -> None:
|
||||
if self.updateTimeoutTimer:
|
||||
self.updateTimeoutTimer.cancel()
|
||||
self.updateTimeoutTimer = None
|
||||
if self.connectionTimeoutTimer:
|
||||
self.connectionTimeoutTimer.cancel()
|
||||
self.connectionTimeoutTimer = None
|
||||
self.updateTimeoutTimer, self.connectionTimeoutTimer = None, None
|
||||
|
||||
def updateTimeout(self):
|
||||
def updateTimeout(self) -> None:
|
||||
self.logger.debug("No recent updates from session key %s", self.lastSessionKey)
|
||||
self.disconnectRpc()
|
||||
|
||||
def connectionTimeout(self):
|
||||
def connectionTimeout(self) -> None:
|
||||
try:
|
||||
self.logger.debug("Request for list of clients to check connection: %s", self.plexServer.clients())
|
||||
assert self.server
|
||||
self.logger.debug("Request for list of clients to check connection: %s", self.server.clients())
|
||||
except Exception as e:
|
||||
self.reconnect(e)
|
||||
else:
|
||||
self.connectionTimeoutTimer = threading.Timer(self.connectionTimeoutTimerInterval, self.connectionTimeout)
|
||||
self.connectionTimeoutTimer.start()
|
||||
|
||||
def handlePlexAlert(self, data):
|
||||
def handlePlexAlert(self, alert: models.plex.Alert) -> None:
|
||||
try:
|
||||
if data["type"] == "playing" and "PlaySessionStateNotification" in data:
|
||||
alert = data["PlaySessionStateNotification"][0]
|
||||
state = alert["state"]
|
||||
sessionKey = int(alert["sessionKey"])
|
||||
ratingKey = int(alert["ratingKey"])
|
||||
viewOffset = int(alert["viewOffset"])
|
||||
self.logger.debug("Received alert: %s", alert)
|
||||
item = self.plexServer.fetchItem(ratingKey)
|
||||
libraryName = item.section().title
|
||||
if alert["type"] == "playing" and "PlaySessionStateNotification" in alert:
|
||||
stateNotification = alert["PlaySessionStateNotification"][0]
|
||||
state = stateNotification["state"]
|
||||
sessionKey = int(stateNotification["sessionKey"])
|
||||
ratingKey = int(stateNotification["ratingKey"])
|
||||
viewOffset = int(stateNotification["viewOffset"])
|
||||
self.logger.debug("Received alert: %s", stateNotification)
|
||||
assert self.server
|
||||
item: PlexPartialObject = self.server.fetchItem(ratingKey)
|
||||
libraryName: str = item.section().title
|
||||
if "blacklistedLibraries" in self.serverConfig and libraryName in self.serverConfig["blacklistedLibraries"]:
|
||||
self.logger.debug("Library \"%s\" is blacklisted, ignoring", libraryName)
|
||||
return
|
||||
|
@ -153,15 +153,15 @@ class PlexAlertListener(threading.Thread):
|
|||
return
|
||||
if self.isServerOwner:
|
||||
self.logger.debug("Searching sessions for session key %s", sessionKey)
|
||||
plexServerSessions = self.plexServer.sessions()
|
||||
if len(plexServerSessions) < 1:
|
||||
sessions: list[Playable] = self.server.sessions()
|
||||
if len(sessions) < 1:
|
||||
self.logger.debug("Empty session list, ignoring")
|
||||
return
|
||||
for session in plexServerSessions:
|
||||
for session in sessions:
|
||||
self.logger.debug("%s, Session Key: %s, Usernames: %s", session, session.sessionKey, session.usernames)
|
||||
if session.sessionKey == sessionKey:
|
||||
self.logger.debug("Session found")
|
||||
sessionUsername = session.usernames[0]
|
||||
sessionUsername: str = session.usernames[0]
|
||||
if sessionUsername.lower() == self.listenForUser.lower():
|
||||
self.logger.debug("Username \"%s\" matches \"%s\", continuing", sessionUsername, self.listenForUser)
|
||||
break
|
||||
|
@ -175,40 +175,39 @@ class PlexAlertListener(threading.Thread):
|
|||
self.updateTimeoutTimer = threading.Timer(self.updateTimeoutTimerInterval, self.updateTimeout)
|
||||
self.updateTimeoutTimer.start()
|
||||
self.lastState, self.lastSessionKey, self.lastRatingKey = state, sessionKey, ratingKey
|
||||
if state != "playing":
|
||||
stateText = f"{formatSeconds(viewOffset / 1000, ':')} / {formatSeconds(item.duration / 1000, ':')}"
|
||||
else:
|
||||
stateText = formatSeconds(item.duration / 1000)
|
||||
mediaType = item.type
|
||||
if mediaType == "movie":
|
||||
title = f"{item.title} ({item.year})"
|
||||
if len(item.genres) > 0:
|
||||
stateText += f" · {', '.join(genre.tag for genre in item.genres[:3])}"
|
||||
largeText = "Watching a movie"
|
||||
plexThumb = item.thumb
|
||||
elif mediaType == "episode":
|
||||
title = item.grandparentTitle
|
||||
stateText += f" · S{item.parentIndex:02}E{item.index:02} - {item.title}"
|
||||
largeText = "Watching a TV show"
|
||||
plexThumb = item.grandparentThumb
|
||||
if mediaType in ["movie", "episode"]:
|
||||
stateStrings: list[str] = [formatSeconds(item.duration / 1000)]
|
||||
if mediaType == "movie":
|
||||
title = f"{item.title} ({item.year})"
|
||||
stateStrings.append(f"{', '.join(genre.tag for genre in item.genres[:3])}")
|
||||
largeText = "Watching a movie"
|
||||
thumb = item.thumb
|
||||
else:
|
||||
title = item.grandparentTitle
|
||||
stateStrings.append(f"S{item.parentIndex:02}E{item.index:02}")
|
||||
stateStrings.append(item.title)
|
||||
largeText = "Watching a TV show"
|
||||
thumb = item.grandparentThumb
|
||||
if state != "playing":
|
||||
stateStrings.append(f"{formatSeconds(viewOffset / 1000, ':')} elapsed")
|
||||
stateText = " · ".join(stateString for stateString in stateStrings if stateString)
|
||||
elif mediaType == "track":
|
||||
title = item.title
|
||||
artist = item.originalTitle
|
||||
if not artist:
|
||||
artist = item.grandparentTitle
|
||||
stateText = f"{artist} - {item.parentTitle}"
|
||||
stateText = f"{item.originalTitle or item.grandparentTitle} - {item.parentTitle} ({self.server.fetchItem(item.parentRatingKey).year})"
|
||||
largeText = "Listening to music"
|
||||
plexThumb = item.thumb
|
||||
thumb = item.thumb
|
||||
else:
|
||||
self.logger.debug("Unsupported media type \"%s\", ignoring", mediaType)
|
||||
return
|
||||
thumbUrl = ""
|
||||
if config["display"]["posters"]["enabled"]:
|
||||
if not (thumbUrl := getKey(plexThumb)):
|
||||
thumbUrl = getKey(thumb)
|
||||
if not thumbUrl:
|
||||
self.logger.debug("Uploading image")
|
||||
thumbUrl = uploadImage(self.plexServer.url(plexThumb, True))
|
||||
setKey(plexThumb, thumbUrl)
|
||||
activity = {
|
||||
thumbUrl = uploadImage(self.server.url(thumb, True))
|
||||
setKey(thumb, thumbUrl)
|
||||
activity: models.discord.Activity = {
|
||||
"details": title[:128],
|
||||
"state": stateText[:128],
|
||||
"assets": {
|
||||
|
@ -227,6 +226,6 @@ class PlexAlertListener(threading.Thread):
|
|||
if not self.discordRpcService.connected:
|
||||
self.discordRpcService.connect()
|
||||
if self.discordRpcService.connected:
|
||||
self.discordRpcService.sendActivity(activity)
|
||||
self.discordRpcService.setActivity(activity)
|
||||
except:
|
||||
self.logger.exception("An unexpected error occured in the alert handler")
|
||||
|
|
|
@ -1,19 +1,20 @@
|
|||
from typing import Any
|
||||
from store.constants import cacheFilePath
|
||||
from typing import Any
|
||||
from utils.logging import logger
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
cache: dict[str, Any] = {}
|
||||
|
||||
def loadCache() -> None:
|
||||
global cache
|
||||
if os.path.isfile(cacheFilePath):
|
||||
try:
|
||||
with open(cacheFilePath, "r", encoding = "UTF-8") as cacheFile:
|
||||
cache = json.load(cacheFile)
|
||||
cache.update(json.load(cacheFile))
|
||||
except:
|
||||
logger.exception("Failed to parse the application's cache file.")
|
||||
os.rename(cacheFilePath, cacheFilePath.replace(".json", f"-{time.time():.0f}.json"))
|
||||
logger.exception("Failed to parse the application's cache file. A new one will be created.")
|
||||
|
||||
def getKey(key: str) -> Any:
|
||||
return cache.get(key)
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
from models.config import Config
|
||||
from store.constants import configFilePath
|
||||
from utils.logging import logger
|
||||
from utils.dict import merge
|
||||
from utils.logging import logger
|
||||
import json
|
||||
import models.config
|
||||
import os
|
||||
import time
|
||||
|
||||
config: Config = {
|
||||
config: models.config.Config = {
|
||||
"logging": {
|
||||
"debug": True,
|
||||
},
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
from models.imgur import ImgurUploadResponse
|
||||
from services.config import config
|
||||
from typing import Optional
|
||||
from utils.logging import logger
|
||||
import models.imgur
|
||||
import requests
|
||||
|
||||
def uploadImage(url: str) -> Optional[str]:
|
||||
try:
|
||||
data: ImgurUploadResponse = requests.post(
|
||||
data: models.imgur.UploadResponse = requests.post(
|
||||
"https://api.imgur.com/3/image",
|
||||
headers = { "Authorization": f"Client-ID {config['display']['posters']['imgurClientID']}" },
|
||||
files = { "image": requests.get(url).content }
|
||||
|
|
|
@ -2,7 +2,7 @@ import os
|
|||
import sys
|
||||
|
||||
name = "Discord Rich Presence for Plex"
|
||||
version = "2.2.1"
|
||||
version = "2.2.2"
|
||||
|
||||
plexClientID = "discord-rich-presence-plex"
|
||||
discordClientID = "413407336082833418"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger("discord-rich-presence-plex")
|
||||
|
@ -11,18 +11,13 @@ class LoggerWithPrefix:
|
|||
|
||||
def __init__(self, prefix: str) -> None:
|
||||
self.prefix = prefix
|
||||
self.info = self._wrapLoggerFunc(logger.info)
|
||||
self.warning = self._wrapLoggerFunc(logger.warning)
|
||||
self.error = self._wrapLoggerFunc(logger.error)
|
||||
self.exception = self._wrapLoggerFunc(logger.exception)
|
||||
self.debug = self._wrapLoggerFunc(logger.debug)
|
||||
|
||||
def info(self, obj: Any, *args: Any, **kwargs: Any) -> None:
|
||||
logger.info(self.prefix + str(obj), *args, **kwargs)
|
||||
|
||||
def warning(self, obj: Any, *args: Any, **kwargs: Any) -> None:
|
||||
logger.warning(self.prefix + str(obj), *args, **kwargs)
|
||||
|
||||
def error(self, obj: Any, *args: Any, **kwargs: Any) -> None:
|
||||
logger.error(self.prefix + str(obj), *args, **kwargs)
|
||||
|
||||
def exception(self, obj: Any, *args: Any, **kwargs: Any) -> None:
|
||||
logger.exception(self.prefix + str(obj), *args, **kwargs)
|
||||
|
||||
def debug(self, obj: Any, *args: Any, **kwargs: Any) -> None:
|
||||
logger.debug(self.prefix + str(obj), *args, **kwargs)
|
||||
def _wrapLoggerFunc(self, func: Callable[..., None]) -> Callable[..., None]:
|
||||
def wrappedFunc(obj: Any, *args: Any, **kwargs: Any) -> None:
|
||||
func(self.prefix + str(obj), *args, **kwargs)
|
||||
return wrappedFunc
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
def formatSeconds(seconds: int, joiner: str = "") -> str:
|
||||
from typing import Optional
|
||||
|
||||
def formatSeconds(seconds: int | float, joiner: Optional[str] = None) -> str:
|
||||
seconds = round(seconds)
|
||||
timeValues = {"h": seconds // 3600, "m": seconds // 60 % 60, "s": seconds % 60}
|
||||
if not joiner:
|
||||
|
|
Loading…
Reference in a new issue