2023-11-05 05:24:45 +00:00
from config . constants import isUnix , containerDemotionUidGid
2023-10-01 13:41:42 +00:00
import os
2023-11-04 18:13:42 +00:00
import sys
2023-10-01 13:41:42 +00:00
2023-11-05 05:24:45 +00:00
# Start-up bootstrap, executed at import time ahead of the imports that follow
# in this file: either drop privileges to the configured uid/gid, or make sure
# the packages listed in requirements.txt are installed.
if isUnix and containerDemotionUidGid:
    import subprocess

    uidGid = int(containerDemotionUidGid)
    # Hand the application directory over to the target user before giving up
    # the privileges needed to do so. An argument list (no shell) keeps paths
    # containing spaces or shell metacharacters intact.
    try:
        subprocess.run(["chown", "-R", f"{uidGid}:{uidGid}", os.path.dirname(os.path.realpath(__file__))], check=False)
    except OSError as chownError:
        # Best effort, as before: a failed chown must not prevent the demotion below.
        print(f"Failed to change ownership of the application directory: {chownError}")
    # The group has to be changed first; once the uid is dropped the process
    # is no longer allowed to change its gid.
    os.setgid(uidGid)  # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
    os.setuid(uidGid)  # pyright: ignore[reportGeneralTypeIssues,reportUnknownMemberType]
else:
    try:
        import subprocess

        def parsePipPackages(packagesStr: str) -> dict[str, str]:
            """Parse `pip freeze` / requirements.txt text into {name: version}.

            Blank lines and comments are ignored. The version is the text after
            the first "==", or an empty string when the line is not pinned.
            """
            packages: dict[str, str] = {}
            for line in packagesStr.splitlines():
                # Drop trailing inline comments, then surrounding whitespace.
                line = line.split(" #", 1)[0].strip()
                if not line or line.startswith("#"):
                    continue
                packageName, _, packageVersion = line.partition("==")
                packages[packageName.strip()] = packageVersion.strip()
            return packages

        pipFreezeResult = subprocess.run([sys.executable, "-m", "pip", "freeze"], stdout=subprocess.PIPE, text=True, check=True)
        installedPackages = parsePipPackages(pipFreezeResult.stdout)
        with open("requirements.txt", "r", encoding="UTF-8") as requirementsFile:
            requiredPackages = parsePipPackages(requirementsFile.read())
        # pip treats package names case-insensitively and "_" like "-", so
        # compare on a normalised form to avoid reinstalling on every start.
        installedNames = {installedName.lower().replace("_", "-") for installedName in installedPackages}
        for packageName, packageVersion in requiredPackages.items():
            if packageName.lower().replace("_", "-") in installedNames:
                continue
            package = f"{packageName}=={packageVersion}" if packageVersion else packageName
            print(f"Installing missing dependency: {package}")
            subprocess.run([sys.executable, "-m", "pip", "install", "-U", package], check=True)
    except Exception:
        # Deliberately broad: a failed auto-install must not stop start-up, the
        # user is told how to install the dependencies manually instead.
        import traceback
        # print_exc() works on every supported Python version, unlike the
        # single-argument form of print_exception().
        traceback.print_exc()
        print("An unexpected error occured during automatic installation of dependencies. Install them manually by running the following command: python -m pip install -U -r requirements.txt")
2023-10-01 13:41:42 +00:00
2023-11-05 05:24:45 +00:00
from config . constants import dataDirectoryPath , logFilePath , name , version , isInteractive
2023-11-04 18:13:42 +00:00
from core . config import config , loadConfig , saveConfig
from core . discord import DiscordIpcService
2023-11-05 05:24:45 +00:00
from core . plex import PlexAlertListener , initiateAuth , getAuthToken
2023-11-04 18:13:42 +00:00
from typing import Optional
from utils . cache import loadCache
2022-05-22 04:37:09 +00:00
from utils . logging import formatter , logger
2023-11-04 18:13:42 +00:00
from utils . text import formatSeconds
2022-05-10 20:23:12 +00:00
import logging
2023-11-04 18:13:42 +00:00
import models . config
2022-05-11 00:03:51 +00:00
import time
2022-05-10 20:23:12 +00:00
2023-11-04 18:13:42 +00:00
def main() -> None:
    """Run the application until it is asked to shut down.

    Creates the data directory, moves config/cache/log files found in the
    working directory into it, loads the config and cache, configures
    logging, authenticates a first user when the config has none, and starts
    one PlexAlertListener per configured user/server pair. The call then
    blocks until "exit"/"quit" is entered (interactive mode), the input
    stream ends, or Ctrl+C is pressed, after which every listener is
    disconnected.
    """
    # exist_ok avoids the check-then-create race and also creates missing parents.
    os.makedirs(dataDirectoryPath, exist_ok=True)
    # Move files left in the working directory into the data directory.
    for oldFilePath in ["config.json", "cache.json", "console.log"]:
        if os.path.isfile(oldFilePath):
            os.rename(oldFilePath, os.path.join(dataDirectoryPath, oldFilePath))
    loadConfig()
    if config["logging"]["debug"]:
        logger.setLevel(logging.DEBUG)
    if config["logging"]["writeToFile"]:
        fileHandler = logging.FileHandler(logFilePath)
        fileHandler.setFormatter(formatter)
        logger.addHandler(fileHandler)
    logger.info("%s - v%s", name, version)
    loadCache()
    if not config["users"]:
        logger.info("No users found in the config file")
        user = authNewUser()
        if not user:
            # sys.exit() rather than the site-provided exit(), which is not
            # guaranteed to exist (e.g. when Python runs with -S).
            sys.exit()
        config["users"].append(user)
        saveConfig()
    plexAlertListeners = [PlexAlertListener(user["token"], server) for user in config["users"] for server in user["servers"]]
    try:
        if isInteractive:
            while True:
                userInput = input()
                if userInput in ["exit", "quit"]:
                    raise KeyboardInterrupt
        else:
            # Nothing to do on this thread; sleep until interrupted.
            while True:
                time.sleep(3600)
    except (KeyboardInterrupt, EOFError):
        # EOFError: the input stream was closed (e.g. Ctrl+D); shut down as
        # cleanly as for Ctrl+C instead of crashing with listeners connected.
        for plexAlertListener in plexAlertListeners:
            plexAlertListener.disconnect()
2023-11-05 05:24:45 +00:00
def authNewUser() -> Optional[models.config.User]:
    """Authenticate a new user through a sign-in URL.

    Logs the URL returned by initiateAuth(), then polls getAuthToken() every
    few seconds. On success the server name is taken from the
    PLEX_SERVER_NAME environment variable, or asked for on the console in
    interactive mode, or set to the placeholder "ServerName".

    Returns:
        A user entry with the token and a single server, or None when no
        token was obtained before the timeout.
    """
    pollIntervalSeconds = 5
    maxAttempts = 35
    # Named authId so the builtin id() is not shadowed.
    authId, code, url = initiateAuth()
    logger.info("Open the below URL in your web browser and sign in:")
    logger.info(url)
    time.sleep(pollIntervalSeconds)
    for attempt in range(1, maxAttempts + 1):
        logger.info(f"Checking whether authentication is successful ({formatSeconds(attempt * pollIntervalSeconds)})")
        authToken = getAuthToken(authId, code)
        if authToken:
            logger.info("Authentication successful")
            serverName = os.environ.get("PLEX_SERVER_NAME")
            if not serverName:
                serverName = input("Enter the name of the Plex Media Server you wish to connect to: ") if isInteractive else "ServerName"
            return { "token": authToken, "servers": [{ "name": serverName }] }
        time.sleep(pollIntervalSeconds)
    # Total wait = the initial delay plus one interval per attempt (180 s with
    # the values above); derived so the message cannot drift from the loop.
    logger.info(f"Authentication timed out ({formatSeconds((maxAttempts + 1) * pollIntervalSeconds)})")
    return None
def testIpc() -> None:
    """Manually test the Discord IPC connection.

    Connects, sets a placeholder activity, keeps it up for 15 seconds so it
    can be checked in the Discord client, then disconnects.
    """
    logger.info("Testing Discord IPC connection")
    discordIpcService = DiscordIpcService()
    discordIpcService.connect()
    try:
        discordIpcService.setActivity({
            "details": "details",
            "state": "state",
            "assets": {
                "large_text": "large_text",
                "large_image": "logo",
                "small_text": "small_text",
                "small_image": "playing",
            },
        })
        time.sleep(15)
    finally:
        # Always release the connection, including when setActivity() raises
        # or the wait is interrupted with Ctrl+C.
        discordIpcService.disconnect()
if __name__ == "__main__":
    # The optional first command-line argument selects what to run:
    # no argument starts the application, "test-ipc" runs the IPC self-test.
    mode = sys.argv[1] if len(sys.argv) > 1 else ""
    if not mode:
        main()
    elif mode == "test-ipc":
        testIpc()
    else:
        # sys.exit() with a string prints it to stderr and exits with status 1,
        # so callers and scripts can detect the bad invocation.
        sys.exit(f"Invalid mode: {mode}")