run tests over unclaimed server

This commit is contained in:
Andrey Yantsen 2018-09-15 09:42:42 +01:00
parent 8f7e315f6f
commit 06742e87e8
5 changed files with 94 additions and 60 deletions

View file

@ -1,9 +1,7 @@
language: python
stages:
- syntax
- name: test
if: type != pull_request
- test
- name: deploy
if: tag = present
@ -28,8 +26,10 @@ before_install:
- pip install --upgrade pytest pytest-cov coveralls
install:
- pip install -r requirements_dev.txt
- PYTHONPATH="$PWD:$PYTHONPATH" python -u tools/plex-bootstraptest.py --destination plex --advertise-ip=127.0.0.1
--bootstrap-timeout 540 --docker-tag $PLEX_CONTAINER_TAG
- '[ -z "${PLEXAPI_AUTH_MYPLEX_USERNAME}" ] && PYTHONPATH="$PWD:$PYTHONPATH" python -u tools/plex-bootstraptest.py
--destination plex --advertise-ip=127.0.0.1 --bootstrap-timeout 540 --docker-tag $PLEX_CONTAINER_TAG --unclaimed ||
PYTHONPATH="$PWD:$PYTHONPATH" python -u tools/plex-bootstraptest.py --destination plex --advertise-ip=127.0.0.1
--bootstrap-timeout 540 --docker-tag $PLEX_CONTAINER_TAG'
script:
- py.test tests -rxXs --ignore=tests/test_sync.py --tb=native --verbose --cov-config .coveragerc --cov=plexapi
@ -41,18 +41,16 @@ after_success:
- coveralls
after_script:
- PYTHONPATH="$PWD:$PYTHONPATH" python -u tools/plex-teardowntest.py
- '[ -z "${PLEXAPI_AUTH_MYPLEX_USERNAME}" ] || PYTHONPATH="$PWD:$PYTHONPATH" python -u tools/plex-teardowntest.py'
jobs:
include:
- stage: syntax
python: 3.6
- python: 3.6
name: "Flake8"
install:
- pip install -r requirements_dev.txt
script: flake8 plexapi --exclude=compat.py --max-line-length=120 --ignore=E128,E701,E702,E731,W293
after_success: true
after_script: true
after_success: skip
env:
- PLEX_CONTAINER_TAG=latest
- stage: test
@ -60,6 +58,12 @@ jobs:
env:
- PLEX_CONTAINER_TAG=1.3.2.3112-1751929
- TEST_ACCOUNT_ONCE=1
- stage: test
python: 3.6
if: type != 'pull_request' # pull requests always run over unclaimed server
after_success: skip
env:
- PLEX_CONTAINER_TAG=latest PLEXAPI_AUTH_MYPLEX_USERNAME=
- stage: deploy
name: "Deploy to PyPi"
python: 3.6

View file

@ -42,25 +42,52 @@ PROFILES = {'advanced simple', 'main', 'constrained baseline'}
RESOLUTIONS = {'sd', '480', '576', '720', '1080'}
ENTITLEMENTS = {'ios', 'cpms', 'roku', 'android', 'xbox_one', 'xbox_360', 'windows', 'windows_phone'}
TEST_AUTHENTICATED = 'authenticated'
TEST_ANONYMOUSLY = 'anonymously'
ANON_PARAM = pytest.param(TEST_ANONYMOUSLY, marks=pytest.mark.anonymous)
AUTH_PARAM = pytest.param(TEST_AUTHENTICATED, marks=pytest.mark.authenticated)
def pytest_addoption(parser):
parser.addoption('--client', action='store_true', default=False, help='Run client tests.')
def pytest_generate_tests(metafunc):
if 'plex' in metafunc.fixturenames:
if 'account' in metafunc.fixturenames or TEST_AUTHENTICATED in metafunc.definition.keywords:
metafunc.parametrize('plex', [AUTH_PARAM], indirect=True)
else:
metafunc.parametrize('plex', [ANON_PARAM, AUTH_PARAM], indirect=True)
elif 'account' in metafunc.fixturenames:
metafunc.parametrize('account', [AUTH_PARAM], indirect=True)
def pytest_runtest_setup(item):
if 'client' in item.keywords and not item.config.getvalue('client'):
return pytest.skip('Need --client option to run.')
if TEST_AUTHENTICATED in item.keywords and not (MYPLEX_USERNAME and MYPLEX_PASSWORD):
return pytest.skip('You have to specify MYPLEX_USERNAME and MYPLEX_PASSWORD to run authenticated tests')
if TEST_ANONYMOUSLY in item.keywords and MYPLEX_USERNAME and MYPLEX_PASSWORD:
return pytest.skip('Anonymous tests should be ran on unclaimed server, without providing MYPLEX_USERNAME and '
'MYPLEX_PASSWORD')
# ---------------------------------
# Fixtures
# ---------------------------------
def get_account():
return MyPlexAccount()
@pytest.fixture(scope='session')
def account():
assert MYPLEX_USERNAME, 'Required MYPLEX_USERNAME not specified.'
assert MYPLEX_PASSWORD, 'Required MYPLEX_PASSWORD not specified.'
return MyPlexAccount()
return get_account()
@pytest.fixture(scope='session')
@ -89,10 +116,14 @@ def account_synctarget(account_plexpass):
@pytest.fixture(scope='session')
def plex(account):
def plex(request):
assert SERVER_BASEURL, 'Required SERVER_BASEURL not specified.'
session = requests.Session()
return PlexServer(SERVER_BASEURL, account.authenticationToken, session=session)
if request.param == TEST_AUTHENTICATED:
token = get_account().authenticationToken
else:
token = None
return PlexServer(SERVER_BASEURL, token, session=session)
@pytest.fixture()

View file

@ -146,7 +146,7 @@ def test_library_MovieSection_onDeck(movie, movies, tvshows, episode):
movie.updateProgress(movie.duration * 1000 / 10) # set progress to 10%
assert movies.onDeck()
movie.markUnwatched()
episode.markWatched()
episode.updateProgress(episode.duration * 1000 / 10)
assert tvshows.onDeck()
episode.markUnwatched()

View file

@ -121,10 +121,18 @@ def test_server_history(plex, movie):
movie.markUnwatched()
@pytest.mark.anonymously
def test_server_Server_query(plex):
assert plex.query('/')
with pytest.raises(BadRequest):
assert plex.query('/asdf/1234/asdf', headers={'random_headers': '1234'})
@pytest.mark.authenticated
def test_server_Server_query_authenticated(plex):
assert plex.query('/')
with pytest.raises(BadRequest):
assert plex.query('/asdf/1234/asdf', headers={'random_headers': '1234'})
with pytest.raises(BadRequest):
# This is really requests.exceptions.HTTPError
# 401 Client Error: Unauthorized for url
@ -142,6 +150,7 @@ def test_server_Server_session(account):
assert hasattr(plex._session, 'plexapi_session_test')
@pytest.mark.authenticated
def test_server_token_in_headers(plex):
headers = plex._headers()
assert 'X-Plex-Token' in headers
@ -223,6 +232,7 @@ def test_server_clients(plex):
assert client.version == '2.12.5'
@pytest.mark.authenticated
def test_server_account(plex):
account = plex.account()
assert account.authToken

View file

@ -16,11 +16,12 @@ import plexapi
from plexapi.compat import which, makedirs
from plexapi.exceptions import BadRequest, NotFound
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from plexapi.utils import download, SEARCHTYPES
DOCKER_CMD = [
'docker', 'run', '-d',
'--name', 'plex-test-%(image_tag)s',
'--name', 'plex-test-%(container_name_extra)s%(image_tag)s',
'--restart', 'on-failure',
'-p', '32400:32400/tcp',
'-p', '3005:3005/tcp',
@ -43,30 +44,6 @@ DOCKER_CMD = [
]
def get_claim_token(myplex):
"""
Returns a str, a new "claim-token", which you can use to register your new Plex Server instance to your account
See: https://hub.docker.com/r/plexinc/pms-docker/, https://www.plex.tv/claim/
Arguments:
myplex (:class:`~plexapi.myplex.MyPlexAccount`)
"""
retry = 0
status_code = None
while retry < 3 and status_code not in (200, 201, 204):
if retry > 0:
sleep(2)
response = myplex._session.get('https://plex.tv/api/claim/token.json', headers=myplex._headers(),
timeout=plexapi.TIMEOUT)
status_code = response.status_code
retry += 1
if status_code not in (200, 201, 204):
errtext = response.text.replace('\n', ' ')
raise BadRequest('(%s) unable to get status code %s; %s' % (response.status_code, response.url, errtext))
return response.json()['token']
def get_ips():
import socket
return list(set([i[4][0] for i in socket.getaddrinfo(socket.gethostname(), None)
@ -146,9 +123,15 @@ if __name__ == '__main__':
default_ip = available_ips[0]
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--username', help='Your Plex username')
parser.add_argument('--password', help='Your Plex password')
parser.add_argument('--token', help='Plex.tv authentication token', default=plexapi.CONFIG.get('auth.server_token'))
mg = parser.add_mutually_exclusive_group()
g = mg.add_argument_group()
g.add_argument('--username', help='Your Plex username')
g.add_argument('--password', help='Your Plex password')
mg.add_argument('--token', help='Plex.tv authentication token', default=plexapi.CONFIG.get('auth.server_token'))
mg.add_argument('--unclaimed', help='Do not claim the server', default=False, action='store_true')
parser.add_argument('--timezone', help='Timezone to set inside plex', default='UTC')
parser.add_argument('--destination', help='Local path where to store all the media',
default=os.path.join(os.getcwd(), 'plex'))
@ -176,33 +159,40 @@ if __name__ == '__main__':
print('Got an error when executing docker pull!')
exit(1)
if opts.token:
account = MyPlexAccount(token=opts.token)
else:
account = plexapi.utils.getMyPlexAccount(opts)
account = None
if not opts.unclaimed:
if opts.token:
account = MyPlexAccount(token=opts.token)
else:
account = plexapi.utils.getMyPlexAccount(opts)
path = os.path.realpath(os.path.expanduser(opts.destination))
makedirs(os.path.join(path, 'media'), exist_ok=True)
arg_bindings = {
'destination': path,
'hostname': opts.server_name,
'claim_token': get_claim_token(account),
'claim_token': account.claimToken() if account else '',
'timezone': opts.timezone,
'advertise_ip': opts.advertise_ip,
'image_tag': opts.docker_tag,
'container_name_extra': '' if account else 'unclaimed-'
}
docker_cmd = [c % arg_bindings for c in DOCKER_CMD]
exit_code = call(docker_cmd)
if exit_code != 0:
exit(exit_code)
print('Let`s wait while the instance register in your plex account...')
print('Let`s wait while the instance boots...')
start_time = time()
server = None
while not server and (time() - start_time < opts.bootstrap_timeout):
try:
device = account.device(opts.server_name)
server = device.connect()
if account:
device = account.device(opts.server_name)
server = device.connect()
else:
server = PlexServer('http://%s:32400' % opts.advertise_ip)
if opts.accept_eula:
server.settings.get('acceptedEULA').set(True)
server.settings.save()
@ -351,18 +341,17 @@ if __name__ == '__main__':
for section in sections:
create_section(server, section)
import logging
logging.basicConfig(level=logging.INFO)
shared_username = os.environ.get('SHARED_USERNAME', 'PKKid')
try:
user = account.user(shared_username)
account.updateFriend(user, server)
print('The server was shared with user "%s"' % shared_username)
except NotFound:
pass
if account:
shared_username = os.environ.get('SHARED_USERNAME', 'PKKid')
try:
user = account.user(shared_username)
account.updateFriend(user, server)
print('The server was shared with user "%s"' % shared_username)
except NotFound:
pass
print('Base URL is %s' % server.url('', False))
if opts.show_token:
if account and opts.show_token:
print('Auth token is %s' % account.authenticationToken)
print('Server %s is ready to use!' % opts.server_name)