Update test_server

This commit is contained in:
Michael Shepanski 2017-04-16 23:33:33 -04:00
parent 5278d583bc
commit 29daf4c237
3 changed files with 149 additions and 138 deletions

View file

@ -9,5 +9,6 @@ flake8
pillow
pytest
pytest-cov
pytest-cache
requests
websocket-client

View file

@ -6,6 +6,9 @@ from plexapi.server import PlexServer
from functools import partial
MIN_DATETIME = datetime(2017, 1, 1)
REGEX_EMAIL = r'(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)'
REGEX_IPADDR = r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'
SERVER_BASEURL = plexapi.CONFIG.get('auth.server_baseurl')
SERVER_TOKEN = plexapi.CONFIG.get('auth.server_token')
MYPLEX_USERNAME = plexapi.CONFIG.get('auth.myplex_username')

View file

@ -1,218 +1,225 @@
# -*- coding: utf-8 -*-
import os, pytest
from plexapi import CONFIG
import pytest, re, time
from plexapi.exceptions import BadRequest, NotFound
from plexapi.server import PlexServer
from plexapi.utils import download
from PIL import Image, ImageStat
from requests import Session
from .conftest import is_datetime
from .conftest import REGEX_EMAIL, REGEX_IPADDR
from .conftest import SERVER_BASEURL, SERVER_TOKEN
def test_server_attr(pms):
assert pms._baseurl == 'http://138.68.157.5:32400'
assert pms.friendlyName == 'PMS_API_TEST_SERVER'
assert pms.machineIdentifier == 'e42470b5c527c7e5ebbdc017b5a32c8c683f6f8b'
assert pms.myPlex is True
assert pms.myPlexMappingState == 'mapped'
assert pms.myPlexSigninState == 'ok'
assert pms.myPlexSubscription == '0'
assert pms.myPlexUsername == 'testplexapi@gmail.com'
assert pms.platform == 'Linux'
assert pms.platformVersion == '4.4.0-59-generic (#80-Ubuntu SMP Fri Jan 6 17:47:47 UTC 2017)'
#assert pms.session == <requests.sessions.Session object at 0x029A5E10>
assert pms._token == CONFIG.get('auth.server_token')
assert pms.transcoderActiveVideoSessions == 0
#assert str(pms.updatedAt.date()) == '2017-01-20'
assert pms.version == '1.3.3.3148-b38628e'
def test_server_attr(plex):
assert plex._baseurl == SERVER_BASEURL
assert len(plex.friendlyName) >= 1
assert len(plex.machineIdentifier) == 40
assert plex.myPlex is True
assert plex.myPlexMappingState == 'mapped'
assert plex.myPlexSigninState == 'ok'
assert plex.myPlexSubscription == '0'
assert re.match(REGEX_EMAIL, plex.myPlexUsername)
assert plex.platform in ('Linux', 'Windows')
assert len(plex.platformVersion) >= 5
assert plex._token == SERVER_TOKEN
assert plex.transcoderActiveVideoSessions == 0
assert is_datetime(plex.updatedAt)
assert len(plex.version) >= 5
def test_server_alert_listener(pms, a_movie_section):
import time
messages = []
listener = pms.startAlertListener(messages.append)
a_movie_section.refresh()
time.sleep(5)
listener.stop()
assert len(messages) >= 3
def test_server_alert_listener(plex, movies):
try:
messages = []
listener = plex.startAlertListener(messages.append)
movies.refresh()
time.sleep(5)
assert len(messages) >= 3
finally:
listener.stop()
@pytest.mark.req_client
def test_server_session():
# TODO: Implement test_server_session
pass
def test_server_library(pms):
assert pms.library
def test_server_library(plex):
# TODO: Implement test_server_library
assert plex.library
def test_server_url(pms):
assert 'ohno' in pms.url('ohno')
def test_server_url(plex):
assert 'ohno' in plex.url('ohno')
def test_server_transcodeImage(tmpdir, pms, a_show):
# Ideally we should also test the black white but this has to do for now.
from PIL import Image
def test_server_transcodeImage(tmpdir, plex, show):
width, height = 500, 500
img_url_resize = pms.transcodeImage(a_show.banner, height, width)
gray = img_url_resize = pms.transcodeImage(a_show.banner, height, width, saturation=0)
resized_image = download(img_url_resize, savepath=str(tmpdir), filename='resize_image')
org_image = download(a_show._server.url(a_show.banner), savepath=str(tmpdir), filename='org_image')
gray_image = download(gray, savepath=str(tmpdir), filename='gray_image')
with Image.open(resized_image) as im:
assert width, height == im.size
with Image.open(org_image) as im:
assert width, height != im.size
assert _detect_color_image(gray_image, thumb_size=150) == 'grayscale'
imgurl = plex.transcodeImage(show.banner, height, width)
gray = imgurl = plex.transcodeImage(show.banner, height, width, saturation=0)
resized_img = download(imgurl, savepath=str(tmpdir), filename='resize_image')
original_img = download(show._server.url(show.banner), savepath=str(tmpdir), filename='original_img')
grayscale_img = download(gray, savepath=str(tmpdir), filename='grayscale_img')
with Image.open(resized_img) as image:
assert width, height == image.size
with Image.open(original_img) as image:
assert width, height != image.size
assert _detect_color_image(grayscale_img, thumb_size=150) == 'grayscale'
def _detect_color_image(file, thumb_size=150, MSE_cutoff=22, adjust_color_bias=True):
# from http://stackoverflow.com/questions/20068945/detect-if-image-is-color-grayscale-or-black-and-white-with-python-pil
from PIL import Image, ImageStat
pil_img = Image.open(file)
bands = pil_img.getbands()
# http://stackoverflow.com/questions/20068945/detect-if-image-is-color-grayscale-or-black-and-white-with-python-pil
pilimg = Image.open(file)
bands = pilimg.getbands()
if bands == ('R', 'G', 'B') or bands == ('R', 'G', 'B', 'A'):
thumb = pil_img.resize((thumb_size, thumb_size))
SSE, bias = 0, [0, 0, 0]
thumb = pilimg.resize((thumb_size, thumb_size))
sse, bias = 0, [0, 0, 0]
if adjust_color_bias:
bias = ImageStat.Stat(thumb).mean[:3]
bias = [b - sum(bias) / 3 for b in bias]
for pixel in thumb.getdata():
mu = sum(pixel) / 3
SSE += sum((pixel[i] - mu - bias[i]) * (pixel[i] - mu - bias[i]) for i in [0, 1, 2])
MSE = float(SSE) / (thumb_size * thumb_size)
if MSE <= MSE_cutoff:
return 'grayscale'
else:
return 'color'
sse += sum((pixel[i] - mu - bias[i]) * (pixel[i] - mu - bias[i]) for i in [0, 1, 2])
mse = float(sse) / (thumb_size * thumb_size)
return 'grayscale' if mse <= MSE_cutoff else 'color'
elif len(bands) == 1:
return 'blackandwhite'
def test_server_search(pms):
# basic search. see test_search.py
assert pms.search('16 Blocks')
assert pms.search('16 blocks', mediatype='movie')
def test_server_search(plex):
assert plex.search('16 Blocks')
assert plex.search('16 blocks', mediatype='movie')
def test_server_playlist(pms):
pl = pms.playlist('some_playlist')
assert pl.title == 'some_playlist'
with pytest.raises(NotFound):
pms.playlist('124xxx11y')
def test_server_playlist(plex, show):
episodes = show.episodes()
playlist = plex.createPlaylist('test_playlist', episodes[:3])
try:
assert playlist.title == 'test_playlist'
with pytest.raises(NotFound):
plex.playlist('<playlist-not-found>')
finally:
playlist.delete()
def test_server_playlists(pms):
playlists = pms.playlists()
assert len(playlists)
def test_server_playlists(plex, show):
playlists = plex.playlists()
assert len(playlists) == 0
episodes = show.episodes()
playlist = plex.createPlaylist('test_playlist', episodes[:3])
try:
playlists = plex.playlists()
assert len(playlists) == 1
finally:
playlist.delete()
def test_server_history(pms):
history = pms.history()
def test_server_history(plex):
history = plex.history()
assert len(history)
def test_server_Server_query(pms):
assert pms.query('/')
from plexapi.server import PlexServer
def test_server_Server_query(plex):
assert plex.query('/')
with pytest.raises(BadRequest):
assert pms.query('/asdasdsada/12123127/aaaa', headers={'random_headers': '1337'})
assert plex.query('/asdf/1234/asdf', headers={'random_headers': '1234'})
with pytest.raises(BadRequest):
# This is really requests.exceptions.HTTPError:
# 401 Client Error: Unauthorized for url:
PlexServer('http://138.68.157.5:32400', '1234')
# This is really requests.exceptions.HTTPError
# 401 Client Error: Unauthorized for url
PlexServer(SERVER_BASEURL, '1234')
def test_server_Server_session():
from requests import Session
from plexapi.server import PlexServer
# Mock Sesstion
class MySession(Session):
def __init__(self):
super(self.__class__, self).__init__()
self.plexapi_session_test = True
plex = PlexServer('http://138.68.157.5:32400',
CONFIG.get('auth.server_token'), session=MySession())
# Test Code
plex = PlexServer(SERVER_BASEURL, SERVER_TOKEN, session=MySession())
assert hasattr(plex._session, 'plexapi_session_test')
pl = plex.playlists()
assert hasattr(pl[0]._server._session, 'plexapi_session_test')
# TODO: Check client in test_server_Server_session.
# TODO: Check myplex in test_server_Server_session.
def test_server_token_in_headers(pms):
h = pms._headers()
assert 'X-Plex-Token' in h and len(h['X-Plex-Token'])
def test_server_token_in_headers(plex):
headers = plex._headers()
assert 'X-Plex-Token' in headers
assert len(headers['X-Plex-Token']) >= 1
def test_server_createPlayQueue(pms, a_movie):
pq = pms.createPlayQueue(a_movie, **dict(shuffle=1, repeat=1))
assert 'shuffle=1' and 'repeat=1' in pq._initpath
assert pq.playQueueShuffled is True
def test_server_createPlayQueue(plex, movie):
playqueue = plex.createPlayQueue(movie, shuffle=1, repeat=1)
assert 'shuffle=1' in playqueue._initpath
assert 'repeat=1' in playqueue._initpath
assert playqueue.playQueueShuffled is True
def _test_server_createPlaylist():
# TODO: Implement _test_server_createPlaylist()
# see test_playlists.py
pass
def test_server_client_not_found(pms):
def test_server_client_not_found(plex):
with pytest.raises(NotFound):
pms.client('<This-client-should-not-be-found>')
plex.client('<This-client-should-not-be-found>')
@pytest.mark.req_client
def test_server_client(pms):
assert pms.client('Plex Web (Chrome)')
def test_server_client(plex):
assert plex.client('Plex Web (Chrome)')
def test_server_Server_sessions(pms):
assert len(pms.sessions()) == 0
def test_server_Server_sessions(plex):
assert len(plex.sessions()) == 0
@pytest.mark.req_client
def test_server_clients(pms):
assert len(pms.clients())
m = pms.clients()[0]
assert m._baseurl == 'http://127.0.0.1:32400'
assert m.device is None
assert m.deviceClass == 'pc'
assert m.machineIdentifier == '89hgkrbqxaxmf45o1q2949ru'
assert m.model is None
assert m.platform is None
assert m.platformVersion is None
assert m.product == 'Plex Web'
assert m.protocol == 'plex'
assert m.protocolCapabilities == ['timeline', 'playback', 'navigation', 'mirror', 'playqueues']
assert m.protocolVersion == '1'
assert m._server._baseurl == 'http://138.68.157.5:32400'
assert m.state is None
assert m.title == 'Plex Web (Chrome)'
assert m.token is None
assert m.vendor is None
assert m.version == '2.12.5'
def test_server_clients(plex):
assert len(plex.clients())
client = plex.clients()[0]
assert client._baseurl == 'http://127.0.0.1:32400'
assert client.device is None
assert client.deviceClass == 'pc'
assert client.machineIdentifier == '89hgkrbqxaxmf45o1q2949ru'
assert client.model is None
assert client.platform is None
assert client.platformVersion is None
assert client.product == 'Plex Web'
assert client.protocol == 'plex'
assert client.protocolCapabilities == ['timeline', 'playback', 'navigation', 'mirror', 'playqueues']
assert client.protocolVersion == '1'
assert client._server._baseurl == 'http://138.68.157.5:32400'
assert client.state is None
assert client.title == 'Plex Web (Chrome)'
assert client.token is None
assert client.vendor is None
assert client.version == '2.12.5'
def test_server_account(pms):
acc = pms.account()
assert acc.authToken
def test_server_account(plex):
account = plex.account()
assert account.authToken
# TODO: Figure out why this is missing from time to time.
#assert acc.mappingError == 'publisherror'
assert acc.mappingErrorMessage is None
assert acc.mappingState == 'mapped'
assert acc.privateAddress == '138.68.157.5'
assert acc.privatePort == '32400'
assert acc.publicAddress == '138.68.157.5'
assert acc.publicPort == '32400'
assert acc.signInState == 'ok'
assert acc.subscriptionActive == False
assert acc.subscriptionFeatures == []
assert acc.subscriptionState == 'Unknown'
assert acc.username == 'testplexapi@gmail.com'
# assert account.mappingError == 'publisherror'
assert account.mappingErrorMessage is None
assert account.mappingState == 'mapped'
assert re.match(REGEX_IPADDR, account.privateAddress)
assert int(account.privatePort) >= 1000
assert re.match(REGEX_IPADDR, account.publicAddress)
assert int(account.publicPort) >= 1000
assert account.signInState == 'ok'
assert account.subscriptionActive is False
assert account.subscriptionFeatures == []
assert account.subscriptionState == 'Unknown'
assert re.match(REGEX_EMAIL, account.username)
def test_server_downloadLogs(tmpdir, pms):
pms.downloadLogs(savepath=str(tmpdir), unpack=True)
def test_server_downloadLogs(tmpdir, plex):
plex.downloadLogs(savepath=str(tmpdir), unpack=True)
assert len(tmpdir.listdir()) > 1
def test_server_downloadDatabases(tmpdir, pms):
pms.downloadDatabases(savepath=str(tmpdir), unpack=True)
def test_server_downloadDatabases(tmpdir, plex):
plex.downloadDatabases(savepath=str(tmpdir), unpack=True)
assert len(tmpdir.listdir()) > 1