Improvements in tests process (#297)

* lets begin

* skip plexpass tests if there is not plexpass on account

* test new myplex attrubutes

* bootstrap: proper photos organisation

* fix rest of photos tests

* fix myplex new attributes test

* fix music bootstrap by setting agent to lastfm

* fix sync tests

* increase bootstrap timeout

* remove timeout from .travis.yml

* do not create playlist-style photoalbums in plex-bootstraptest.py

* allow negative filtering in LibrarySection.search()

* fix sync tests once again

* use sendCrashReports in test_settings

* fix test_settings

* fix test_video

* do not accept eula in bootstrap

* fix PlexServer.isLatest()

* add test against old version of PlexServer

* fix MyPlexAccount.OutOut

* add flag for one-time testing in Travis

* fix test_library onDeck tests

* fix more tests

* use tqdm in plex-bootstraptest for media scanning progress

* create sections one-by-one

* update docs on AlertListener for timeline entries

* fix plex-bootstraptest for server version 1.3.2

* display skip/xpass/xfail reasons

* fix tests on 1.3

* wait for music to be fully processed in plex-bootstraptest

* fix misplaced TEST_ACCOUNT_ONCE

* fix test_myplex_users, not sure if in proper-way

* add pytest-rerunfailures; mark test_myplex_optout as flaky

* fix comment

* Revert "add pytest-rerunfailures; mark test_myplex_optout as flaky"

This reverts commit 580e4c95a7.

* restart plex container on failure

* add conftest.wait_until() and used where some retries are required

* add more wait_until() usage in test_sync

* fix managed user search

* fix updating managed users in myplex

* allow to add new servers to existent users

* add new server to a shared user while bootstrapping

* add some docs on testing process

* perform few attemps when unable to get the claim token

* unlock websocket-client in requirements_dev

* fix docblock in tools/plex-teardowntest

* do not hardcode mediapart size in test_video

* remove cache:pip from travis

* Revert "unlock websocket-client in requirements_dev"

This reverts commit 0d536bd06d.

* remove debug from server.py

* improve webhook tests

* fix type() check to isinstance()

* remove excessive `else` branch due to Hellowlol advice

* add `unknown` as allowed `myPlexMappingState` in test_server
This commit is contained in:
Andrey Yantsen 2018-09-14 19:03:23 +01:00 committed by Hellowlol
parent afd4e24420
commit 68fc970d7a
23 changed files with 613 additions and 350 deletions

View file

@ -1,27 +1,72 @@
language:
- python
language: python
stages:
- test
- name: deploy
if: tag IS present
sudo: required
services:
- docker
python:
- '2.7'
- '3.4'
- '3.6'
- 2.7
- 3.4
- 3.6
env:
global:
- PLEXAPI_AUTH_SERVER_BASEURL=http://127.0.0.1:32400
matrix:
- PLEX_CONTAINER_TAG=latest
before_install:
- pip install --upgrade pip
- pip install --upgrade setuptools
- pip install --upgrade pytest pytest-cov coveralls
- pip install --upgrade pip
- pip install --upgrade setuptools
- pip install --upgrade pytest pytest-cov coveralls
install:
- pip install -r requirements_dev.txt
- pip install -r requirements_dev.txt
- PYTHONPATH="$PWD:$PYTHONPATH" python -u tools/plex-bootstraptest.py --destination plex --advertise-ip=127.0.0.1
--bootstrap-timeout 540 --docker-tag $PLEX_CONTAINER_TAG
script:
- if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then py.test tests --tb=native --verbose
--cov-config .coveragerc --cov=plexapi; fi
- flake8 plexapi --exclude=compat.py --max-line-length=120 --ignore=E128,E701,E702,E731,W293
- py.test tests -rxXs --ignore=tests/test_sync.py --tb=native --verbose --cov-config .coveragerc --cov=plexapi
- PLEXAPI_HEADER_PROVIDES='controller,sync-target' PLEXAPI_HEADER_PLATFORM=iOS PLEXAPI_HEADER_PLATFORM_VERSION=11.4.1
PLEXAPI_HEADER_DEVICE=iPhone py.test tests/test_sync.py -rxXs --tb=native --verbose --cov-config .coveragerc
--cov=plexapi --cov-append
after_success:
- coveralls
matrix:
fast_finish: true
deploy:
provider: pypi
user: mjs7231
password:
secure: UhuEN9GAp9zMEXdVTxSrbhfYf4HjTcj47l093Qh1HYKmZACxJM/+JkQCm7+oHPJpo7YDLk2we9oEsQ41maZBr9WgZI1lwR6m590M12vPhPI7NCVzINxJqebc0uZhCFsAFFKA3kzpRQbDfsBUG4yL/AzeMcvJMgIg3m07KRVhBywnnRhQ77trbBI0Io5MBzfW9PYDeGJqlNDBM7SbB4tK0udGZQT9wmFwvIoJODPDnM15Ry4vpkVNww/vVgyHklmnYlPzQgvhSMOXk0+MWlYtaKmu6uuLAiRccT1Fsmi1POKuFEq8S0Z7w4LmwxCVRaCvsZdNW5eXWgPDhZXNcLrKMwjgJt9Vj3VcD+NCywux/C1hTq7tecBocA13kzbgg4fd2sATOjQT5iaRPGrDtKm8e00hxr125n0StDxXdYGl2W5sH0LCkZE6Vq1GjXYjKFXZeTk3Fzav/3N8IxHBX3CliJB/vbloJ2mpz1kXL4UTORl9pghPyGOOq2yJPYSSWly/RsAD7UDrL1/lezaPSJGKbZJ0CMyfA83kd82/hgZflOuBuTcPHCZSU3zMCs0fsImZZxr6Qm1tbff+iyNS/ufoYgeVfsWhlEl9FoLv1g4HG6oA+uDHz+jKz9uSRHcGqD6P4JJK+H+yy0PeYfo7b6eSqFxgt8q8QfifUaCrVoCiY+c=
on:
tags: true
- coveralls
after_script:
- PYTHONPATH="$PWD:$PYTHONPATH" python -u tools/plex-teardowntest.py
jobs:
include:
- python: 3.6
env:
- PLEX_CONTAINER_TAG=1.3.2.3112-1751929
- TEST_ACCOUNT_ONCE=1
- python: 3.6
name: "Flake8"
install:
- pip install -r requirements_dev.txt
script: flake8 plexapi --exclude=compat.py --max-line-length=120 --ignore=E128,E701,E702,E731,W293
after_success: true
after_script: true
env:
- PLEX_CONTAINER_TAG=latest
- stage: deploy
name: "Deploy to PyPi"
python: 3.6
install: true
script: true
env:
- PLEX_CONTAINER_TAG=latest
deploy:
provider: pypi
user: mjs7231
password:
secure: UhuEN9GAp9zMEXdVTxSrbhfYf4HjTcj47l093Qh1HYKmZACxJM/+JkQCm7+oHPJpo7YDLk2we9oEsQ41maZBr9WgZI1lwR6m590M12vPhPI7NCVzINxJqebc0uZhCFsAFFKA3kzpRQbDfsBUG4yL/AzeMcvJMgIg3m07KRVhBywnnRhQ77trbBI0Io5MBzfW9PYDeGJqlNDBM7SbB4tK0udGZQT9wmFwvIoJODPDnM15Ry4vpkVNww/vVgyHklmnYlPzQgvhSMOXk0+MWlYtaKmu6uuLAiRccT1Fsmi1POKuFEq8S0Z7w4LmwxCVRaCvsZdNW5eXWgPDhZXNcLrKMwjgJt9Vj3VcD+NCywux/C1hTq7tecBocA13kzbgg4fd2sATOjQT5iaRPGrDtKm8e00hxr125n0StDxXdYGl2W5sH0LCkZE6Vq1GjXYjKFXZeTk3Fzav/3N8IxHBX3CliJB/vbloJ2mpz1kXL4UTORl9pghPyGOOq2yJPYSSWly/RsAD7UDrL1/lezaPSJGKbZJ0CMyfA83kd82/hgZflOuBuTcPHCZSU3zMCs0fsImZZxr6Qm1tbff+iyNS/ufoYgeVfsWhlEl9FoLv1g4HG6oA+uDHz+jKz9uSRHcGqD6P4JJK+H+yy0PeYfo7b6eSqFxgt8q8QfifUaCrVoCiY+c=
on:
tags: true

View file

@ -131,6 +131,68 @@ Usage Examples
print(playlist.title)
Running tests over PlexAPI
--------------------------
In order to test the PlexAPI library you have to prepare a Plex Server instance with following libraries:
1. Movies section (agent `com.plexapp.agents.imdb`) containing both movies:
* Sintel - https://durian.blender.org/
* Elephants Dream - https://orange.blender.org/
* Sita Sings the Blues - http://www.sitasingstheblues.com/
* Big Buck Bunny - https://peach.blender.org/
2. TV Show section (agent `com.plexapp.agents.thetvdb`) containing the shows:
* Game of Thrones (Season 1 and 2)
* The 100 (Seasons 1 and 2)
* (or symlink the above movies with proper names)
3. Music section (agent `com.plexapp.agents.lastfm`) containing the albums:
* Infinite State - Unmastered Impulses - https://github.com/kennethreitz/unmastered-impulses
* Broke For Free - Layers - http://freemusicarchive.org/music/broke_for_free/Layers/
4. A Photos section (any agent) containing the photoalbums (photoalbum is just a folder on your disk):
* `Cats`
* Within `Cats` album you need to place 3 photos (cute cat photos, of course)
* Within `Cats` album you should place 3 more photoalbums (one of them should be named `Cats in bed`,
names of others doesn't matter)
* Within `Cats in bed` you need to place 7 photos
* Within other 2 albums you should place 1 photo in each
Instead of manual creation of the library you could use a script `tools/plex-boostraptest.py` with appropriate
arguments and add this new server to a shared user which username is defined in environment veriable `SHARED_USERNAME`.
It uses `official docker image`_ to create a proper instance.
Also in order to run most of the tests you have to provide some environment variables:
* `PLEXAPI_AUTH_SERVER_BASEURL` containing an URL to your Plex instance, e.g. `http://127.0.0.1:32400` (without trailing
slash)
* `PLEXAPI_AUTH_MYPLEX_USERNAME` and `PLEXAPI_AUTH_MYPLEX_PASSWORD` with your MyPlex username and password accordingly
After this step you can run tests with following command:
.. code-block:: bash
py.test tests -rxXs --ignore=tests/test_sync.py
Some of the tests in main test-suite require a shared user in your account (e.g. `test_myplex_users`,
`test_myplex_updateFriend`, etc.), you need to provide a valid shared user's username to get them running you need to
provide the username of the shared user as an environment variable `SHARED_USERNAME`. You can enable a Guest account and
simply pass `Guest` as `SHARED_USERNAME` (or just create a user like `plexapitest` and play with it).
To be able to run tests over Mobile Sync api you have to some some more environment variables, to following values
exactly:
* PLEXAPI_HEADER_PROVIDES='controller,sync-target'
* PLEXAPI_HEADER_PLATFORM=iOS
* PLEXAPI_HEADER_PLATFORM_VERSION=11.4.1
* PLEXAPI_HEADER_DEVICE=iPhone
And finally run the sync-related tests:
.. code-block:: bash
py.test tests/test_sync.py -rxXs
.. _official docker image: https://hub.docker.com/r/plexinc/pms-docker/
Common Questions
----------------

View file

@ -12,6 +12,18 @@ class AlertListener(threading.Thread):
alerts you must call .start() on the object once it's created. When calling
`PlexServer.startAlertListener()`, the thread will be started for you.
Known `state`-values for timeline entries, with identifier=`com.plexapp.plugins.library`:
:0: The item was created
:1: Reporting progress on item processing
:2: Matching the item
:3: Downloading the metadata
:4: Processing downloaded metadata
:5: The item processed
:9: The item deleted
When metadata agent is not set for the library processing ends with state=1.
Parameters:
server (:class:`~plexapi.server.PlexServer`): PlexServer this listener is connected to.
callback (func): Callback function to call on recieved messages. The callback function

View file

@ -470,8 +470,8 @@ class LibrarySection(PlexObject):
libtype (str): Filter results to a spcifiec libtype (movie, show, episode, artist,
album, track; optional).
**kwargs (dict): Any of the available filters for the current library section. Partial string
matches allowed. Multiple matches OR together. All inputs will be compared with the
available options and a warning logged if the option does not appear valid.
matches allowed. Multiple matches OR together. Negative filtering also possible, just add an
exclamation mark to the end of filter name, e.g. `resolution!=1x1`.
* unwatched: Display or hide unwatched content (True, False). [all]
* duplicate: Display or hide duplicate items (True, False). [movie]
@ -486,6 +486,9 @@ class LibrarySection(PlexObject):
* resolution: List of video resolutions to search within ([resolution_or_key, ...]). [movie]
* studio: List of studios to search within ([studio_or_key, ...]). [music]
* year: List of years to search within ([yyyy, ...]). [all]
Raises:
:class:`plexapi.exceptions.BadRequest`: when applying unknown filter
"""
# cleanup the core arguments
args = {}
@ -510,7 +513,10 @@ class LibrarySection(PlexObject):
def _cleanSearchFilter(self, category, value, libtype=None):
# check a few things before we begin
if category not in self.ALLOWED_FILTERS:
if category.endswith('!'):
if category[:-1] not in self.ALLOWED_FILTERS:
raise BadRequest('Unknown filter category: %s' % category[:-1])
elif category not in self.ALLOWED_FILTERS:
raise BadRequest('Unknown filter category: %s' % category)
if category in self.BOOLEAN_FILTERS:
return '1' if value else '0'
@ -839,12 +845,12 @@ class PhotoSection(LibrarySection):
Attributes:
ALLOWED_FILTERS (list<str>): List of allowed search filters. ('all', 'iso',
'make', 'lens', 'aperture', 'exposure')
'make', 'lens', 'aperture', 'exposure', 'device', 'resolution')
ALLOWED_SORT (list<str>): List of allowed sorting keys. ('addedAt')
TAG (str): 'Directory'
TYPE (str): 'photo'
"""
ALLOWED_FILTERS = ('all', 'iso', 'make', 'lens', 'aperture', 'exposure')
ALLOWED_FILTERS = ('all', 'iso', 'make', 'lens', 'aperture', 'exposure', 'device', 'resolution')
ALLOWED_SORT = ('addedAt',)
TAG = 'Directory'
TYPE = 'photo'
@ -853,13 +859,11 @@ class PhotoSection(LibrarySection):
def searchAlbums(self, title, **kwargs):
""" Search for an album. See :func:`~plexapi.library.LibrarySection.search()` for usage. """
key = '/library/sections/%s/all?type=14' % self.key
return self.fetchItems(key, title=title)
return self.search(libtype='photoalbum', title=title, **kwargs)
def searchPhotos(self, title, **kwargs):
""" Search for a photo. See :func:`~plexapi.library.LibrarySection.search()` for usage. """
key = '/library/sections/%s/all?type=13' % self.key
return self.fetchItems(key, title=title)
return self.search(libtype='photo', title=title, **kwargs)
def sync(self, resolution, limit=None, **kwargs):
""" Add current Music library section as sync item for specified device.

View file

@ -263,7 +263,7 @@ class MyPlexAccount(PlexObject):
# Update friend servers
response_filters = ''
response_servers = ''
user = self.user(user.username if isinstance(user, MyPlexUser) else user)
user = user if isinstance(user, MyPlexUser) else self.user(user)
machineId = server.machineIdentifier if isinstance(server, PlexServer) else server
sectionIds = self._getSectionIds(machineId, sections)
headers = {'Content-Type': 'application/json'}
@ -278,7 +278,7 @@ class MyPlexAccount(PlexObject):
"invited_id": user.id}}
url = self.FRIENDINVITE.format(machineId=machineId)
# Remove share sections, add shares to user without shares, or update shares
if sectionIds:
if not user_servers or sectionIds:
if removeSections is True:
response_servers = self.query(url, self._session.delete, json=params, headers=headers)
elif 'invited_id' in params.get('shared_server', ''):
@ -376,7 +376,8 @@ class MyPlexAccount(PlexObject):
def setWebhooks(self, urls):
log.info('Setting webhooks: %s' % urls)
data = self.query(self.WEBHOOKS, self._session.post, data={'urls[]': urls})
data = {'urls[]': urls} if len(urls) else {'urls': ''}
data = self.query(self.WEBHOOKS, self._session.post, data=data)
self._webhooks = self.listAttrs(data, 'url', etag='webhook')
return self._webhooks
@ -395,7 +396,7 @@ class MyPlexAccount(PlexObject):
if library is not None:
params['optOutLibraryStats'] = int(library)
url = 'https://plex.tv/api/v2/user/privacy'
return self.query(url, method=self._session.put, params=params)
return self.query(url, method=self._session.put, data=params)
def syncItems(self, client=None, clientId=None):
""" Returns an instance of :class:`plexapi.sync.SyncList` for specified client.

View file

@ -154,7 +154,7 @@ class Photo(PlexPartialObject):
sync_item.metadataType = self.METADATA_TYPE
sync_item.machineIdentifier = self._server.machineIdentifier
section = self._server.library.sectionByID(self.librarySectionID)
section = self.section()
sync_item.location = 'library://%s/item/%s' % (section.uuid, quote_plus(self.key))
sync_item.policy = Policy.create(limit)

View file

@ -168,8 +168,8 @@ class Playlist(PlexPartialObject, Playable):
:mod:`plexapi.sync` module. Used only when playlist contains video.
photoResolution (str): maximum allowed resolution for synchronized photos, see PHOTO_QUALITY_* values in
the module :mod:`plexapi.sync`. Used only when playlist contains photos.
audioBitrate (int): maximum bitrate for synchronized music, better use one of MUSIC_BITRATE_* values from the
module :mod:`plexapi.sync`. Used only when playlist contains audio.
audioBitrate (int): maximum bitrate for synchronized music, better use one of MUSIC_BITRATE_* values
from the module :mod:`plexapi.sync`. Used only when playlist contains audio.
client (:class:`plexapi.myplex.MyPlexDevice`): sync destination, see
:func:`plexapi.myplex.MyPlexAccount.sync`.
clientId (str): sync destination, see :func:`plexapi.myplex.MyPlexAccount.sync`.

View file

@ -290,12 +290,14 @@ class PlexServer(PlexObject):
part = '/updater/check?download=%s' % (1 if download else 0)
if force:
self.query(part, method=self._session.put)
return self.fetchItem('/updater/status')
releases = self.fetchItems('/updater/status')
if len(releases):
return releases[0]
def isLatest(self):
""" Check if the installed version of PMS is the latest. """
release = self.check_for_update(force=True)
return bool(release.version == self.version)
return release is None
def installUpdate(self):
""" Install the newest version of Plex Media Server. """

View file

@ -99,7 +99,7 @@ class Setting(PlexObject):
group (str): Group name this setting is categorized as.
enumValues (list,dict): List or dictionary of valis values for this setting.
"""
_bool_cast = lambda x: True if x == 'true' else False
_bool_cast = lambda x: True if x == 'true' or x == '1' else False
_bool_str = lambda x: str(x).lower()
TYPES = {
'bool': {'type': bool, 'cast': _bool_cast, 'tostr': _bool_str},

View file

@ -14,9 +14,9 @@ from plexapi.exceptions import NotFound
# Search Types - Plex uses these to filter specific media types when searching.
# Library Types - Populated at runtime
SEARCHTYPES = {'movie': 1, 'show': 2, 'season': 3, 'episode': 4,
'artist': 8, 'album': 9, 'track': 10, 'photo': 14,
'collection': 18}
SEARCHTYPES = {'movie': 1, 'show': 2, 'season': 3, 'episode': 4, 'trailer': 5, 'comic': 6, 'person': 7,
'artist': 8, 'album': 9, 'track': 10, 'picture': 11, 'clip': 12, 'photo': 13, 'photoalbum': 14,
'playlist': 15, 'playlistFolder': 16, 'collection': 18, 'userPlaylistItem': 1001}
PLEXOBJECTS = {}

View file

@ -323,7 +323,7 @@ class Show(Video):
def seasons(self, **kwargs):
""" Returns a list of :class:`~plexapi.video.Season` objects. """
key = '/library/metadata/%s/children' % self.ratingKey
key = '/library/metadata/%s/children?excludeAllLeaves=1' % self.ratingKey
return self.fetchItems(key, **kwargs)
def season(self, title=None):

View file

@ -15,4 +15,4 @@ sphinx
sphinx-rtd-theme
sphinxcontrib-napoleon
tqdm
websocket-client
websocket-client==0.48.0

View file

@ -1,25 +1,14 @@
# -*- coding: utf-8 -*-
# Running these tests requires a few things in your Plex Library.
# 1. Movies section containing both movies:
# * Sintel - https://durian.blender.org/
# * Elephants Dream - https://orange.blender.org/
# * Sita Sings the Blues - http://www.sitasingstheblues.com/
# * Big Buck Bunny - https://peach.blender.org/
# 2. TV Show section containing the shows:
# * Game of Thrones (Season 1 and 2)
# * The 100 (Seasons 1 and 2)
# * (or symlink the above movies with proper names)
# 3. Music section containing the albums:
# Infinite State - Unmastered Impulses - https://github.com/kennethreitz/unmastered-impulses
# Broke For Free - Layers - http://freemusicarchive.org/music/broke_for_free/Layers/
# 4. A Photos section containing the photoalbums:
# Cats (with cute cat photos inside)
import time
from datetime import datetime
from functools import partial
from os import environ
import pytest
import requests
from plexapi.myplex import MyPlexAccount
try:
from unittest.mock import patch, MagicMock
except ImportError:
@ -34,7 +23,8 @@ from plexapi.server import PlexServer
SERVER_BASEURL = plexapi.CONFIG.get('auth.server_baseurl')
SERVER_TOKEN = plexapi.CONFIG.get('auth.server_token')
MYPLEX_USERNAME = plexapi.CONFIG.get('auth.myplex_username')
MYPLEX_PASSWORD = plexapi.CONFIG.get('auth.myplex_password')
CLIENT_BASEURL = plexapi.CONFIG.get('auth.client_baseurl')
CLIENT_TOKEN = plexapi.CONFIG.get('auth.client_token')
@ -47,9 +37,10 @@ AUDIOLAYOUTS = {'5.1', '5.1(side)', 'stereo'}
CODECS = {'aac', 'ac3', 'dca', 'h264', 'mp3', 'mpeg4'}
CONTAINERS = {'avi', 'mp4', 'mkv'}
CONTENTRATINGS = {'TV-14', 'TV-MA', 'G', 'NR'}
FRAMERATES = {'24p', 'PAL'}
FRAMERATES = {'24p', 'PAL', 'NTSC'}
PROFILES = {'advanced simple', 'main', 'constrained baseline'}
RESOLUTIONS = {'sd', '480', '576', '720', '1080'}
ENTITLEMENTS = {'ios', 'cpms', 'roku', 'android', 'xbox_one', 'xbox_360', 'windows', 'windows_phone'}
def pytest_addoption(parser):
@ -65,31 +56,43 @@ def pytest_runtest_setup(item):
# Fixtures
# ---------------------------------
@pytest.fixture()
@pytest.fixture(scope='session')
def account():
return plex().myPlexAccount()
# assert MYPLEX_USERNAME, 'Required MYPLEX_USERNAME not specified.'
# assert MYPLEX_PASSWORD, 'Required MYPLEX_PASSWORD not specified.'
# return MyPlexAccount(MYPLEX_USERNAME, MYPLEX_PASSWORD)
@pytest.fixture()
def account_synctarget():
assert 'sync-target' in plexapi.X_PLEX_PROVIDES, 'You have to set env var ' \
'PLEXAPI_HEADER_PROVIDES=sync-target,controller'
assert 'sync-target' in plexapi.BASE_HEADERS['X-Plex-Provides']
assert 'iOS' == plexapi.X_PLEX_PLATFORM, 'You have to set env var PLEXAPI_HEADER_PLATORM=iOS'
assert '11.4.1' == plexapi.X_PLEX_PLATFORM_VERSION, 'You have to set env var PLEXAPI_HEADER_PLATFORM_VERSION=11.4.1'
assert 'iPhone' == plexapi.X_PLEX_DEVICE, 'You have to set env var PLEXAPI_HEADER_DEVICE=iPhone'
return plex().myPlexAccount()
assert MYPLEX_USERNAME, 'Required MYPLEX_USERNAME not specified.'
assert MYPLEX_PASSWORD, 'Required MYPLEX_PASSWORD not specified.'
return MyPlexAccount()
@pytest.fixture(scope='session')
def plex():
def account_once(account):
if environ.get('TEST_ACCOUNT_ONCE') != '1' and environ.get('CI') == 'true':
pytest.skip('Do not forget to test this by providing TEST_ACCOUNT_ONCE=1')
return account
@pytest.fixture(scope='session')
def account_plexpass(account):
if not account.subscriptionActive:
pytest.skip('PlexPass subscription is not active, unable to test sync-stuff, be careful!')
return account
@pytest.fixture(scope='session')
def account_synctarget(account_plexpass):
assert 'sync-target' in plexapi.X_PLEX_PROVIDES, 'You have to set env var ' \
'PLEXAPI_HEADER_PROVIDES=sync-target,controller'
assert 'sync-target' in plexapi.BASE_HEADERS['X-Plex-Provides']
assert 'iOS' == plexapi.X_PLEX_PLATFORM, 'You have to set env var PLEXAPI_HEADER_PLATFORM=iOS'
assert '11.4.1' == plexapi.X_PLEX_PLATFORM_VERSION, 'You have to set env var PLEXAPI_HEADER_PLATFORM_VERSION=11.4.1'
assert 'iPhone' == plexapi.X_PLEX_DEVICE, 'You have to set env var PLEXAPI_HEADER_DEVICE=iPhone'
return account_plexpass
@pytest.fixture(scope='session')
def plex(account):
assert SERVER_BASEURL, 'Required SERVER_BASEURL not specified.'
assert SERVER_TOKEN, 'Requred SERVER_TOKEN not specified.'
session = requests.Session()
return PlexServer(SERVER_BASEURL, SERVER_TOKEN, session=session)
return PlexServer(SERVER_BASEURL, account.authenticationToken, session=session)
@pytest.fixture()
@ -186,6 +189,18 @@ def photoalbum(photos):
return photos.get('photo_album1')
@pytest.fixture()
def shared_username(account):
username = environ.get('SHARED_USERNAME', 'PKKid')
for user in account.users():
if user.title.lower() == username.lower():
return username
elif (user.username and user.email and user.id and username.lower() in
(user.username.lower(), user.email.lower(), str(user.id))):
return username
pytest.skip('Shared user %s wasn`t found in your MyPlex account' % username)
@pytest.fixture()
def monkeydownload(request, monkeypatch):
monkeypatch.setattr('plexapi.utils.download', partial(plexapi.utils.download, mocked=True))
@ -254,3 +269,17 @@ def is_string(value, gte=1):
def is_thumb(key):
return is_metadata(key, contains='/thumb/')
def wait_until(condition_function, delay=0.25, timeout=1, *args, **kwargs):
start = time.time()
ready = condition_function(*args, **kwargs)
retries = 1
while not ready and time.time() - start < timeout:
retries += 1
time.sleep(delay)
ready = condition_function(*args, **kwargs)
assert ready, 'Wait timeout after %d retries, %.2f seconds' % (retries, time.time() - start)
return ready

View file

@ -56,8 +56,10 @@ def test_library_fetchItem(plex, movie):
assert item1 == item2 == movie
def test_library_onDeck(plex):
def test_library_onDeck(plex, movie):
movie.updateProgress(movie.duration * 1000 / 10) # set progress to 10%
assert len(list(plex.library.onDeck()))
movie.markUnwatched()
def test_library_recentlyAdded(plex):
@ -140,8 +142,13 @@ def test_librarty_deleteMediaPreviews(movies):
movies.deleteMediaPreviews()
def test_library_MovieSection_onDeck(movies, tvshows):
assert len(movies.onDeck()) + len(tvshows.onDeck())
def test_library_MovieSection_onDeck(movie, movies, tvshows, episode):
movie.updateProgress(movie.duration * 1000 / 10) # set progress to 10%
assert movies.onDeck()
movie.markUnwatched()
episode.markWatched()
assert tvshows.onDeck()
episode.markUnwatched()
def test_library_MovieSection_recentlyAdded(movies):

View file

@ -54,8 +54,10 @@ def test_myplex_devices(account):
assert devices, 'No devices found for account: %s' % account.name
def test_myplex_device(account):
assert account.device('pkkid-plexapi')
def test_myplex_device(account, plex):
from plexapi import X_PLEX_DEVICE_NAME
assert account.device(plex.friendlyName)
assert account.device(X_PLEX_DEVICE_NAME)
def _test_myplex_connect_to_device(account):
@ -69,52 +71,55 @@ def _test_myplex_connect_to_device(account):
def test_myplex_users(account):
users = account.users()
assert users, 'Found no users on account: %s' % account.name
if not len(users):
return pytest.skip('You have to add a shared account into your MyPlex')
print('Found %s users.' % len(users))
user = account.user(users[0].title)
print('Found user: %s' % user)
assert user, 'Could not find user %s' % users[0].title
assert len(users[0].servers[0].sections()) == 10, "Could'nt info about the shared libraries"
assert len(users[0].servers[0].sections()) > 0, "Couldn't info about the shared libraries"
def test_myplex_resource(account):
assert account.resource('pkkid-plexapi')
def test_myplex_resource(account, plex):
assert account.resource(plex.friendlyName)
def test_myplex_webhooks(account):
# Webhooks are a plex pass feature to this will fail
with pytest.raises(BadRequest):
account.webhooks()
if account.subscriptionActive:
assert isinstance(account.webhooks(), list)
else:
with pytest.raises(BadRequest):
account.webhooks()
def test_myplex_addwebhooks(account):
with pytest.raises(BadRequest):
account.addWebhook('http://site.com')
if account.subscriptionActive:
assert 'http://example.com' in account.addWebhook('http://example.com')
else:
with pytest.raises(BadRequest):
account.addWebhook('http://example.com')
def test_myplex_deletewebhooks(account):
with pytest.raises(BadRequest):
account.deleteWebhook('http://site.com')
if account.subscriptionActive:
assert 'http://example.com' not in account.deleteWebhook('http://example.com')
else:
with pytest.raises(BadRequest):
account.deleteWebhook('http://example.com')
def test_myplex_optout(account):
def test_myplex_optout(account_once):
def enabled():
ele = account.query('https://plex.tv/api/v2/user/privacy')
ele = account_once.query('https://plex.tv/api/v2/user/privacy')
lib = ele.attrib.get('optOutLibraryStats')
play = ele.attrib.get('optOutPlayback')
return bool(int(lib)), bool(int(play))
# This should be False False
library_enabled, playback_enabled = enabled()
account.optOut(library=True, playback=True)
assert all(enabled())
account.optOut(library=False, playback=False)
assert not all(enabled())
account_once.optOut(library=True, playback=True)
utils.wait_until(lambda: enabled() == (True, True))
account_once.optOut(library=False, playback=False)
utils.wait_until(lambda: enabled() == (False, False))
def test_myplex_inviteFriend_remove(account, plex, mocker):
@ -137,18 +142,26 @@ def test_myplex_inviteFriend_remove(account, plex, mocker):
account.removeFriend(inv_user)
def test_myplex_updateFriend(account, plex, mocker):
edit_user = 'PKKid'
def test_myplex_updateFriend(account, plex, mocker, shared_username):
vid_filter = {'contentRating': ['G'], 'label': ['foo']}
secs = plex.library.sections()
user = account.user(edit_user)
user = account.user(shared_username)
ids = account._getSectionIds(plex.machineIdentifier, secs)
with mocker.patch.object(account, '_getSectionIds', return_value=ids):
with mocker.patch.object(account, 'user', return_value=user):
with utils.callable_http_patch():
account.updateFriend(edit_user, plex, secs, allowSync=True, removeSections=True,
allowCameraUpload=True, allowChannels=False, filterMovies=vid_filter,
filterTelevision=vid_filter, filterMusic={'label': ['foo']})
account.updateFriend(shared_username, plex, secs, allowSync=True, removeSections=True,
allowCameraUpload=True, allowChannels=False, filterMovies=vid_filter,
filterTelevision=vid_filter, filterMusic={'label': ['foo']})
def test_myplex_plexpass_attributes(account_plexpass):
assert account_plexpass.subscriptionActive
assert account_plexpass.subscriptionStatus == 'Active'
assert account_plexpass.subscriptionPlan
assert 'sync' in account_plexpass.subscriptionFeatures
assert 'premium_music_metadata' in account_plexpass.subscriptionFeatures
assert 'plexpass' in account_plexpass.roles
assert set(account_plexpass.entitlements) == utils.ENTITLEMENTS

View file

@ -5,5 +5,5 @@ def test_photo_Photoalbum(photoalbum):
assert len(photoalbum.photos()) == 3
cats_in_bed = photoalbum.album('Cats in bed')
assert len(cats_in_bed.photos()) == 7
a_pic = cats_in_bed.photo('maxresdefault')
a_pic = cats_in_bed.photo('photo7')
assert a_pic

View file

@ -99,12 +99,12 @@ def test_playqueues(plex):
assert playqueue.playQueueID, 'Play queue ID not set.'
def test_copyToUser(plex, show, fresh_plex):
def test_copyToUser(plex, show, fresh_plex, shared_username):
episodes = show.episodes()
playlist = plex.createPlaylist('shared_from_test_plexapi', episodes)
try:
playlist.copyToUser('PKKid')
user = plex.myPlexAccount().user('PKKid')
playlist.copyToUser(shared_username)
user = plex.myPlexAccount().user(shared_username)
user_plex = fresh_plex(plex._baseurl, user.get_token(plex.machineIdentifier))
assert playlist.title in [p.title for p in user_plex.playlists()]
finally:

View file

@ -8,18 +8,19 @@ from requests import Session
from . import conftest as utils
def test_server_attr(plex):
def test_server_attr(plex, account):
assert plex._baseurl == utils.SERVER_BASEURL
assert len(plex.friendlyName) >= 1
assert len(plex.machineIdentifier) == 40
assert plex.myPlex is True
assert plex.myPlexMappingState == 'mapped'
# if you run the tests very shortly after server creation the state in rare cases may be `unknown`
assert plex.myPlexMappingState in ('mapped', 'unknown')
assert plex.myPlexSigninState == 'ok'
assert utils.is_int(plex.myPlexSubscription, gte=0)
assert re.match(utils.REGEX_EMAIL, plex.myPlexUsername)
assert plex.platform in ('Linux', 'Windows')
assert len(plex.platformVersion) >= 5
assert plex._token == utils.SERVER_TOKEN
assert plex._token == account.authenticationToken
assert utils.is_int(plex.transcoderActiveVideoSessions, gte=0)
assert utils.is_datetime(plex.updatedAt)
assert len(plex.version) >= 5
@ -30,10 +31,7 @@ def test_server_alert_listener(plex, movies):
messages = []
listener = plex.startAlertListener(messages.append)
movies.refresh()
starttime, runtime = time.time(), 0
while len(messages) < 3 and runtime <= 30:
time.sleep(1)
runtime = int(time.time() - starttime)
utils.wait_until(lambda: len(messages) >= 3, delay=1, timeout=30)
assert len(messages) >= 3
finally:
listener.stop()
@ -116,9 +114,11 @@ def test_server_playlists(plex, show):
playlist.delete()
def test_server_history(plex):
def test_server_history(plex, movie):
movie.markWatched()
history = plex.history()
assert len(history)
movie.markUnwatched()
def test_server_Server_query(plex):
@ -131,14 +131,14 @@ def test_server_Server_query(plex):
PlexServer(utils.SERVER_BASEURL, '1234')
def test_server_Server_session():
def test_server_Server_session(account):
# Mock Sesstion
class MySession(Session):
def __init__(self):
super(self.__class__, self).__init__()
self.plexapi_session_test = True
# Test Code
plex = PlexServer(utils.SERVER_BASEURL, utils.SERVER_TOKEN, session=MySession())
plex = PlexServer(utils.SERVER_BASEURL, account.authenticationToken, session=MySession())
assert hasattr(plex._session, 'plexapi_session_test')
@ -165,7 +165,12 @@ def test_server_sessions(plex):
def test_server_isLatest(plex, mocker):
plex.isLatest()
from os import environ
is_latest = plex.isLatest()
if environ.get('PLEX_CONTAINER_TAG') and environ['PLEX_CONTAINER_TAG'] != 'latest':
assert not is_latest
else:
return pytest.skip('Do not forget to run with PLEX_CONTAINER_TAG != latest to ensure that update is available')
def test_server_installUpdate(plex, mocker):
@ -225,13 +230,20 @@ def test_server_account(plex):
# assert account.mappingError == 'publisherror'
assert account.mappingErrorMessage is None
assert account.mappingState == 'mapped'
assert re.match(utils.REGEX_IPADDR, account.privateAddress)
assert int(account.privatePort) >= 1000
assert re.match(utils.REGEX_IPADDR, account.publicAddress)
assert int(account.publicPort) >= 1000
if account.mappingError != 'unreachable':
assert re.match(utils.REGEX_IPADDR, account.privateAddress)
assert int(account.privatePort) >= 1000
assert re.match(utils.REGEX_IPADDR, account.publicAddress)
assert int(account.publicPort) >= 1000
else:
assert account.privateAddress == ''
assert int(account.privatePort) == 0
assert account.publicAddress == ''
assert int(account.publicPort) == 0
assert account.signInState == 'ok'
assert isinstance(account.subscriptionActive, bool)
if account.subscriptionActive: assert len(account.subscriptionFeatures)
if account.subscriptionActive:
assert len(account.subscriptionFeatures)
# Below check keeps failing.. it should go away.
# else: assert sorted(account.subscriptionFeatures) == ['adaptive_bitrate',
# 'download_certificates', 'federated-auth', 'news']

View file

@ -9,9 +9,11 @@ def test_settings_get(plex):
assert plex.settings.get('FriendlyName').value == ''
def test_settings_get(plex):
cd = plex.settings.get('collectUsageData')
cd.set(False)
# Save works but since we reload asap the data isnt changed.
# or it might be our caching that does this. ## TODO
def test_settings_set(plex):
cd = plex.settings.get('autoEmptyTrash')
old_value = cd.value
new_value = not old_value
cd.set(new_value)
plex.settings.save()
plex._settings = None
assert plex.settings.get('autoEmptyTrash').value == new_value

View file

@ -1,48 +1,41 @@
from time import sleep, time
import pytest
from plexapi.exceptions import BadRequest
from . import conftest as utils
from plexapi.sync import VIDEO_QUALITY_3_MBPS_720p, AUDIO_BITRATE_192_KBPS, PHOTO_QUALITY_MEDIUM
def ensure_sync_item(device, sync_item, timeout=3):
start = time()
while time() - start < timeout:
sync_list = device.syncItems()
for item in sync_list.items:
if item.id == sync_item.id:
return item
sleep(0.5)
assert False, 'Failed to ensure that required sync_item is exist'
def get_sync_item_from_server(device, sync_item):
sync_list = device.syncItems()
for item in sync_list.items:
if item.id == sync_item.id:
return item
def ensure_sync_item_missing(device, sync_item, timeout=3):
start = time()
ret = None
while time() - start < timeout:
sync_list = device.syncItems()
for item in sync_list.items:
if item.id == sync_item.id:
ret = item
if ret:
sleep(0.5)
else:
break
assert not ret, 'Failed to ensure that required sync_item is missing'
def is_sync_item_missing(device, sync_item):
return not get_sync_item_from_server(device, sync_item)
def test_current_device_got_sync_target(clear_sync_device):
assert 'sync-target' in clear_sync_device.provides
def get_media(item, server):
try:
return item.getMedia()
except BadRequest as e:
if 'not_found' in str(e):
server.refreshSync()
return None
else:
raise
def test_add_movie_to_sync(clear_sync_device, movie):
new_item = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
movie._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=movie._server)
assert len(media_list) == 1
assert media_list[0].ratingKey == movie.ratingKey
@ -50,19 +43,21 @@ def test_add_movie_to_sync(clear_sync_device, movie):
def test_delete_sync_item(clear_sync_device, movie):
new_item = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
movie._server.refreshSync()
new_item_in_myplex = ensure_sync_item(clear_sync_device, new_item)
new_item_in_myplex = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
sync_items = clear_sync_device.syncItems()
for item in sync_items.items:
item.delete()
ensure_sync_item_missing(clear_sync_device, new_item_in_myplex)
utils.wait_until(is_sync_item_missing, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item_in_myplex)
def test_add_show_to_sync(clear_sync_device, show):
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
show._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
episodes = show.episodes()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -71,9 +66,10 @@ def test_add_season_to_sync(clear_sync_device, show):
season = show.season('Season 1')
new_item = season.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
season._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
episodes = season.episodes()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=season._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -81,8 +77,9 @@ def test_add_season_to_sync(clear_sync_device, show):
def test_add_episode_to_sync(clear_sync_device, episode):
new_item = episode.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
episode._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=episode._server)
assert 1 == len(media_list)
assert episode.ratingKey == media_list[0].ratingKey
@ -91,14 +88,15 @@ def test_limited_watched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=False)
show._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
episodes = show.episodes()[:5]
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert 5 == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert 5 == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -107,15 +105,16 @@ def test_limited_unwatched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=True)
show._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
episodes = show.episodes(viewCount=0)[:5]
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
episodes = show.episodes(viewCount=0)[:5]
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -124,15 +123,16 @@ def test_unlimited_and_watched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=False)
show._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
episodes = show.episodes()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
episodes = show.episodes()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -141,15 +141,16 @@ def test_unlimited_and_unwatched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=True)
show._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
episodes = show.episodes(viewCount=0)
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
episodes = show.episodes(viewCount=0)
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -157,9 +158,10 @@ def test_unlimited_and_unwatched(clear_sync_device, show):
def test_add_music_artist_to_sync(clear_sync_device, artist):
new_item = artist.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
artist._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
tracks = artist.tracks()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=artist._server)
assert len(tracks) == len(media_list)
assert [t.ratingKey for t in tracks] == [m.ratingKey for m in media_list]
@ -167,9 +169,10 @@ def test_add_music_artist_to_sync(clear_sync_device, artist):
def test_add_music_album_to_sync(clear_sync_device, album):
new_item = album.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
album._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
tracks = album.tracks()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=album._server)
assert len(tracks) == len(media_list)
assert [t.ratingKey for t in tracks] == [m.ratingKey for m in media_list]
@ -177,20 +180,20 @@ def test_add_music_album_to_sync(clear_sync_device, album):
def test_add_music_track_to_sync(clear_sync_device, track):
new_item = track.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
track._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=track._server)
assert 1 == len(media_list)
assert track.ratingKey == media_list[0].ratingKey
def test_add_photo_to_sync(clear_sync_device, photos):
photo = photos.all()[0]
if not hasattr(photo, 'librarySectionID'):
pytest.skip('Photos are not ready for individual synchronization yet')
def test_add_photo_to_sync(clear_sync_device, photoalbum):
photo = photoalbum.photo('photo1')
new_item = photo.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
photo._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=photo._server)
assert 1 == len(media_list)
assert photo.ratingKey == media_list[0].ratingKey
@ -198,9 +201,10 @@ def test_add_photo_to_sync(clear_sync_device, photos):
def test_sync_entire_library_movies(clear_sync_device, movies):
new_item = movies.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
movies._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
section_content = movies.all()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=movies._server)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
@ -208,9 +212,10 @@ def test_sync_entire_library_movies(clear_sync_device, movies):
def test_sync_entire_library_tvshows(clear_sync_device, tvshows):
new_item = tvshows.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
tvshows._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
section_content = tvshows.searchEpisodes()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=tvshows._server)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
@ -218,9 +223,10 @@ def test_sync_entire_library_tvshows(clear_sync_device, tvshows):
def test_sync_entire_library_music(clear_sync_device, music):
new_item = music.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
music._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
section_content = music.searchTracks()
media_list = item.getMedia()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=music._server)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
@ -228,9 +234,11 @@ def test_sync_entire_library_music(clear_sync_device, music):
def test_sync_entire_library_photos(clear_sync_device, photos):
new_item = photos.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
photos._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
section_content = photos.all()
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
# It's not that easy, to just get all the photos within the library, so let`s query for photos with device!=0x0
section_content = photos.search(libtype='photo', **{'device!': '0x0'})
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=photos._server)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
@ -240,8 +248,9 @@ def test_playlist_movie_sync(plex, clear_sync_device, movies):
playlist = plex.createPlaylist('Sync: Movies', items)
new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
playlist._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
@ -252,8 +261,9 @@ def test_playlist_tvshow_sync(plex, clear_sync_device, show):
playlist = plex.createPlaylist('Sync: TV Show', items)
new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
playlist._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
@ -264,8 +274,9 @@ def test_playlist_mixed_sync(plex, clear_sync_device, movie, episode):
playlist = plex.createPlaylist('Sync: Mixed', items)
new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
playlist._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
@ -276,22 +287,22 @@ def test_playlist_music_sync(plex, clear_sync_device, artist):
playlist = plex.createPlaylist('Sync: Music', items)
new_item = playlist.sync(audioBitrate=AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
playlist._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
def test_playlist_photos_sync(plex, clear_sync_device, photos):
items = photos.all()
if not hasattr(items[0], 'librarySectionID'):
pytest.skip('Photos are not ready for individual synchronization yet')
def test_playlist_photos_sync(plex, clear_sync_device, photoalbum):
items = photoalbum.photos()
playlist = plex.createPlaylist('Sync: Photos', items)
new_item = playlist.sync(photoResolution=PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
playlist._server.refreshSync()
item = ensure_sync_item(clear_sync_device, new_item)
media_list = item.getMedia()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()

View file

@ -14,7 +14,7 @@ def test_video_Movie_attributeerror(movie):
movie.asshat
def test_video_ne(movies):
assert len(movies.fetchItems('/library/sections/7/all', title__ne='Sintel')) == 3
assert len(movies.fetchItems('/library/sections/1/all', title__ne='Sintel')) == 3
def test_video_Movie_delete(movie, patched_http_call):
@ -33,10 +33,10 @@ def test_video_Movie_addCollection(movie):
assert labelname not in [tag.tag for tag in movie.collections if tag]
def test_video_Movie_getStreamURL(movie):
def test_video_Movie_getStreamURL(movie, account):
key = movie.ratingKey
assert movie.getStreamURL() == '{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&X-Plex-Token={2}'.format(utils.SERVER_BASEURL, key, utils.SERVER_TOKEN) # noqa
assert movie.getStreamURL(videoResolution='800x600') == '{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&videoResolution=800x600&X-Plex-Token={2}'.format(utils.SERVER_BASEURL, key, utils.SERVER_TOKEN) # noqa
assert movie.getStreamURL() == '{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&X-Plex-Token={2}'.format(utils.SERVER_BASEURL, key, account.authenticationToken) # noqa
assert movie.getStreamURL(videoResolution='800x600') == '{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&videoResolution=800x600&X-Plex-Token={2}'.format(utils.SERVER_BASEURL, key, account.authenticationToken) # noqa
def test_video_Movie_isFullObject_and_reload(plex):
@ -118,7 +118,7 @@ def test_video_Movie_attrs(movies):
assert float(movie.rating) >= 6.4
#assert movie.ratingImage == 'rottentomatoes://image.rating.ripe'
assert movie.ratingKey >= 1
assert sorted([i.tag for i in movie.roles])[:4] == ['Aladdin Ullah', 'Annette Hanshaw', 'Aseem Chhabra', 'Debargo Sanyal'] # noqa
assert sorted([i.tag for i in movie.roles])[:4] == ['Aladdin Ullah', 'Annette Hanshaw', 'Aseem Chhabra', 'Bhavana Nagulapally'] # noqa
assert movie._server._baseurl == utils.SERVER_BASEURL
assert movie.sessionKey is None
assert movie.studio == 'Nina Paley'
@ -151,9 +151,9 @@ def test_video_Movie_attrs(movies):
assert audio.id >= 1
assert audio.index == 1
assert utils.is_metadata(audio._initpath)
assert audio.language == 'English'
assert audio.languageCode == 'eng'
assert audio.samplingRate == 48000
assert audio.language is None
assert audio.languageCode is None
assert audio.samplingRate == 44100
assert audio.selected is True
assert audio._server._baseurl == utils.SERVER_BASEURL
assert audio.streamType == 2
@ -178,13 +178,13 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(media.width, gte=200)
# Video
video = movie.media[0].parts[0].videoStreams()[0]
assert video.bitDepth == 8
assert video.bitDepth in (8, None) # Different versions of Plex Server return different values
assert utils.is_int(video.bitrate)
assert video.cabac is None
assert video.chromaSubsampling == '4:2:0'
assert video.chromaSubsampling in ('4:2:0', None)
assert video.codec in utils.CODECS
assert video.codecID is None
assert video.colorSpace == 'bt709'
assert video.colorSpace is None
assert video.duration is None
assert utils.is_float(video.frameRate, gte=20.0)
assert video.frameRateMode is None
@ -193,12 +193,12 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(video.id)
assert utils.is_int(video.index, gte=0)
assert utils.is_metadata(video._initpath)
assert video.language == 'English'
assert video.languageCode == 'eng'
assert video.language is None
assert video.languageCode is None
assert utils.is_int(video.level)
assert video.profile in utils.PROFILES
assert utils.is_int(video.refFrames)
assert video.scanType is None
assert video.scanType in ('progressive', None)
assert video.selected is False
assert video._server._baseurl == utils.SERVER_BASEURL
assert utils.is_int(video.streamType)
@ -217,13 +217,13 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(part.size, gte=1000000)
# Stream 1
stream1 = part.streams[0]
assert stream1.bitDepth == 8
assert stream1.bitDepth in (8, None)
assert utils.is_int(stream1.bitrate)
assert stream1.cabac is None
assert stream1.chromaSubsampling == '4:2:0'
assert stream1.chromaSubsampling in ('4:2:0', None)
assert stream1.codec in utils.CODECS
assert stream1.codecID is None
assert stream1.colorSpace == 'bt709'
assert stream1.colorSpace is None
assert stream1.duration is None
assert utils.is_float(stream1.frameRate, gte=20.0)
assert stream1.frameRateMode is None
@ -232,12 +232,12 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(stream1.id)
assert utils.is_int(stream1.index, gte=0)
assert utils.is_metadata(stream1._initpath)
assert stream1.language == 'English'
assert stream1.languageCode == 'eng'
assert stream1.language is None
assert stream1.languageCode is None
assert utils.is_int(stream1.level)
assert stream1.profile in utils.PROFILES
assert utils.is_int(stream1.refFrames)
assert stream1.scanType is None
assert stream1.scanType in ('progressive', None)
assert stream1.selected is False
assert stream1._server._baseurl == utils.SERVER_BASEURL
assert utils.is_int(stream1.streamType)
@ -259,8 +259,8 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(stream2.id)
assert utils.is_int(stream2.index)
assert utils.is_metadata(stream2._initpath)
assert stream2.language == 'English'
assert stream2.languageCode == 'eng'
assert stream2.language is None
assert stream2.languageCode is None
assert utils.is_int(stream2.samplingRate)
assert stream2.selected is True
assert stream2._server._baseurl == utils.SERVER_BASEURL
@ -301,7 +301,7 @@ def test_video_Show_attrs(show):
assert utils.is_int(show.duration, gte=1600000)
assert utils.is_section(show._initpath)
# Check reloading the show loads the full list of genres
assert sorted([i.tag for i in show.genres]) == ['Adventure', 'Drama', 'Fantasy']
assert not {'Adventure', 'Drama'} - {i.tag for i in show.genres}
show.reload()
assert sorted([i.tag for i in show.genres]) == ['Adventure', 'Drama', 'Fantasy']
# So the initkey should have changed because of the reload
@ -316,8 +316,8 @@ def test_video_Show_attrs(show):
assert show.originallyAvailableAt.strftime('%Y-%m-%d') == '2011-04-17'
assert show.rating >= 8.0
assert utils.is_int(show.ratingKey)
assert sorted([i.tag for i in show.roles])[:4] == ['Aidan Gillen', 'Alexander Siddig', 'Alfie Allen', 'Art Parkinson']
assert sorted([i.tag for i in show.actors])[:4] == ['Aidan Gillen', 'Alexander Siddig', 'Alfie Allen', 'Art Parkinson']
assert sorted([i.tag for i in show.roles])[:4] == ['Aidan Gillen', 'Aimee Richardson', 'Alexander Siddig', 'Alfie Allen'] # noqa
assert sorted([i.tag for i in show.actors])[:4] == ['Aidan Gillen', 'Aimee Richardson', 'Alexander Siddig', 'Alfie Allen'] # noqa
assert show._server._baseurl == utils.SERVER_BASEURL
assert show.studio == 'HBO'
assert utils.is_string(show.summary, gte=100)
@ -503,12 +503,12 @@ def test_video_Episode_attrs(episode):
assert utils.is_metadata(part._initpath)
assert len(part.key) >= 10
assert part._server._baseurl == utils.SERVER_BASEURL
assert utils.is_int(part.size, gte=30000000)
assert utils.is_int(part.size, gte=18184197)
def test_video_Season(show):
seasons = show.seasons()
assert len(seasons) >= 1
assert len(seasons) == 2
assert ['Season 1', 'Season 2'] == [s.title for s in seasons[:2]]
assert show.season('Season 1') == seasons[0]

View file

@ -1,6 +1,8 @@
""" The script is used to bootstrap a docker container with Plex and with all the libraries required for testing.
"""
import argparse
import os
import platform
from glob import glob
from shutil import copyfile, rmtree
from subprocess import call
@ -8,15 +10,18 @@ from time import time, sleep
from uuid import uuid4
from requests import codes
from tqdm import tqdm
import plexapi
from plexapi.compat import which, makedirs
from plexapi.exceptions import BadRequest
from plexapi.exceptions import BadRequest, NotFound
from plexapi.myplex import MyPlexAccount
from plexapi.utils import download, SEARCHTYPES
DOCKER_CMD = [
'docker', 'run', '-d',
'--name', 'plex-test-%(image_tag)s',
'--restart', 'on-failure',
'-p', '32400:32400/tcp',
'-p', '3005:3005/tcp',
'-p', '8324:8324/tcp',
@ -46,12 +51,19 @@ def get_claim_token(myplex):
Arguments:
myplex (:class:`~plexapi.myplex.MyPlexAccount`)
"""
response = myplex._session.get('https://plex.tv/api/claim/token.json', headers=myplex._headers(),
timeout=plexapi.TIMEOUT)
if response.status_code not in (200, 201, 204):
codename = codes.get(response.status_code)[0]
retry = 0
status_code = None
while retry < 3 and status_code not in (200, 201, 204):
if retry > 0:
sleep(2)
response = myplex._session.get('https://plex.tv/api/claim/token.json', headers=myplex._headers(),
timeout=plexapi.TIMEOUT)
status_code = response.status_code
retry += 1
if status_code not in (200, 201, 204):
errtext = response.text.replace('\n', ' ')
raise BadRequest('(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext))
raise BadRequest('(%s) unable to get status code %s; %s' % (response.status_code, response.url, errtext))
return response.json()['token']
@ -61,12 +73,66 @@ def get_ips():
if i[4][0] not in ('127.0.0.1', '::1') and not i[4][0].startswith('fe80:')]))
# Unfortunately plex ignore hardlinks created on OS X host machine, so we have to copy here
def cp(src, dst):
if platform.system() == 'Darwin':
copyfile(src, dst)
else:
os.link(src, dst)
def create_section(server, section):
processed_media = 0
expected_media_count = section.pop('expected_media_count', 0)
expected_media_type = (section['type'], )
if section['type'] == 'artist':
expected_media_type = ('artist', 'album', 'track')
expected_media_type = tuple(SEARCHTYPES[t] for t in expected_media_type)
def alert_callback(data):
global processed_media
if data['type'] == 'timeline':
for entry in data['TimelineEntry']:
if entry.get('identifier', 'com.plexapp.plugins.library') == 'com.plexapp.plugins.library':
# Missed mediaState means that media was processed (analyzed & thumbnailed)
if 'mediaState' not in entry and entry['type'] in expected_media_type:
# state=5 means record processed, applicable only when metadata source was set
if entry['state'] == 5:
cnt = 1
# Workaround for old Plex versions which not reports individual episodes' progress
if entry['type'] == SEARCHTYPES['show']:
show = server.library.sectionByID(str(entry['sectionID'])).get(entry['title'])
cnt = show.leafCount
bar.update(cnt)
# state=1 means record processed, when no metadata source was set
elif entry['state'] == 1 and entry['type'] == SEARCHTYPES['photo']:
bar.update()
bar = tqdm(desc='Scanning section ' + section['name'], total=expected_media_count)
notifier = server.startAlertListener(alert_callback)
# I don't know how to determinate of plex successfully started, so let's do it in creepy way
success = False
start_time = time()
while not success and (time() - start_time < opts.bootstrap_timeout):
try:
server.library.add(**section)
success = True
except BadRequest as e:
if 'the server is still starting up. Please retry later' in str(e):
sleep(1)
else:
raise
if not success:
print('Something went wrong :(')
exit(1)
while bar.n < bar.total:
if time() - start_time >= opts.bootstrap_timeout:
print('Metadata scan takes too long, probably something went really wrong')
exit(1)
sleep(3)
bar.close()
notifier.stop()
if __name__ == '__main__':
@ -82,6 +148,7 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--username', help='Your Plex username')
parser.add_argument('--password', help='Your Plex password')
parser.add_argument('--token', help='Plex.tv authentication token', default=plexapi.CONFIG.get('auth.server_token'))
parser.add_argument('--timezone', help='Timezone to set inside plex', default='UTC')
parser.add_argument('--destination', help='Local path where to store all the media',
default=os.path.join(os.getcwd(), 'plex'))
@ -101,16 +168,20 @@ if __name__ == '__main__':
action='store_false')
parser.add_argument('--without-photos', help='Do not create Photos section', default=True, dest='with_photos',
action='store_false')
parser.add_argument('--without-album', help='Do not create Photo Album', default=True, dest='with_photo_album',
action='store_false')
parser.add_argument('--show-token', help='Display access token after bootstrap', default=False, action='store_true')
opts = parser.parse_args()
print('I`m going to create a plex instance named %s with advertised ip "%s", be prepared!' % (opts.server_name,
opts.advertise_ip))
if call(['docker', 'pull', 'plexinc/pms-docker:%s' % opts.docker_tag]) != 0:
print('Got an error when executing docker pull!')
exit(1)
account = plexapi.utils.getMyPlexAccount(opts)
if opts.token:
account = MyPlexAccount(token=opts.token)
else:
account = plexapi.utils.getMyPlexAccount(opts)
path = os.path.realpath(os.path.expanduser(opts.destination))
makedirs(os.path.join(path, 'media'), exist_ok=True)
arg_bindings = {
'destination': path,
'hostname': opts.server_name,
@ -146,8 +217,6 @@ if __name__ == '__main__':
print('Ok, I got the server instance, let`s download what you`re missing')
expected_media_count = 0
def get_tvshow_path(name, season, episode):
return os.path.join(tvshows_path, name, 'S%02dE%02d.mp4' % (season, episode))
@ -155,14 +224,14 @@ if __name__ == '__main__':
def get_movie_path(name, year):
return os.path.join(movies_path, '%s (%d).mp4' % (name, year))
media_stub_path = os.path.join(opts.destination, 'media', 'video_stub.mp4')
media_stub_path = os.path.join(path, 'media', 'video_stub.mp4')
if not os.path.isfile(media_stub_path):
download('http://www.mytvtestpatterns.com/mytvtestpatterns/Default/GetFile?p=PhilipsCircleMP4', '',
filename='video_stub.mp4', savepath=os.path.join(opts.destination, 'media'), showstatus=True)
filename='video_stub.mp4', savepath=os.path.join(path, 'media'), showstatus=True)
sections = []
if opts.with_movies:
movies_path = os.path.join(opts.destination, 'media', 'Movies')
movies_path = os.path.join(path, 'media', 'Movies')
makedirs(movies_path, exist_ok=True)
required_movies = {
@ -172,17 +241,18 @@ if __name__ == '__main__':
'Sintel': 2010,
}
expected_media_count = 0
for name, year in required_movies.items():
expected_media_count += 1
if not os.path.isfile(get_movie_path(name, year)):
cp(media_stub_path, get_movie_path(name, year))
copyfile(media_stub_path, get_movie_path(name, year))
print('Finished with movies...')
sections.append(dict(name='Movies', type='movie', location='/data/Movies', agent='com.plexapp.agents.imdb',
scanner='Plex Movie Scanner'))
scanner='Plex Movie Scanner', expected_media_count=expected_media_count))
if opts.with_shows:
tvshows_path = os.path.join(opts.destination, 'media', 'TV-Shows')
tvshows_path = os.path.join(path, 'media', 'TV-Shows')
makedirs(os.path.join(tvshows_path, 'Game of Thrones'), exist_ok=True)
makedirs(os.path.join(tvshows_path, 'The 100'), exist_ok=True)
@ -197,21 +267,23 @@ if __name__ == '__main__':
]
}
expected_media_count = 0
for show_name, seasons in required_tv_shows.items():
for season_id, episodes in enumerate(seasons, start=1):
for episode_id in episodes:
expected_media_count += 1
episode_path = get_tvshow_path(show_name, season_id, episode_id)
if not os.path.isfile(episode_path):
cp(get_movie_path('Sintel', 2010), episode_path)
copyfile(get_movie_path('Sintel', 2010), episode_path)
print('Finished with TV Shows...')
sections.append(dict(name='TV Shows', type='show', location='/data/TV-Shows', agent='com.plexapp.agents.thetvdb',
scanner='Plex Series Scanner'))
scanner='Plex Series Scanner', expected_media_count=expected_media_count))
if opts.with_music:
music_path = os.path.join(opts.destination, 'media', 'Music')
music_path = os.path.join(path, 'media', 'Music')
makedirs(music_path, exist_ok=True)
expected_media_count = 0
artist_dst = os.path.join(music_path, 'Infinite State')
dest_path = os.path.join(artist_dst, 'Unmastered Impulses')
@ -227,103 +299,70 @@ if __name__ == '__main__':
os.rename(os.path.join(artist_dst, 'unmastered-impulses-master', 'mp3'), dest_path)
rmtree(os.path.join(artist_dst, 'unmastered-impulses-master'))
expected_media_count += len(glob(os.path.join(dest_path, '*.mp3')))
expected_media_count += len(glob(os.path.join(dest_path, '*.mp3'))) + 2 # wait for artist & album
artist_dst = os.path.join(music_path, 'Broke For Free')
dest_path = os.path.join(artist_dst, 'Layers')
if not os.path.isdir(dest_path):
zip_path = os.path.join(artist_dst, 'Layers.zip')
if not os.path.isfile(zip_path):
download('https://freemusicarchive.org/music/zip/2803d3e9c9510c17d180b821b43b248e9db83487', '',
download('https://archive.org/compress/Layers-11520/formats=VBR%20MP3&file=/Layers-11520.zip', '',
filename='Layers.zip', savepath=artist_dst, showstatus=True)
makedirs(dest_path, exist_ok=True)
import zipfile
with zipfile.ZipFile(zip_path, 'r') as handle:
handle.extractall(dest_path)
expected_media_count += len(glob(os.path.join(dest_path, '*.mp3')))
expected_media_count += len(glob(os.path.join(dest_path, '*.mp3'))) + 2 # wait for artist & album
print('Finished with Music...')
sections.append(dict(name='Music', type='artist', location='/data/Music', agent='com.plexapp.agents.none',
scanner='Plex Music Scanner'))
sections.append(dict(name='Music', type='artist', location='/data/Music', agent='com.plexapp.agents.lastfm',
scanner='Plex Music Scanner', expected_media_count=expected_media_count))
if opts.with_photos:
photos_path = os.path.join(opts.destination, 'media', 'Photos')
photos_path = os.path.join(path, 'media', 'Photos')
makedirs(photos_path, exist_ok=True)
expected_photo_count = 0
has_photos = len(glob(os.path.join(photos_path, '*.jpg')))
while has_photos < 10:
has_photos += 1
download('https://picsum.photos/800/600/?random', '',
filename='photo%d.jpg' % has_photos, savepath=photos_path)
folders = {
('Cats', ): 3,
('Cats', 'Cats in bed'): 7,
('Cats', 'Cats not in bed'): 1,
('Cats', 'Not cats in bed'): 1,
}
print('Photos collected, but we need to create an album later...')
has_photos = 0
for folder_path, required_cnt in folders.items():
folder_path = os.path.join(photos_path, *folder_path)
photos_in_folder = len(glob(os.path.join(folder_path, '*.jpg')))
while photos_in_folder < required_cnt:
photos_in_folder += 1
download('https://picsum.photos/800/600/?random', '',
filename='photo%d.jpg' % photos_in_folder, savepath=folder_path)
has_photos += photos_in_folder
print('Finished with photos...')
sections.append(dict(name='Photos', type='photo', location='/data/Photos', agent='com.plexapp.agents.none',
scanner='Plex Photo Scanner'))
scanner='Plex Photo Scanner', expected_media_count=has_photos))
if sections:
print('Ok, got the media, it`s time to create a library for you!')
library = server.library
finished = expected_media_count == 0
processed_media = 0
def alert_callback(data):
global finished, processed_media
if data['type'] == 'timeline':
for entry in data['TimelineEntry']:
if entry['identifier'] == 'com.plexapp.plugins.library' and entry['state'] == 5 \
and entry['type'] in (SEARCHTYPES['movie'], SEARCHTYPES['episode'], SEARCHTYPES['track'],
SEARCHTYPES['photo']):
processed_media += 1
if processed_media == expected_media_count:
finished = True
notifier = server.startAlertListener(alert_callback)
first_section = sections.pop(0)
# I don't know how to determinate of plex successfully started, so let's do it in creepy way
success = False
start_time = time()
while not success and (time() - start_time < opts.bootstrap_timeout):
try:
library.add(**first_section)
success = True
except BadRequest as e:
if 'the server is still starting up. Please retry later' in str(e):
sleep(1)
else:
raise
if not success:
print('Something went wrong :(')
exit(1)
for section in sections:
library.add(**section)
create_section(server, section)
print('Sections created, almost done! Please wait while metadata will be collected, it may take a couple '
'minutes...')
album_created = False
start_time = time()
while not finished and not (album_created and opts.with_photos and opts.with_photo_album):
if time() - start_time >= opts.bootstrap_timeout:
print('Metadata scan takes too long, probably something went really wrong')
exit(1)
if not album_created and opts.with_photos and opts.with_photo_album:
photos = library.section('Photos').all()
if len(photos) == has_photos:
server.createPlaylist('photo_album1', photos)
album_created = True
print('Photo album created')
sleep(3)
import logging
logging.basicConfig(level=logging.INFO)
shared_username = os.environ.get('SHARED_USERNAME', 'PKKid')
try:
user = account.user(shared_username)
account.updateFriend(user, server)
print('The server was shared with user "%s"' % shared_username)
except NotFound:
pass
print('Base URL is %s' % server.url('', False))
print('Auth token is %s' % account.authenticationToken)
if opts.show_token:
print('Auth token is %s' % account.authenticationToken)
print('Server %s is ready to use!' % opts.server_name)

View file

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Remove current Plex Server and a Client from MyPlex account. Useful when running tests in CI.
"""
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from plexapi import X_PLEX_IDENTIFIER
if __name__ == '__main__':
myplex = MyPlexAccount()
plex = PlexServer(token=myplex.authenticationToken)
for device in plex.myPlexAccount().devices():
if device.clientIdentifier == plex.machineIdentifier:
print('Removing device "%s", with id "%s"' % (device.name, device. clientIdentifier))
device.delete()
# If we suddenly remove the client first we wouldn't be able to authenticate to delete the server
for device in plex.myPlexAccount().devices():
if device.clientIdentifier == X_PLEX_IDENTIFIER:
print('Removing device "%s", with id "%s"' % (device.name, device. clientIdentifier))
device.delete()
break