Merge pull request #224 from pkkid/covcov

Covcov
This commit is contained in:
Hellowlol 2017-10-31 00:58:08 +01:00 committed by GitHub
commit 5ef90f3b2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 246 additions and 80 deletions

View file

@ -3,7 +3,12 @@ exclude_lines =
pragma: no cover
raise NotImplementedError
raise Unsupported
raise Exception
except ImportError
except BadRequest
def __repr__
def __bool__
if __name__ == .__main__.:
def __iter__
def __hash__
def __len__
if __name__ == .__main__.:

View file

@ -35,10 +35,12 @@ logfile = CONFIG.get('log.path')
logformat = CONFIG.get('log.format', '%(asctime)s %(module)12s:%(lineno)-4s %(levelname)-9s %(message)s')
loglevel = CONFIG.get('log.level', 'INFO').upper()
loghandler = logging.NullHandler()
if logfile:
if logfile: # pragma: no cover
logbackups = CONFIG.get('log.backup_count', 3, int)
logbytes = CONFIG.get('log.rotate_bytes', 512000, int)
loghandler = RotatingFileHandler(os.path.expanduser(logfile), 'a', logbytes, logbackups)
loghandler.setFormatter(logging.Formatter(logformat))
log.addHandler(loghandler)
log.setLevel(loglevel)

View file

@ -32,7 +32,7 @@ class AlertListener(threading.Thread):
url = self._server.url(self.key).replace('http', 'ws')
log.info('Starting AlertListener: %s', url)
self._ws = websocket.WebSocketApp(url, on_message=self._onMessage,
on_error=self._onError)
on_error=self._onError)
self._ws.run_forever()
def stop(self):
@ -47,12 +47,12 @@ class AlertListener(threading.Thread):
""" Called when websocket message is recieved. """
try:
data = json.loads(message)['NotificationContainer']
log.debug('Alert: %s', data)
log.debug('Alert: %s %s %s', *data)
if self._callback:
self._callback(data)
except Exception as err:
except Exception as err: # pragma: no cover
log.error('AlertListener Msg Error: %s', err)
def _onError(self, ws, err):
def _onError(self, ws, err): # pragma: no cover
""" Called when websocket error is recieved. """
log.error('AlertListener Error: %s' % err)

View file

@ -444,7 +444,7 @@ class PlexClient(PlexObject):
if not self.product != 'OpenPHT':
try:
self.sendCommand('timeline/subscribe', port=server_url[1].strip('/'), protocol='http')
except:
except: # noqa: E722
# some clients dont need or like this and raises http 400.
# We want to include the exception in the log,
# but it might still work so we swallow it.

View file

@ -35,7 +35,7 @@ class PlexConfig(ConfigParser):
section, name = key.lower().split('.')
value = self.data.get(section, {}).get(name, default)
return cast(value) if cast else value
except:
except: # noqa: E722
return default
def _asDict(self):

View file

@ -148,7 +148,7 @@ class MediaPartStream(PlexObject):
self.type = cast(int, data.attrib.get('streamType'))
@staticmethod
def parse(server, data, initpath):
def parse(server, data, initpath): # pragma: no cover seems to be dead code.
""" Factory method returns a new MediaPartStream from xml data. """
STREAMCLS = {1: VideoStream, 2: AudioStream, 3: SubtitleStream}
stype = cast(int, data.attrib.get('streamType'))

View file

@ -150,10 +150,10 @@ class MyPlexAccount(PlexObject):
log.debug('%s %s %s', method.__name__.upper(), url, kwargs.get('json', ''))
headers = self._headers(**headers or {})
response = method(url, headers=headers, timeout=timeout, **kwargs)
if response.status_code not in (200, 201, 204):
if response.status_code not in (200, 201, 204): # pragma: no cover
codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ')
log.warn('BadRequest (%s) %s %s; %s' % (response.status_code, codename, response.url, errtext))
log.warning('BadRequest (%s) %s %s; %s' % (response.status_code, codename, response.url, errtext))
raise BadRequest('(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext))
data = response.text.encode('utf8')
return ElementTree.fromstring(data) if data.strip() else None
@ -428,8 +428,8 @@ class MyPlexUser(PlexObject):
self.recommendationsPlaylistId = data.attrib.get('recommendationsPlaylistId')
self.restricted = data.attrib.get('restricted')
self.thumb = data.attrib.get('thumb')
self.title = data.attrib.get('title')
self.username = data.attrib.get('username')
self.title = data.attrib.get('title', '')
self.username = data.attrib.get('username', '')
self.servers = self.findItems(data, MyPlexServerShare)
def get_token(self, machineIdentifier):

View file

@ -54,7 +54,7 @@ class Photoalbum(PlexPartialObject):
def album(self, title):
""" Returns the :class:`~plexapi.photo.Photoalbum` that matches the specified title. """
for album in self.albums():
if album.attrib.get('title').lower() == title.lower():
if album.title.lower() == title.lower():
return album
raise NotFound('Unable to find album: %s' % title)
@ -66,17 +66,10 @@ class Photoalbum(PlexPartialObject):
def photo(self, title):
""" Returns the :class:`~plexapi.photo.Photo` that matches the specified title. """
for photo in self.photos():
if photo.attrib.get('title').lower() == title.lower():
if photo.title.lower() == title.lower():
return photo
raise NotFound('Unable to find photo: %s' % title)
def reload(self):
""" Reload the data for this object from self.key. """
self._initpath = self.key
data = self._server.query(self.key)
self._loadData(data)
return self
@utils.registerPlexObject
class Photo(PlexPartialObject):

View file

@ -34,13 +34,13 @@ class Playlist(PlexPartialObject, Playable):
self.updatedAt = toDatetime(data.attrib.get('updatedAt'))
self._items = None # cache for self.items
def __len__(self):
def __len__(self): # pragma: no cover
return len(self.items())
def __contains__(self, other):
def __contains__(self, other): # pragma: no cover
return any(i.key == other.key for i in self.items())
def __getitem__(self, key):
def __getitem__(self, key): # pragma: no cover
return self.items()[key]
def items(self):
@ -57,7 +57,7 @@ class Playlist(PlexPartialObject, Playable):
items = [items]
ratingKeys = []
for item in items:
if item.listType != self.playlistType:
if item.listType != self.playlistType: # pragma: no cover
raise BadRequest('Can not mix media types when building a playlist: %s and %s' %
(self.playlistType, item.listType))
ratingKeys.append(str(item.ratingKey))
@ -108,7 +108,7 @@ class Playlist(PlexPartialObject, Playable):
items = [items]
ratingKeys = []
for item in items:
if item.listType != items[0].listType:
if item.listType != items[0].listType: # pragma: no cover
raise BadRequest('Can not mix media types when building a playlist')
ratingKeys.append(str(item.ratingKey))
ratingKeys = ','.join(ratingKeys)

View file

@ -74,14 +74,6 @@ def cast(func, value):
return value
def getattributeOrNone(obj, self, attr):
""" Returns result from __getattribute__ or None if not found. """
try:
return super(obj, self).__getattribute__(attr)
except AttributeError:
return None
def joinArgs(args):
""" Returns a query string (uses for HTTP URLs) where only the value is URL encoded.
Example return value: '?genre=action&type=1337'.
@ -129,7 +121,7 @@ def rget(obj, attrstr, default=None, delim='.'): # pragma: no cover
if attrstr:
return rget(value, attrstr, default, delim)
return value
except:
except: # noqa: E722
return default
@ -198,7 +190,8 @@ def toList(value, itemcast=None, delim=','):
return [itemcast(item) for item in value.split(delim) if item != '']
def downloadSessionImages(server, filename=None, height=150, width=150, opacity=100, saturation=100):
def downloadSessionImages(server, filename=None, height=150, width=150,
opacity=100, saturation=100): # pragma: no cover
""" Helper to download a bif image or thumb.url from plex.server.sessions.
Parameters:
@ -243,7 +236,7 @@ def download(url, filename=None, savepath=None, session=None, chunksize=4024,
mocked (bool): Helper to do evertything except write the file.
unpack (bool): Unpack the zip file.
showstatus(bool): Display a progressbar.
Example:
>>> download(a_episode.getStreamURL(), a_episode.location)
/path/to/file
@ -278,7 +271,7 @@ def download(url, filename=None, savepath=None, session=None, chunksize=4024,
# save the file to disk
log.info('Downloading: %s', fullpath)
if showstatus:
if showstatus: # pragma: no cover
total = int(response.headers.get('content-length', 0))
bar = tqdm(unit='B', unit_scale=True, total=total, desc=filename)
@ -288,7 +281,7 @@ def download(url, filename=None, savepath=None, session=None, chunksize=4024,
if showstatus:
bar.update(len(chunk))
if showstatus:
if showstatus: # pragma: no cover
bar.close()
# check we want to unzip the contents
if fullpath.endswith('zip') and unpack:
@ -314,7 +307,7 @@ def tag_helper(tag, items, locked=True, remove=False):
return data
def getMyPlexAccount(opts=None):
def getMyPlexAccount(opts=None): # pragma: no cover
""" Helper function tries to get a MyPlex Account instance by checking
the the following locations for a username and password. This is
useful to create user-friendly command line tools.
@ -341,7 +334,7 @@ def getMyPlexAccount(opts=None):
return MyPlexAccount(username, password)
def choose(msg, items, attr):
def choose(msg, items, attr): # pragma: no cover
""" Command line helper to display a list of choices, asking the
user to choose one of the options.
"""

View file

@ -15,12 +15,24 @@
# 3. A Photos section containing the photoalbums:
# Cats (with cute cat photos inside)
# 4. A TV Shows section containing at least two seasons of The 100.
import plexapi, pytest, requests
from datetime import datetime
from functools import partial
import pytest
import requests
try:
from unittest.mock import patch, MagicMock
except ImportError:
from mock import patch, MagicMock
import plexapi
from plexapi import compat
from plexapi.client import PlexClient
from datetime import datetime
from plexapi.server import PlexServer
from functools import partial
SERVER_BASEURL = plexapi.CONFIG.get('auth.server_baseurl')
SERVER_TOKEN = plexapi.CONFIG.get('auth.server_token')
@ -150,10 +162,29 @@ def monkeydownload(request, monkeypatch):
monkeypatch.undo()
def callable_http_patch():
return patch('plexapi.server.requests.sessions.Session.send',
return_value=MagicMock(status_code=200,
text='<xml><child></child></xml>'))
@pytest.fixture()
def empty_response(mocker):
response = mocker.MagicMock(status_code=200, text='<xml><child></child></xml>')
return response
@pytest.fixture()
def patched_http_call(mocker):
return mocker.patch('plexapi.server.requests.sessions.Session.send',
return_value=MagicMock(status_code=200,
text='<xml><child></child></xml>')
)
# ---------------------------------
# Utility Functions
# ---------------------------------
def is_datetime(value):
return value > MIN_DATETIME

View file

@ -45,13 +45,8 @@ def test_library_section_get_movie(plex):
assert plex.library.section('Movies').get('Sita Sings the Blues')
def test_library_section_delete(monkeypatch, movies):
monkeypatch.delattr("requests.sessions.Session.request")
try:
movies.delete()
except AttributeError:
# will always raise because there is no request
pass
def test_library_section_delete(movies, patched_http_call):
movies.delete()
def test_library_fetchItem(plex, movie):
@ -69,11 +64,6 @@ def test_library_recentlyAdded(plex):
assert len(list(plex.library.recentlyAdded()))
def test_library_search(plex):
item = plex.library.search('Elephants Dream')[0]
assert item.title == 'Elephants Dream'
def test_library_add_edit_delete(plex):
# Dont add a location to prevent scanning scanning
section_name = 'plexapi_test_section'
@ -115,14 +105,33 @@ def test_library_Library_deleteMediaPreviews(plex):
plex.library.deleteMediaPreviews()
def _test_library_MovieSection_refresh(movies):
movies.refresh()
def test_library_Library_all(plex):
assert len(plex.library.all(title__iexact='The 100'))
def test_library_Library_search(plex):
item = plex.library.search('Elephants Dream')[0]
assert item.title == 'Elephants Dream'
assert len(plex.library.search(libtype='episode'))
def test_library_MovieSection_update(movies):
movies.update()
def test_library_ShowSection_all(tvshows):
assert len(tvshows.all(title__iexact='The 100'))
def test_library_MovieSection_refresh(movies, patched_http_call):
movies.refresh()
def test_library_MovieSection_search_genre(movie, movies):
# assert len(movie.genres[0].items()) # TODO
assert len(movies.search(genre=movie.genres[0])) > 1
def test_library_MovieSection_cancelUpdate(movies):
movies.cancelUpdate()

View file

@ -1,4 +1,7 @@
# -*- coding: utf-8 -*-
import pytest
from plexapi.exceptions import BadRequest, NotFound
from . import conftest as utils
def test_myplex_accounts(account, plex):
@ -16,7 +19,7 @@ def test_myplex_accounts(account, plex):
account = plex.account()
print('Local PlexServer.account():')
print('username: %s' % account.username)
print('authToken: %s' % account.authToken)
#print('authToken: %s' % account.authToken)
print('signInState: %s' % account.signInState)
assert account.username, 'Account has no username'
assert account.authToken, 'Account has no authToken'
@ -51,6 +54,10 @@ def test_myplex_devices(account):
assert devices, 'No devices found for account: %s' % account.name
def test_myplex_device(account):
assert account.device('pkkid-plexapi')
def _test_myplex_connect_to_device(account):
devices = account.devices()
for device in devices:
@ -67,3 +74,81 @@ def test_myplex_users(account):
user = account.user(users[0].title)
print('Found user: %s' % user)
assert user, 'Could not find user %s' % users[0].title
assert len(users[0].servers[0].sections()) == 7, "Could'nt info about the shared libraries"
def test_myplex_resource(account):
assert account.resource('pkkid-plexapi')
def test_myplex_webhooks(account):
# Webhooks are a plex pass feature to this will fail
with pytest.raises(BadRequest):
account.webhooks()
def test_myplex_addwebhooks(account):
with pytest.raises(BadRequest):
account.addWebhook('http://site.com')
def test_myplex_deletewebhooks(account):
with pytest.raises(BadRequest):
account.deleteWebhook('http://site.com')
def test_myplex_optout(account):
def enabled():
ele = account.query('https://plex.tv/api/v2/user/privacy')
lib = ele.attrib.get('optOutLibraryStats')
play = ele.attrib.get('optOutPlayback')
return bool(int(lib)), bool(int(play))
# This should be False False
library_enabled, playback_enabled = enabled()
account.optOut(library=True, playback=True)
assert all(enabled())
account.optOut(library=False, playback=False)
assert not all(enabled())
def test_myplex_inviteFriend_remove(account, plex, mocker):
inv_user = 'hellowlol'
vid_filter = {'contentRating': ['G'], 'label': ['foo']}
secs = plex.library.sections()
ids = account._getSectionIds(plex.machineIdentifier, secs)
with mocker.patch.object(account, '_getSectionIds', return_value=ids):
with utils.callable_http_patch():
account.inviteFriend(inv_user, plex, secs, allowSync=True, allowCameraUpload=True,
allowChannels=False, filterMovies=vid_filter, filterTelevision=vid_filter,
filterMusic={'label': ['foo']})
assert inv_user not in [u.title for u in account.users()]
with pytest.raises(NotFound):
with utils.callable_http_patch():
account.removeFriend(inv_user)
def test_myplex_updateFriend(account, plex, mocker):
edit_user = 'PKKid'
vid_filter = {'contentRating': ['G'], 'label': ['foo']}
secs = plex.library.sections()
user = account.user(edit_user)
ids = account._getSectionIds(plex.machineIdentifier, secs)
with mocker.patch.object(account, '_getSectionIds', return_value=ids):
with mocker.patch.object(account, 'user', return_value=user):
with utils.callable_http_patch():
account.updateFriend(edit_user, plex, secs, allowSync=True, removeSections=True,
allowCameraUpload=True, allowChannels=False, filterMovies=vid_filter,
filterTelevision=vid_filter, filterMusic={'label': ['foo']})

9
tests/test_photo.py Normal file
View file

@ -0,0 +1,9 @@
def test_photo_Photoalbum(photoalbum):
assert len(photoalbum.albums()) == 3
assert len(photoalbum.photos()) == 3
cats_in_bed = photoalbum.album('Cats in bed')
assert len(cats_in_bed.photos()) == 7
a_pic = cats_in_bed.photo('maxresdefault')
assert a_pic

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import pytest, time
import time
import pytest
def test_create_playlist(plex, show):
@ -99,14 +100,11 @@ def test_playqueues(plex):
def test_copyToUser(plex, show, fresh_plex):
# Skip out if we do not have plexpass
if not plex.myPlexSubscription:
pytest.skip('PlexPass subscription required for test.')
episodes = show.episodes()
playlist = plex.createPlaylist('shared_from_test_plexapi', episodes)
try:
playlist.copyToUser('plexapi2')
user = plex.myPlexAccount().user('plexapi2')
playlist.copyToUser('PKKid')
user = plex.myPlexAccount().user('PKKid')
user_plex = fresh_plex(plex._baseurl, user.get_token(plex.machineIdentifier))
assert playlist.title in [p.title for p in user_plex.playlists()]
finally:

View file

@ -155,12 +155,6 @@ def test_server_createPlayQueue(plex, movie):
assert playqueue.playQueueShuffled is True
def _test_server_createPlaylist():
# TODO: Implement _test_server_createPlaylist()
# see test_playlists.py
pass
def test_server_client_not_found(plex):
with pytest.raises(NotFound):
plex.client('<This-client-should-not-be-found>')
@ -171,9 +165,14 @@ def test_server_sessions(plex):
def test_server_isLatest(plex, mocker):
# I really need to update the testservers pms.. TODO
with mocker.patch('plexapi.server.PlexServer.isLatest', return_value=True):
assert plex.isLatest() is True
plex.isLatest()
def test_server_installUpdate(plex, mocker):
m = mocker.MagicMock(release='aa')
mocker.patch('plexapi.server.PlexServer.check_for_update', return_value=m)
with utils.callable_http_patch():
plex.installUpdate()
def test_server_check_for_update(plex, mocker):

17
tests/test_settings.py Normal file
View file

@ -0,0 +1,17 @@
def test_settings_group(plex):
assert plex.settings.group('general')
def test_settings_get(plex):
# This is the value since it we havnt set any friendlyname
# plex just default to computer name but it NOT in the settings.
assert plex.settings.get('FriendlyName').value == ''
def test_settings_get(plex):
cd = plex.settings.get('collectUsageData')
cd.set(False)
# Save works but since we reload asap the data isnt changed.
# or it might be our caching that does this. ## TODO
plex.settings.save()

View file

@ -17,10 +17,9 @@ def test_video_ne(movies):
assert len(movies.fetchItems('/library/sections/7/all', title__ne='Sintel')) == 3
def test_video_Movie_delete(monkeypatch, movie):
monkeypatch.delattr('requests.sessions.Session.request')
with pytest.raises(AttributeError):
movie.delete()
def test_video_Movie_delete(movie, patched_http_call):
movie.delete()
def test_video_Movie_addCollection(movie):
labelname = 'Random_label'
@ -61,6 +60,15 @@ def test_video_Movie_isPartialObject(movie):
assert movie.isPartialObject()
def test_video_Movie_delete_part(movie, mocker):
# we need to reload this as there is a bug in part.delete
# See https://github.com/pkkid/python-plexapi/issues/201
m = movie.reload()
for part in m.iterParts():
with utils.callable_http_patch():
part.delete()
def test_video_Movie_iterParts(movie):
assert len(list(movie.iterParts())) >= 1
@ -72,6 +80,10 @@ def test_video_Movie_download(monkeydownload, tmpdir, movie):
assert len(filepaths2) >= 1
def test_video_Movie_subtitlestreams(movie):
assert not movie.subtitleStreams()
def test_video_Movie_attrs(movies):
movie = movies.get('Sita Sings the Blues')
assert len(movie.locations[0]) >= 10
@ -259,6 +271,19 @@ def test_video_Show(show):
assert show.title == 'Game of Thrones'
def test_video_Episode_split(episode, patched_http_call):
episode.split()
def test_video_Episode_unmatch(episode, patched_http_call):
episode.unmatch()
def test_video_Episode_stop(episode, mocker, patched_http_call):
mocker.patch.object(episode, 'session', return_value=list(mocker.MagicMock(id='hello')))
episode.stop(reason="It's past bedtime!")
def test_video_Show_attrs(show):
assert utils.is_datetime(show.addedAt)
assert utils.is_metadata(show.art, contains='/art/')