naiveproxy/build/android/play_services/update_test.py
2018-08-14 22:19:20 +00:00

392 lines
13 KiB
Python
Executable File

#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''Unittests for update.py.
They set up a temporary directory that is used to mock a bucket, the directory
containing the configuration files and the android sdk directory.
Tests run the script with various inputs and check the status of the filesystem
'''
import contextlib
import logging
import os
import shutil
import sys
import tempfile
import unittest
import zipfile
sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir))
from play_services import update
import devil_chromium # pylint: disable=import-error,unused-import
from devil.utils import cmd_helper
class TestFunctions(unittest.TestCase):
DEFAULT_CONFIG_VERSION = '1.2.3'
DEFAULT_LICENSE = 'Default License'
DEFAULT_ZIP_SHA1 = 'zip0and0filling0to0forty0chars0000000000'
def __init__(self, *args, **kwargs):
super(TestFunctions, self).__init__(*args, **kwargs)
self.paths = None # Initialized in SetUpWorkdir
self.workdir = None # Initialized in setUp
# Uncomment for debug logs.
# logging.basicConfig(level=logging.DEBUG)
#override
def setUp(self):
self.workdir = tempfile.mkdtemp()
#override
def tearDown(self):
shutil.rmtree(self.workdir)
self.workdir = None
def testUpload(self):
version = '2.3.4'
self.SetUpWorkdir(
gms_lib=True,
config_version=version,
source_prop=True)
status = update.main([
'upload',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', self.paths.gms.sdk_root
])
self.assertEqual(status, 0, 'the command should have succeeded.')
# bucket should contain license, name = content from LICENSE.sha1
self.assertTrue(os.path.isfile(self.paths.config_license_sha1))
license_sha1 = _GetFileContent(self.paths.config_license_sha1)
bucket_license = os.path.join(self.paths.bucket, version, license_sha1)
self.assertTrue(os.path.isfile(bucket_license))
self.assertEqual(_GetFileContent(bucket_license), self.DEFAULT_LICENSE)
# bucket should contain zip, name = content from zip.sha1
self.assertTrue(os.path.isfile(self.paths.config_zip_sha1))
bucket_zip = os.path.join(self.paths.bucket, str(version),
_GetFileContent(self.paths.config_zip_sha1))
self.assertTrue(os.path.isfile(bucket_zip))
# unzip, should contain expected files
with zipfile.ZipFile(bucket_zip, "r") as bucket_zip_file:
self.assertEqual(bucket_zip_file.namelist(),
['com/google/android/gms/client/2.3.4/client-2.3.4.aar'])
def testDownload(self):
self.SetUpWorkdir(populate_bucket=True)
with _MockedInput('y'):
status = update.main([
'download',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', self.paths.gms.sdk_root,
])
self.assertEqual(status, 0, 'the command should have succeeded.')
# sdk_root should contain zip contents, zip sha1, license
self.assertTrue(os.path.isfile(self.paths.gms.client_paths[0]))
self.assertTrue(os.path.isfile(self.paths.gms.lib_zip_sha1))
self.assertTrue(os.path.isfile(self.paths.gms.license))
self.assertEquals(_GetFileContent(self.paths.gms.license),
self.DEFAULT_LICENSE)
def testDownloadBot(self):
self.SetUpWorkdir(populate_bucket=True, bot_env=True)
# No need to type 'y' on bots
status = update.main([
'download',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', self.paths.gms.sdk_root,
])
self.assertEqual(status, 0, 'the command should have succeeded.')
# sdk_root should contain zip contents, zip sha1, license
self.assertTrue(os.path.isfile(self.paths.gms.client_paths[0]))
self.assertTrue(os.path.isfile(self.paths.gms.lib_zip_sha1))
self.assertTrue(os.path.isfile(self.paths.gms.license))
self.assertEquals(_GetFileContent(self.paths.gms.license),
self.DEFAULT_LICENSE)
def testDownloadAlreadyUpToDate(self):
self.SetUpWorkdir(
populate_bucket=True,
existing_zip_sha1=self.DEFAULT_ZIP_SHA1)
status = update.main([
'download',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', self.paths.gms.sdk_root,
])
self.assertEqual(status, 0, 'the command should have succeeded.')
# there should not be new files downloaded to sdk_root
self.assertFalse(os.path.isfile(os.path.join(self.paths.gms.client_paths[0],
'dummy_file')))
self.assertFalse(os.path.isfile(self.paths.gms.license))
def testDownloadAcceptedLicense(self):
self.SetUpWorkdir(
populate_bucket=True,
existing_license=self.DEFAULT_LICENSE)
# License already accepted, no need to type
status = update.main([
'download',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', self.paths.gms.sdk_root,
])
self.assertEqual(status, 0, 'the command should have succeeded.')
# sdk_root should contain zip contents, zip sha1, license
self.assertTrue(os.path.isfile(self.paths.gms.client_paths[0]))
self.assertTrue(os.path.isfile(self.paths.gms.lib_zip_sha1))
self.assertTrue(os.path.isfile(self.paths.gms.license))
self.assertEquals(_GetFileContent(self.paths.gms.license),
self.DEFAULT_LICENSE)
def testDownloadNewLicense(self):
self.SetUpWorkdir(
populate_bucket=True,
existing_license='Old license')
with _MockedInput('y'):
status = update.main([
'download',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', self.paths.gms.sdk_root,
])
self.assertEqual(status, 0, 'the command should have succeeded.')
# sdk_root should contain zip contents, zip sha1, NEW license
self.assertTrue(os.path.isfile(self.paths.gms.client_paths[0]))
self.assertTrue(os.path.isfile(self.paths.gms.lib_zip_sha1))
self.assertTrue(os.path.isfile(self.paths.gms.license))
self.assertEquals(_GetFileContent(self.paths.gms.license),
self.DEFAULT_LICENSE)
def testDownloadRefusedLicense(self):
self.SetUpWorkdir(
populate_bucket=True,
existing_license='Old license')
with _MockedInput('n'):
status = update.main([
'download',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', self.paths.gms.sdk_root,
])
self.assertEqual(status, 0, 'the command should have succeeded.')
# there should not be new files downloaded to sdk_root
self.assertFalse(os.path.isfile(os.path.join(self.paths.gms.client_paths[0],
'dummy_file')))
self.assertEquals(_GetFileContent(self.paths.gms.license),
'Old license')
def testDownloadNoAndroidSDK(self):
self.SetUpWorkdir(
populate_bucket=True,
existing_license='Old license')
non_existing_sdk_root = os.path.join(self.workdir, 'non_existing_sdk_root')
# Should not run, no typing needed
status = update.main([
'download',
'--dry-run',
'--bucket', self.paths.bucket,
'--config', self.paths.config_file,
'--sdk-root', non_existing_sdk_root,
])
self.assertEqual(status, 0, 'the command should have succeeded.')
self.assertFalse(os.path.isdir(non_existing_sdk_root))
def SetUpWorkdir(self,
bot_env=False,
config_version=DEFAULT_CONFIG_VERSION,
existing_license=None,
existing_zip_sha1=None,
gms_lib=False,
populate_bucket=False,
source_prop=None):
'''Prepares workdir by putting it in the specified state
Args:
- general
bot_env: sets or unsets CHROME_HEADLESS
- bucket
populate_bucket: boolean. Populate the bucket with a zip and license
file. The sha1s will be copied to the config directory
- config
config_version: number. Version of the current SDK. Defaults to
`self.DEFAULT_CONFIG_VERSION`
- sdk_root
existing_license: string. Create a LICENSE file setting the specified
text as content of the currently accepted license.
existing_zip_sha1: string. Create a sha1 file setting the specified
hash as hash of the SDK supposed to be installed
gms_lib: boolean. Create a dummy file in the location of the play
services SDK.
source_prop: boolean. Create a source.properties file that contains
the license to upload.
'''
client_name = 'client'
self.paths = Paths(self.workdir, config_version, [client_name])
# Create the main directories
_MakeDirs(self.paths.gms.sdk_root)
_MakeDirs(self.paths.config_dir)
_MakeDirs(self.paths.bucket)
# is not configured via argument.
update.SHA1_DIRECTORY = self.paths.config_dir
os.environ['CHROME_HEADLESS'] = '1' if bot_env else ''
if config_version:
_MakeDirs(os.path.dirname(self.paths.config_file))
with open(self.paths.config_file, 'w') as stream:
stream.write(('{"clients": ["%s"],'
'"version_number": "%s"}'
'\n') % (client_name, config_version))
if existing_license:
_MakeDirs(self.paths.gms.package)
with open(self.paths.gms.license, 'w') as stream:
stream.write(existing_license)
if existing_zip_sha1:
_MakeDirs(self.paths.gms.package)
with open(self.paths.gms.lib_zip_sha1, 'w') as stream:
stream.write(existing_zip_sha1)
if gms_lib:
_MakeDirs(os.path.dirname(self.paths.gms.client_paths[0]))
with open(self.paths.gms.client_paths[0], 'w') as stream:
stream.write('foo\n')
if source_prop:
_MakeDirs(os.path.dirname(self.paths.gms.source_prop))
with open(self.paths.gms.source_prop, 'w') as stream:
stream.write('Foo=Bar\n'
'Pkg.License=%s\n'
'Baz=Fizz\n' % self.DEFAULT_LICENSE)
if populate_bucket:
_MakeDirs(self.paths.config_dir)
bucket_dir = os.path.join(self.paths.bucket, str(config_version))
_MakeDirs(bucket_dir)
# TODO(dgn) should we use real sha1s? comparison with the real sha1 is
# done but does not do anything other than displaying a message.
config_license_sha1 = 'license0and0filling0to0forty0chars000000'
with open(self.paths.config_license_sha1, 'w') as stream:
stream.write(config_license_sha1)
with open(os.path.join(bucket_dir, config_license_sha1), 'w') as stream:
stream.write(self.DEFAULT_LICENSE)
config_zip_sha1 = self.DEFAULT_ZIP_SHA1
with open(self.paths.config_zip_sha1, 'w') as stream:
stream.write(config_zip_sha1)
pre_zip_client = os.path.join(
self.workdir,
'pre_zip_lib',
os.path.relpath(self.paths.gms.client_paths[0],
self.paths.gms.package))
pre_zip_lib = os.path.dirname(pre_zip_client)
post_zip_lib = os.path.join(bucket_dir, config_zip_sha1)
print(pre_zip_lib, post_zip_lib)
_MakeDirs(pre_zip_lib)
with open(pre_zip_client, 'w') as stream:
stream.write('foo\n')
# pylint: disable=protected-access
update._ZipLibrary(post_zip_lib, [pre_zip_client], os.path.join(
self.workdir, 'pre_zip_lib'))
if logging.getLogger().isEnabledFor(logging.DEBUG):
cmd_helper.Call(['tree', self.workdir])
class Paths(object):
'''Declaration of the paths commonly manipulated in the tests.'''
def __init__(self, workdir, version, clients):
self.bucket = os.path.join(workdir, 'bucket')
self.config_dir = os.path.join(workdir, 'config')
self.config_file = os.path.join(self.config_dir, 'config.json')
self.config_license_sha1 = os.path.join(self.config_dir, 'LICENSE.sha1')
self.config_zip_sha1 = os.path.join(
self.config_dir,
'google_play_services_library.zip.sha1')
self.gms = update.PlayServicesPaths(os.path.join(workdir, 'sdk_root'),
version, clients)
def _GetFileContent(file_path):
with open(file_path, 'r') as stream:
return stream.read()
def _MakeDirs(path):
'''Avoids having to do the error handling everywhere.'''
if not os.path.exists(path):
os.makedirs(path)
@contextlib.contextmanager
def _MockedInput(typed_string):
'''Makes raw_input return |typed_string| while inside the context.'''
try:
if isinstance(__builtins__, dict):
original_raw_input = __builtins__['raw_input']
__builtins__['raw_input'] = lambda _: typed_string
else:
original_raw_input = __builtins__.raw_input
__builtins__.raw_input = lambda _: typed_string
yield
finally:
if isinstance(__builtins__, dict):
__builtins__['raw_input'] = original_raw_input
else:
__builtins__.raw_input = original_raw_input
if __name__ == '__main__':
unittest.main()