# -*- coding: utf-8 -*- import time from datetime import datetime from functools import partial from os import environ import plexapi import pytest import requests from plexapi import compat from plexapi.client import PlexClient from plexapi.compat import MagicMock, patch from plexapi.myplex import MyPlexAccount from plexapi.server import PlexServer try: from unittest.mock import patch, MagicMock, mock_open except ImportError: from mock import patch, MagicMock, mock_open SERVER_BASEURL = plexapi.CONFIG.get("auth.server_baseurl") MYPLEX_USERNAME = plexapi.CONFIG.get("auth.myplex_username") MYPLEX_PASSWORD = plexapi.CONFIG.get("auth.myplex_password") CLIENT_BASEURL = plexapi.CONFIG.get("auth.client_baseurl") CLIENT_TOKEN = plexapi.CONFIG.get("auth.client_token") MIN_DATETIME = datetime(1999, 1, 1) REGEX_EMAIL = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)" REGEX_IPADDR = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$" AUDIOCHANNELS = {2, 6} AUDIOLAYOUTS = {"5.1", "5.1(side)", "stereo"} CODECS = {"aac", "ac3", "dca", "h264", "mp3", "mpeg4"} CONTAINERS = {"avi", "mp4", "mkv"} CONTENTRATINGS = {"TV-14", "TV-MA", "G", "NR", "Not Rated"} FRAMERATES = {"24p", "PAL", "NTSC"} PROFILES = {"advanced simple", "main", "constrained baseline"} RESOLUTIONS = {"sd", "480", "576", "720", "1080"} ENTITLEMENTS = { "ios", "roku", "android", "xbox_one", "xbox_360", "windows", "windows_phone", } TEST_AUTHENTICATED = "authenticated" TEST_ANONYMOUSLY = "anonymously" ANON_PARAM = pytest.param(TEST_ANONYMOUSLY, marks=pytest.mark.anonymous) AUTH_PARAM = pytest.param(TEST_AUTHENTICATED, marks=pytest.mark.authenticated) def pytest_addoption(parser): parser.addoption( "--client", action="store_true", default=False, help="Run client tests." ) def pytest_generate_tests(metafunc): if "plex" in metafunc.fixturenames: if ( "account" in metafunc.fixturenames or TEST_AUTHENTICATED in metafunc.definition.keywords ): metafunc.parametrize("plex", [AUTH_PARAM], indirect=True) else: metafunc.parametrize("plex", [ANON_PARAM, AUTH_PARAM], indirect=True) elif "account" in metafunc.fixturenames: metafunc.parametrize("account", [AUTH_PARAM], indirect=True) def pytest_runtest_setup(item): if "client" in item.keywords and not item.config.getvalue("client"): return pytest.skip("Need --client option to run.") if TEST_AUTHENTICATED in item.keywords and not ( MYPLEX_USERNAME and MYPLEX_PASSWORD ): return pytest.skip( "You have to specify MYPLEX_USERNAME and MYPLEX_PASSWORD to run authenticated tests" ) if TEST_ANONYMOUSLY in item.keywords and MYPLEX_USERNAME and MYPLEX_PASSWORD: return pytest.skip( "Anonymous tests should be ran on unclaimed server, without providing MYPLEX_USERNAME and " "MYPLEX_PASSWORD" ) # --------------------------------- # Fixtures # --------------------------------- def get_account(): return MyPlexAccount() @pytest.fixture(scope="session") def account(): assert MYPLEX_USERNAME, "Required MYPLEX_USERNAME not specified." assert MYPLEX_PASSWORD, "Required MYPLEX_PASSWORD not specified." return get_account() @pytest.fixture(scope="session") def account_once(account): if environ.get("TEST_ACCOUNT_ONCE") != "1" and environ.get("CI") == "true": pytest.skip("Do not forget to test this by providing TEST_ACCOUNT_ONCE=1") return account @pytest.fixture(scope="session") def account_plexpass(account): if not account.subscriptionActive: pytest.skip( "PlexPass subscription is not active, unable to test sync-stuff, be careful!" ) return account @pytest.fixture(scope="session") def account_synctarget(account_plexpass): assert "sync-target" in plexapi.X_PLEX_PROVIDES, ( "You have to set env var " "PLEXAPI_HEADER_PROVIDES=sync-target,controller" ) assert "sync-target" in plexapi.BASE_HEADERS["X-Plex-Provides"] assert ( "iOS" == plexapi.X_PLEX_PLATFORM ), "You have to set env var PLEXAPI_HEADER_PLATFORM=iOS" assert ( "11.4.1" == plexapi.X_PLEX_PLATFORM_VERSION ), "You have to set env var PLEXAPI_HEADER_PLATFORM_VERSION=11.4.1" assert ( "iPhone" == plexapi.X_PLEX_DEVICE ), "You have to set env var PLEXAPI_HEADER_DEVICE=iPhone" return account_plexpass @pytest.fixture(scope="session") def plex(request): assert SERVER_BASEURL, "Required SERVER_BASEURL not specified." session = requests.Session() if request.param == TEST_AUTHENTICATED: token = get_account().authenticationToken else: token = None return PlexServer(SERVER_BASEURL, token, session=session) @pytest.fixture() def device(account): d = None for device in account.devices(): if device.clientIdentifier == plexapi.X_PLEX_IDENTIFIER: d = device break assert d return d @pytest.fixture() def clear_sync_device(device, account_synctarget, plex): sync_items = account_synctarget.syncItems(clientId=device.clientIdentifier) for item in sync_items.items: item.delete() plex.refreshSync() return device @pytest.fixture def fresh_plex(): return PlexServer @pytest.fixture() def plex2(plex): return plex() @pytest.fixture() def client(request, plex): return PlexClient(plex, baseurl=CLIENT_BASEURL, token=CLIENT_TOKEN) @pytest.fixture() def tvshows(plex): return plex.library.section("TV Shows") @pytest.fixture() def movies(plex): return plex.library.section("Movies") @pytest.fixture() def music(plex): return plex.library.section("Music") @pytest.fixture() def photos(plex): return plex.library.section("Photos") @pytest.fixture() def movie(movies): return movies.get("Elephants Dream") @pytest.fixture() def collection(plex): try: return plex.library.section("Movies").collection()[0] except IndexError: movie = plex.library.section("Movies").get("Elephants Dream") movie.addCollection(["marvel"]) n = plex.library.section("Movies").reload() return n.collection()[0] @pytest.fixture() def artist(music): return music.get("Infinite State") @pytest.fixture() def album(artist): return artist.album("Unmastered Impulses") @pytest.fixture() def track(album): return album.track("Holy Moment") @pytest.fixture() def show(tvshows): return tvshows.get("Game of Thrones") @pytest.fixture() def episode(show): return show.get("Winter Is Coming") @pytest.fixture() def photoalbum(photos): try: return photos.get("Cats") except Exception: return photos.get("photo_album1") @pytest.fixture() def subtitle(): mopen = mock_open() with patch("__main__.open", mopen): with open("subtitle.srt", "w") as handler: handler.write("test") return handler @pytest.fixture() def shared_username(account): username = environ.get("SHARED_USERNAME", "PKKid") for user in account.users(): if user.title.lower() == username.lower(): return username elif ( user.username and user.email and user.id and username.lower() in (user.username.lower(), user.email.lower(), str(user.id)) ): return username pytest.skip("Shared user %s wasn`t found in your MyPlex account" % username) @pytest.fixture() def monkeydownload(request, monkeypatch): monkeypatch.setattr( "plexapi.utils.download", partial(plexapi.utils.download, mocked=True) ) yield monkeypatch.undo() def callable_http_patch(): """This intented to stop some http requests inside some tests.""" return patch( "plexapi.server.requests.sessions.Session.send", return_value=MagicMock(status_code=200, text=""), ) @pytest.fixture() def empty_response(mocker): response = mocker.MagicMock(status_code=200, text="") return response @pytest.fixture() def patched_http_call(mocker): """This will stop any http calls inside any test.""" return mocker.patch( "plexapi.server.requests.sessions.Session.send", return_value=MagicMock(status_code=200, text=""), ) # --------------------------------- # Utility Functions # --------------------------------- def is_datetime(value): return value > MIN_DATETIME def is_int(value, gte=1): return int(value) >= gte def is_float(value, gte=1.0): return float(value) >= gte def is_metadata(key, prefix="/library/metadata/", contains="", suffix=""): try: assert key.startswith(prefix) assert contains in key assert key.endswith(suffix) return True except AssertionError: return False def is_part(key): return is_metadata(key, prefix="/library/parts/") def is_section(key): return is_metadata(key, prefix="/library/sections/") def is_string(value, gte=1): return isinstance(value, compat.string_type) and len(value) >= gte def is_thumb(key): return is_metadata(key, contains="/thumb/") def wait_until(condition_function, delay=0.25, timeout=1, *args, **kwargs): start = time.time() ready = condition_function(*args, **kwargs) retries = 1 while not ready and time.time() - start < timeout: retries += 1 time.sleep(delay) ready = condition_function(*args, **kwargs) assert ready, "Wait timeout after %d retries, %.2f seconds" % ( retries, time.time() - start, ) return ready