mirror of
https://github.com/crystalidea/qt6windows7.git
synced 2024-11-26 22:27:06 +08:00
332 lines
11 KiB
Python
332 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (C) 2021 The Qt Company Ltd.
|
|
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
|
|
|
|
import argparse
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
|
|
from selenium.webdriver.support import expected_conditions
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium import webdriver
|
|
from pathlib import Path
|
|
import typing
|
|
import http.server
|
|
import subprocess
|
|
import threading
|
|
import psutil
|
|
import re
|
|
import os
|
|
from signal import SIGINT
|
|
|
|
import sys
|
|
|
|
|
|
class StdoutOutputSink(object):
|
|
def __init__(self):
|
|
pass
|
|
|
|
def write(self, data: str):
|
|
print(data)
|
|
|
|
def __enter__(self):
|
|
pass
|
|
|
|
def __exit__(self, _, __, ___):
|
|
pass
|
|
|
|
|
|
class FileOutputSink(object):
|
|
def __init__(self, filename: str):
|
|
self.__filename = filename
|
|
self.__file = None
|
|
|
|
def write(self, data: str):
|
|
self.__file.write(data)
|
|
|
|
def __enter__(self):
|
|
self.__file = open(self.__filename, 'w')
|
|
|
|
def __exit__(self, _, __, ___):
|
|
self.__file.close()
|
|
|
|
|
|
class OutputMulticast(object):
|
|
def __init__(self, destinations: typing.List[str]):
|
|
self.__sinks: typing.List[typing.Union[StdoutOutputSink, FileOutputSink]] = [
|
|
]
|
|
self.__destinations = [
|
|
'stdout'] if destinations is None else destinations
|
|
number_of_stdout_sinks = sum(
|
|
[1 if destination == 'stdout' else 0 for destination in self.__destinations])
|
|
if number_of_stdout_sinks > 1:
|
|
raise Exception('Maximum allowed number of stdout sinks is 1')
|
|
|
|
def write(self, data: str):
|
|
for sink in self.__sinks:
|
|
sink.write(data)
|
|
|
|
def _makeSink(self, destination: str):
|
|
return StdoutOutputSink() if 'stdout' == destination else FileOutputSink(destination)
|
|
|
|
def __enter__(self):
|
|
for destination in self.__destinations:
|
|
sink = self._makeSink(destination)
|
|
sink.__enter__()
|
|
self.__sinks.append(sink)
|
|
return self
|
|
|
|
def __exit__(self, _, __, ___):
|
|
for sink in reversed(self.__sinks):
|
|
sink.__exit__(_, __, ___)
|
|
|
|
|
|
class WasmTestRunner:
|
|
def __init__(self, args: dict):
|
|
self.server_process = None
|
|
self.browser_process = None
|
|
self.python_path = Path(sys.executable)
|
|
self.script_dir = Path(os.path.dirname(os.path.realpath(__file__)))
|
|
self.host = 'localhost'
|
|
self.webserver = None
|
|
self.webthread = None
|
|
|
|
paths = ['html_path', 'browser_path', 'chromedriver_path', 'tmp_dir']
|
|
|
|
for key, value in args.items():
|
|
if value is None:
|
|
continue
|
|
if key in paths:
|
|
value = Path(value)
|
|
value.resolve()
|
|
setattr(self, key, value)
|
|
|
|
if not self.html_path.exists():
|
|
raise FileNotFoundError(self.html_path)
|
|
|
|
self.webroot = self.html_path.parent
|
|
|
|
if hasattr(self, 'browser_path') and not self.browser_path.exists():
|
|
raise FileNotFoundError(self.browser_path)
|
|
|
|
def run(self):
|
|
self.run_threaded_webserver()
|
|
|
|
with OutputMulticast(
|
|
self.output if hasattr(self, 'output') else ['stdout']) as output_multicast:
|
|
try:
|
|
if self.use_browser:
|
|
return self.run_wasm_browser()
|
|
else:
|
|
return self.run_wasm_webdriver(output_multicast)
|
|
finally:
|
|
self.cleanup()
|
|
|
|
def run_webserver(self):
|
|
webroot = self.html_path.parent.resolve()
|
|
self.server_process =\
|
|
subprocess.Popen([
|
|
str(self.python_path),
|
|
'-m', 'http.server',
|
|
'--directory', str(webroot),
|
|
self.port
|
|
])
|
|
|
|
def run_threaded_webserver(self):
|
|
self.webserver = http.server.ThreadingHTTPServer(
|
|
(self.host, int(self.port)), self.get_http_handler_class())
|
|
|
|
self.webthread = threading.Thread(target=self.webserver.serve_forever)
|
|
self.webthread.start()
|
|
|
|
def shutdown_threaded_webserver(self):
|
|
if self.webserver is not None:
|
|
self.webserver.shutdown()
|
|
if self.webthread is not None:
|
|
self.webthread.join()
|
|
|
|
def run_wasm_webdriver(self, output_multicast: OutputMulticast):
|
|
url = f'http://localhost:{self.port}/{self.html_path.name}'
|
|
if (self.batched_test is not None):
|
|
url = f'{url}?qtestname={self.batched_test}&qtestoutputformat={self.format}'
|
|
|
|
d = DesiredCapabilities.CHROME
|
|
d['goog:loggingPrefs'] = {'browser': 'ALL'}
|
|
ser = Service(executable_path=self.chromedriver_path)
|
|
driver = webdriver.Chrome(desired_capabilities=d, service=ser)
|
|
driver.get(url)
|
|
driver.execute_script(
|
|
""" const status = qtTestRunner.status;
|
|
const onFinished = status => {
|
|
if (status === 'Completed' || status === 'Error')
|
|
document.title = 'qtFinished';
|
|
};
|
|
onFinished(status);
|
|
qtTestRunner.onStatusChanged.addEventListener(onFinished);
|
|
""")
|
|
|
|
WebDriverWait(driver, self.timeout).until(
|
|
expected_conditions.title_is('qtFinished'))
|
|
|
|
runner_status = driver.execute_script(f"return qtTestRunner.status")
|
|
if runner_status == 'Error':
|
|
output_multicast.write(driver.execute_script(
|
|
"return qtTestRunner.errorDetails"))
|
|
return -1
|
|
else:
|
|
assert runner_status == 'Completed'
|
|
output_multicast.write(driver.execute_script(
|
|
f"return qtTestRunner.results.get('{self.batched_test}').textOutput"))
|
|
return driver.execute_script(
|
|
f"return qtTestRunner.results.get('{self.batched_test}').exitCode")
|
|
|
|
def run_wasm_browser(self):
|
|
if not hasattr(self, 'browser_path'):
|
|
print('Error: browser path must be set to run with browser')
|
|
return
|
|
|
|
if not hasattr(self, 'tmp_dir'):
|
|
print('Error: tmp_dir must be set to run with browser')
|
|
return
|
|
|
|
self.create_tmp_dir()
|
|
self.browser_process =\
|
|
subprocess.Popen([
|
|
str(self.browser_path),
|
|
'--user-data-dir=' + str(self.tmp_dir),
|
|
'--enable-logging=stderr',
|
|
f'http://localhost:{self.port}/{self.html_path.name}'
|
|
],
|
|
stderr=subprocess.PIPE
|
|
)
|
|
|
|
# Only capture the console content
|
|
regex = re.compile(r'[^"]*CONSOLE[^"]*"(.*)"[.\w]*')
|
|
|
|
for line in self.browser_process.stderr:
|
|
str_line = line.decode('utf-8')
|
|
|
|
match = regex.match(str_line)
|
|
|
|
# Error condition, this should have matched
|
|
if 'CONSOLE' in str_line and match is None:
|
|
print('Error: did not match console line:')
|
|
print(str_line)
|
|
|
|
if match is not None:
|
|
console_line = match.group(1)
|
|
print(console_line)
|
|
|
|
if 'Finished testing' in str_line:
|
|
self.browser_process.kill()
|
|
break
|
|
|
|
@staticmethod
|
|
def get_loader_variable(driver, varname: str):
|
|
return driver.execute_script('return qtLoader.' + varname)
|
|
|
|
def create_tmp_dir(self):
|
|
if not self.tmp_dir.exists():
|
|
self.tmp_dir.mkdir()
|
|
|
|
if not self.tmp_dir.is_dir():
|
|
raise NotADirectoryError(self.tmp_dir)
|
|
|
|
# Needed to bypass the "Welcome to Chrome" prompt
|
|
first_run = Path(self.tmp_dir, 'First Run')
|
|
first_run.touch()
|
|
|
|
def get_http_handler_class(self):
|
|
wtr = self
|
|
|
|
class OriginIsolationHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
|
|
def __init__(self, request, client_address, server):
|
|
super().__init__(request, client_address, server, directory=wtr.webroot)
|
|
|
|
# Headers required to enable SharedArrayBuffer
|
|
# See https://web.dev/cross-origin-isolation-guide/
|
|
def end_headers(self):
|
|
self.send_header("Cross-Origin-Opener-Policy", "same-origin")
|
|
self.send_header(
|
|
"Cross-Origin-Embedder-Policy", "require-corp")
|
|
self.send_header(
|
|
"Cross-Origin-Resource-Policy", "cross-origin")
|
|
http.server.SimpleHTTPRequestHandler.end_headers(self)
|
|
|
|
# We usually don't care that much about what the webserver is logging
|
|
def log_message(self, format_, *args):
|
|
return
|
|
|
|
return OriginIsolationHTTPRequestHandler
|
|
|
|
def cleanup(self):
|
|
if self.browser_process is not None:
|
|
self.browser_process.kill()
|
|
if self.server_process is not None:
|
|
self.server_process.kill()
|
|
self.shutdown_threaded_webserver()
|
|
|
|
|
|
class BackendProcess:
|
|
def __init__(self) -> None:
|
|
self.__process = subprocess.Popen(
|
|
[sys.executable, *sys.argv, '--backend'], shell=False, stdout=subprocess.PIPE)
|
|
|
|
def abort(self):
|
|
current_process = psutil.Process(self.__process.pid)
|
|
children = current_process.children(recursive=True)
|
|
for child in [*children, current_process]:
|
|
os.kill(child.pid, SIGINT)
|
|
|
|
def communicate(self, timeout):
|
|
return self.__process.communicate(timeout)[0].decode('utf-8')
|
|
|
|
def returncode(self):
|
|
return self.__process.returncode
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='WASM testrunner')
|
|
parser.add_argument('html_path', help='Path to the HTML file to request')
|
|
parser.add_argument(
|
|
'--batched_test', help='Specifies a batched test to run')
|
|
parser.add_argument('--timeout', help='Test timeout',
|
|
type=int, default=120)
|
|
parser.add_argument(
|
|
'--port', help='Port to run the webserver on', default='8000')
|
|
parser.add_argument('--use_browser', action='store_true')
|
|
parser.add_argument('--browser_path', help='Path to the browser to use')
|
|
parser.add_argument('--chromedriver_path', help='Absolute path to chromedriver',
|
|
default='chromedriver')
|
|
parser.add_argument('--tmp_dir', help='Path to the tmpdir to use when using a browser',
|
|
default='/tmp/wasm-testrunner')
|
|
parser.add_argument(
|
|
'-o', help='filename. Filename may be "stdout" to write to stdout.',
|
|
action='append', dest='output')
|
|
parser.add_argument(
|
|
'--format', help='Output format', choices=['txt', 'xml', 'lightxml', 'junitxml', 'tap'],
|
|
default='txt')
|
|
parser.add_argument(
|
|
'--backend', help='Run as a backend process. There are two types of test runner processes - '
|
|
'the main monitoring process and the backend processes launched by it. The tests are '
|
|
'run on the backend to avoid any undesired behavior, like deadlocks in browser main process, '
|
|
'spilling over across test cases.',
|
|
action='store_true')
|
|
|
|
args = parser.parse_args()
|
|
if not args.backend:
|
|
backend_process = BackendProcess()
|
|
try:
|
|
stdout = backend_process.communicate(args.timeout)
|
|
print(stdout)
|
|
return backend_process.returncode()
|
|
except Exception as e:
|
|
print(f"Exception while executing test {e}")
|
|
backend_process.abort()
|
|
return -1
|
|
|
|
return WasmTestRunner(vars(args)).run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|