mirror of
https://github.com/pkkid/python-plexapi
synced 2024-11-22 19:53:17 +00:00
68fc970d7a
* lets begin * skip plexpass tests if there is not plexpass on account * test new myplex attrubutes * bootstrap: proper photos organisation * fix rest of photos tests * fix myplex new attributes test * fix music bootstrap by setting agent to lastfm * fix sync tests * increase bootstrap timeout * remove timeout from .travis.yml * do not create playlist-style photoalbums in plex-bootstraptest.py * allow negative filtering in LibrarySection.search() * fix sync tests once again * use sendCrashReports in test_settings * fix test_settings * fix test_video * do not accept eula in bootstrap * fix PlexServer.isLatest() * add test against old version of PlexServer * fix MyPlexAccount.OutOut * add flag for one-time testing in Travis * fix test_library onDeck tests * fix more tests * use tqdm in plex-bootstraptest for media scanning progress * create sections one-by-one * update docs on AlertListener for timeline entries * fix plex-bootstraptest for server version 1.3.2 * display skip/xpass/xfail reasons * fix tests on 1.3 * wait for music to be fully processed in plex-bootstraptest * fix misplaced TEST_ACCOUNT_ONCE * fix test_myplex_users, not sure if in proper-way * add pytest-rerunfailures; mark test_myplex_optout as flaky * fix comment * Revert "add pytest-rerunfailures; mark test_myplex_optout as flaky" This reverts commit580e4c95a7
. * restart plex container on failure * add conftest.wait_until() and used where some retries are required * add more wait_until() usage in test_sync * fix managed user search * fix updating managed users in myplex * allow to add new servers to existent users * add new server to a shared user while bootstrapping * add some docs on testing process * perform few attemps when unable to get the claim token * unlock websocket-client in requirements_dev * fix docblock in tools/plex-teardowntest * do not hardcode mediapart size in test_video * remove cache:pip from travis * Revert "unlock websocket-client in requirements_dev" This reverts commit0d536bd06d
. * remove debug from server.py * improve webhook tests * fix type() check to isinstance() * remove excessive `else` branch due to Hellowlol advice * add `unknown` as allowed `myPlexMappingState` in test_server
261 lines
9.3 KiB
Python
261 lines
9.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
import pytest, re, time
|
|
from plexapi.exceptions import BadRequest, NotFound
|
|
from plexapi.server import PlexServer
|
|
from plexapi.utils import download
|
|
from PIL import Image, ImageStat
|
|
from requests import Session
|
|
from . import conftest as utils
|
|
|
|
|
|
def test_server_attr(plex, account):
|
|
assert plex._baseurl == utils.SERVER_BASEURL
|
|
assert len(plex.friendlyName) >= 1
|
|
assert len(plex.machineIdentifier) == 40
|
|
assert plex.myPlex is True
|
|
# if you run the tests very shortly after server creation the state in rare cases may be `unknown`
|
|
assert plex.myPlexMappingState in ('mapped', 'unknown')
|
|
assert plex.myPlexSigninState == 'ok'
|
|
assert utils.is_int(plex.myPlexSubscription, gte=0)
|
|
assert re.match(utils.REGEX_EMAIL, plex.myPlexUsername)
|
|
assert plex.platform in ('Linux', 'Windows')
|
|
assert len(plex.platformVersion) >= 5
|
|
assert plex._token == account.authenticationToken
|
|
assert utils.is_int(plex.transcoderActiveVideoSessions, gte=0)
|
|
assert utils.is_datetime(plex.updatedAt)
|
|
assert len(plex.version) >= 5
|
|
|
|
|
|
def test_server_alert_listener(plex, movies):
|
|
try:
|
|
messages = []
|
|
listener = plex.startAlertListener(messages.append)
|
|
movies.refresh()
|
|
utils.wait_until(lambda: len(messages) >= 3, delay=1, timeout=30)
|
|
assert len(messages) >= 3
|
|
finally:
|
|
listener.stop()
|
|
|
|
|
|
@pytest.mark.req_client
|
|
def test_server_session():
|
|
# TODO: Implement test_server_session
|
|
pass
|
|
|
|
|
|
def test_server_library(plex):
|
|
# TODO: Implement test_server_library
|
|
assert plex.library
|
|
|
|
|
|
def test_server_url(plex):
|
|
assert 'ohno' in plex.url('ohno')
|
|
|
|
|
|
def test_server_transcodeImage(tmpdir, plex, show):
|
|
width, height = 500, 500
|
|
imgurl = plex.transcodeImage(show.banner, height, width)
|
|
gray = imgurl = plex.transcodeImage(show.banner, height, width, saturation=0)
|
|
resized_img = download(imgurl, plex._token, savepath=str(tmpdir), filename='resize_image')
|
|
original_img = download(show._server.url(show.banner), plex._token, savepath=str(tmpdir), filename='original_img')
|
|
grayscale_img = download(gray, plex._token, savepath=str(tmpdir), filename='grayscale_img')
|
|
with Image.open(resized_img) as image:
|
|
assert width, height == image.size
|
|
with Image.open(original_img) as image:
|
|
assert width, height != image.size
|
|
assert _detect_color_image(grayscale_img, thumb_size=150) == 'grayscale'
|
|
|
|
|
|
def _detect_color_image(file, thumb_size=150, MSE_cutoff=22, adjust_color_bias=True):
|
|
# http://stackoverflow.com/questions/20068945/detect-if-image-is-color-grayscale-or-black-and-white-with-python-pil
|
|
pilimg = Image.open(file)
|
|
bands = pilimg.getbands()
|
|
if bands == ('R', 'G', 'B') or bands == ('R', 'G', 'B', 'A'):
|
|
thumb = pilimg.resize((thumb_size, thumb_size))
|
|
sse, bias = 0, [0, 0, 0]
|
|
if adjust_color_bias:
|
|
bias = ImageStat.Stat(thumb).mean[:3]
|
|
bias = [b - sum(bias) / 3 for b in bias]
|
|
for pixel in thumb.getdata():
|
|
mu = sum(pixel) / 3
|
|
sse += sum((pixel[i] - mu - bias[i]) * (pixel[i] - mu - bias[i]) for i in [0, 1, 2])
|
|
mse = float(sse) / (thumb_size * thumb_size)
|
|
return 'grayscale' if mse <= MSE_cutoff else 'color'
|
|
elif len(bands) == 1:
|
|
return 'blackandwhite'
|
|
|
|
|
|
def test_server_search(plex, movie):
|
|
title = movie.title
|
|
assert plex.search(title)
|
|
assert plex.search(title, mediatype='movie')
|
|
|
|
|
|
def test_server_playlist(plex, show):
|
|
episodes = show.episodes()
|
|
playlist = plex.createPlaylist('test_playlist', episodes[:3])
|
|
try:
|
|
assert playlist.title == 'test_playlist'
|
|
with pytest.raises(NotFound):
|
|
plex.playlist('<playlist-not-found>')
|
|
finally:
|
|
playlist.delete()
|
|
|
|
|
|
def test_server_playlists(plex, show):
|
|
playlists = plex.playlists()
|
|
count = len(playlists)
|
|
episodes = show.episodes()
|
|
playlist = plex.createPlaylist('test_playlist', episodes[:3])
|
|
try:
|
|
playlists = plex.playlists()
|
|
assert len(playlists) == count + 1
|
|
finally:
|
|
playlist.delete()
|
|
|
|
|
|
def test_server_history(plex, movie):
|
|
movie.markWatched()
|
|
history = plex.history()
|
|
assert len(history)
|
|
movie.markUnwatched()
|
|
|
|
|
|
def test_server_Server_query(plex):
|
|
assert plex.query('/')
|
|
with pytest.raises(BadRequest):
|
|
assert plex.query('/asdf/1234/asdf', headers={'random_headers': '1234'})
|
|
with pytest.raises(BadRequest):
|
|
# This is really requests.exceptions.HTTPError
|
|
# 401 Client Error: Unauthorized for url
|
|
PlexServer(utils.SERVER_BASEURL, '1234')
|
|
|
|
|
|
def test_server_Server_session(account):
|
|
# Mock Sesstion
|
|
class MySession(Session):
|
|
def __init__(self):
|
|
super(self.__class__, self).__init__()
|
|
self.plexapi_session_test = True
|
|
# Test Code
|
|
plex = PlexServer(utils.SERVER_BASEURL, account.authenticationToken, session=MySession())
|
|
assert hasattr(plex._session, 'plexapi_session_test')
|
|
|
|
|
|
def test_server_token_in_headers(plex):
|
|
headers = plex._headers()
|
|
assert 'X-Plex-Token' in headers
|
|
assert len(headers['X-Plex-Token']) >= 1
|
|
|
|
|
|
def test_server_createPlayQueue(plex, movie):
|
|
playqueue = plex.createPlayQueue(movie, shuffle=1, repeat=1)
|
|
assert 'shuffle=1' in playqueue._initpath
|
|
assert 'repeat=1' in playqueue._initpath
|
|
assert playqueue.playQueueShuffled is True
|
|
|
|
|
|
def test_server_client_not_found(plex):
|
|
with pytest.raises(NotFound):
|
|
plex.client('<This-client-should-not-be-found>')
|
|
|
|
|
|
def test_server_sessions(plex):
|
|
assert len(plex.sessions()) >= 0
|
|
|
|
|
|
def test_server_isLatest(plex, mocker):
|
|
from os import environ
|
|
is_latest = plex.isLatest()
|
|
if environ.get('PLEX_CONTAINER_TAG') and environ['PLEX_CONTAINER_TAG'] != 'latest':
|
|
assert not is_latest
|
|
else:
|
|
return pytest.skip('Do not forget to run with PLEX_CONTAINER_TAG != latest to ensure that update is available')
|
|
|
|
|
|
def test_server_installUpdate(plex, mocker):
|
|
m = mocker.MagicMock(release='aa')
|
|
mocker.patch('plexapi.server.PlexServer.check_for_update', return_value=m)
|
|
with utils.callable_http_patch():
|
|
plex.installUpdate()
|
|
|
|
|
|
def test_server_check_for_update(plex, mocker):
|
|
class R():
|
|
def __init__(self, **kwargs):
|
|
self.download_key = 'plex.tv/release/1337'
|
|
self.version = '1337'
|
|
self.added = 'gpu transcode'
|
|
self.fixed = 'fixed rare bug'
|
|
self.downloadURL = 'http://path-to-update'
|
|
self.state = 'downloaded'
|
|
|
|
with mocker.patch('plexapi.server.PlexServer.check_for_update', return_value=R()):
|
|
rel = plex.check_for_update(force=False, download=True)
|
|
assert rel.download_key == 'plex.tv/release/1337'
|
|
assert rel.version == '1337'
|
|
assert rel.added == 'gpu transcode'
|
|
assert rel.fixed == 'fixed rare bug'
|
|
assert rel.downloadURL == 'http://path-to-update'
|
|
assert rel.state == 'downloaded'
|
|
|
|
|
|
@pytest.mark.client
|
|
def test_server_clients(plex):
|
|
assert len(plex.clients())
|
|
client = plex.clients()[0]
|
|
assert client._baseurl == 'http://127.0.0.1:32400'
|
|
assert client.device is None
|
|
assert client.deviceClass == 'pc'
|
|
assert client.machineIdentifier == '89hgkrbqxaxmf45o1q2949ru'
|
|
assert client.model is None
|
|
assert client.platform is None
|
|
assert client.platformVersion is None
|
|
assert client.product == 'Plex Web'
|
|
assert client.protocol == 'plex'
|
|
assert client.protocolCapabilities == ['timeline', 'playback', 'navigation', 'mirror', 'playqueues']
|
|
assert client.protocolVersion == '1'
|
|
assert client._server._baseurl == 'http://138.68.157.5:32400'
|
|
assert client.state is None
|
|
assert client.title == 'Plex Web (Chrome)'
|
|
assert client.token is None
|
|
assert client.vendor is None
|
|
assert client.version == '2.12.5'
|
|
|
|
|
|
def test_server_account(plex):
|
|
account = plex.account()
|
|
assert account.authToken
|
|
# TODO: Figure out why this is missing from time to time.
|
|
# assert account.mappingError == 'publisherror'
|
|
assert account.mappingErrorMessage is None
|
|
assert account.mappingState == 'mapped'
|
|
if account.mappingError != 'unreachable':
|
|
assert re.match(utils.REGEX_IPADDR, account.privateAddress)
|
|
assert int(account.privatePort) >= 1000
|
|
assert re.match(utils.REGEX_IPADDR, account.publicAddress)
|
|
assert int(account.publicPort) >= 1000
|
|
else:
|
|
assert account.privateAddress == ''
|
|
assert int(account.privatePort) == 0
|
|
assert account.publicAddress == ''
|
|
assert int(account.publicPort) == 0
|
|
assert account.signInState == 'ok'
|
|
assert isinstance(account.subscriptionActive, bool)
|
|
if account.subscriptionActive:
|
|
assert len(account.subscriptionFeatures)
|
|
# Below check keeps failing.. it should go away.
|
|
# else: assert sorted(account.subscriptionFeatures) == ['adaptive_bitrate',
|
|
# 'download_certificates', 'federated-auth', 'news']
|
|
assert account.subscriptionState == 'Active' if account.subscriptionActive else 'Unknown'
|
|
assert re.match(utils.REGEX_EMAIL, account.username)
|
|
|
|
|
|
def test_server_downloadLogs(tmpdir, plex):
|
|
plex.downloadLogs(savepath=str(tmpdir), unpack=True)
|
|
assert len(tmpdir.listdir()) > 1
|
|
|
|
|
|
def test_server_downloadDatabases(tmpdir, plex):
|
|
plex.downloadDatabases(savepath=str(tmpdir), unpack=True)
|
|
assert len(tmpdir.listdir()) > 1
|