From 752c3769874596d012cd8325099d2ae20123f989 Mon Sep 17 00:00:00 2001 From: Simon Glass Date: Sun, 9 Feb 2025 09:07:14 -0700 Subject: test/py: Shorten u_boot_console This fixture name is quite long and results in lots of verbose code. We know this is U-Boot so the 'u_boot_' part is not necessary. But it is also a bit of a misnomer, since it provides access to all the information available to tests. It is not just the console. It would be too confusing to use con as it would be confused with config and it is probably too short. So shorten it to 'ubman'. Signed-off-by: Simon Glass Link: https://lore.kernel.org/u-boot/CAFLszTgPa4aT_J9h9pqeTtLCVn4x2JvLWRcWRD8NaN3uoSAtyA@mail.gmail.com/ --- doc/develop/py_testing.rst | 12 ++++++------ doc/develop/tests_writing.rst | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) (limited to 'doc/develop') diff --git a/doc/develop/py_testing.rst b/doc/develop/py_testing.rst index b50473039be..b88d7e3c8d4 100644 --- a/doc/develop/py_testing.rst +++ b/doc/develop/py_testing.rst @@ -506,24 +506,24 @@ Writing tests Please refer to the pytest documentation for details of writing pytest tests. Details specific to the U-Boot test suite are described below. -A test fixture named `u_boot_console` should be used by each test function. This +A test fixture named `ubman` should be used by each test function. This provides the means to interact with the U-Boot console, and retrieve board and environment configuration information. -The function `u_boot_console.run_command()` executes a shell command on the +The function `ubman.run_command()` executes a shell command on the U-Boot console, and returns all output from that command. This allows validation or interpretation of the command output. This function validates that certain strings are not seen on the U-Boot console. These include shell error messages and the U-Boot sign-on message (in order to detect unexpected board resets). See the source of `u_boot_console_base.py` for a complete list of "bad" strings. Some test scenarios are expected to trigger these strings. Use -`u_boot_console.disable_check()` to temporarily disable checking for specific +`ubman.disable_check()` to temporarily disable checking for specific strings. See `test_unknown_cmd.py` for an example. Board- and board-environment configuration values may be accessed as sub-fields -of the `u_boot_console.config` object, for example -`u_boot_console.config.ram_base`. +of the `ubman.config` object, for example +`ubman.config.ram_base`. Build configuration values (from `.config`) may be accessed via the dictionary -`u_boot_console.config.buildconfig`, with keys equal to the Kconfig variable +`ubman.config.buildconfig`, with keys equal to the Kconfig variable names. diff --git a/doc/develop/tests_writing.rst b/doc/develop/tests_writing.rst index 5f3c43d5da2..d5917fe674c 100644 --- a/doc/develop/tests_writing.rst +++ b/doc/develop/tests_writing.rst @@ -116,19 +116,19 @@ below are approximate, as measured on an AMD 2950X system. Here is is the test in Python:: @pytest.mark.buildconfigspec('cmd_memory') - def test_md(u_boot_console): + def test_md(ubman): """Test that md reads memory as expected, and that memory can be modified using the mw command.""" - ram_base = u_boot_utils.find_ram_base(u_boot_console) + ram_base = u_boot_utils.find_ram_base(ubman) addr = '%08x' % ram_base val = 'a5f09876' expected_response = addr + ': ' + val - u_boot_console.run_command('mw ' + addr + ' 0 10') - response = u_boot_console.run_command('md ' + addr + ' 10') + ubman.run_command('mw ' + addr + ' 0 10') + response = ubman.run_command('md ' + addr + ' 10') assert(not (expected_response in response)) - u_boot_console.run_command('mw ' + addr + ' ' + val) - response = u_boot_console.run_command('md ' + addr + ' 10') + ubman.run_command('mw ' + addr + ' ' + val) + response = ubman.run_command('md ' + addr + ' 10') assert(expected_response in response) This runs a few commands and checks the output. Note that it runs a command, -- cgit v1.3.1 From d9ed4b75add4b4ccc37cf32b54cd9c77f48e3396 Mon Sep 17 00:00:00 2001 From: Simon Glass Date: Sun, 9 Feb 2025 09:07:15 -0700 Subject: test/py: Drop u_boot_ prefix on test files We know this is U-Boot so the prefix serves no purpose other than to make things longer and harder to read. Drop it and rename the files. Signed-off-by: Simon Glass Reviewed-by: Mattijs Korpershoek # test_android / test_dfu --- doc/develop/py_testing.rst | 4 +- doc/develop/tests_writing.rst | 2 +- test/py/conftest.py | 52 +-- test/py/console_base.py | 613 ++++++++++++++++++++++++++++ test/py/console_board.py | 85 ++++ test/py/console_sandbox.py | 119 ++++++ test/py/spawn.py | 345 ++++++++++++++++ test/py/tests/test_android/test_ab.py | 12 +- test/py/tests/test_android/test_abootimg.py | 10 +- test/py/tests/test_android/test_avb.py | 2 +- test/py/tests/test_dfu.py | 18 +- test/py/tests/test_efi_fit.py | 2 +- test/py/tests/test_efi_loader.py | 4 +- test/py/tests/test_env.py | 24 +- test/py/tests/test_event_dump.py | 2 +- test/py/tests/test_extension.py | 2 +- test/py/tests/test_fit.py | 2 +- test/py/tests/test_fit_auto_signed.py | 2 +- test/py/tests/test_fit_ecdsa.py | 2 +- test/py/tests/test_fit_hashes.py | 2 +- test/py/tests/test_fpga.py | 8 +- test/py/tests/test_fs/conftest.py | 2 +- test/py/tests/test_gpio.py | 2 +- test/py/tests/test_gpt.py | 14 +- test/py/tests/test_kconfig.py | 2 +- test/py/tests/test_md.py | 6 +- test/py/tests/test_mmc.py | 8 +- test/py/tests/test_mmc_rd.py | 4 +- test/py/tests/test_mmc_wr.py | 4 +- test/py/tests/test_net.py | 6 +- test/py/tests/test_net_boot.py | 4 +- test/py/tests/test_of_migrate.py | 2 +- test/py/tests/test_ofplatdata.py | 2 +- test/py/tests/test_optee_rpmb.py | 2 +- test/py/tests/test_pinmux.py | 2 +- test/py/tests/test_pstore.py | 2 +- test/py/tests/test_sandbox_opts.py | 2 +- test/py/tests/test_scp03.py | 2 +- test/py/tests/test_sf.py | 12 +- test/py/tests/test_source.py | 2 +- test/py/tests/test_spi.py | 14 +- test/py/tests/test_tpm2.py | 10 +- test/py/tests/test_trace.py | 2 +- test/py/tests/test_ums.py | 21 +- test/py/tests/test_upl.py | 2 +- test/py/tests/test_usb.py | 12 +- test/py/tests/test_ut.py | 86 ++-- test/py/tests/test_vbe_vpl.py | 10 +- test/py/tests/test_vboot.py | 2 +- test/py/tests/test_zynq_secure.py | 12 +- test/py/tests/test_zynqmp_secure.py | 8 +- test/py/u_boot_console_base.py | 613 ---------------------------- test/py/u_boot_console_exec_attach.py | 85 ---- test/py/u_boot_console_sandbox.py | 119 ------ test/py/u_boot_spawn.py | 345 ---------------- test/py/u_boot_utils.py | 382 ----------------- test/py/utils.py | 382 +++++++++++++++++ 57 files changed, 1748 insertions(+), 1749 deletions(-) create mode 100644 test/py/console_base.py create mode 100644 test/py/console_board.py create mode 100644 test/py/console_sandbox.py create mode 100644 test/py/spawn.py delete mode 100644 test/py/u_boot_console_base.py delete mode 100644 test/py/u_boot_console_exec_attach.py delete mode 100644 test/py/u_boot_console_sandbox.py delete mode 100644 test/py/u_boot_spawn.py delete mode 100644 test/py/u_boot_utils.py create mode 100644 test/py/utils.py (limited to 'doc/develop') diff --git a/doc/develop/py_testing.rst b/doc/develop/py_testing.rst index b88d7e3c8d4..40a85380343 100644 --- a/doc/develop/py_testing.rst +++ b/doc/develop/py_testing.rst @@ -125,7 +125,7 @@ browser, but may be read directly as plain text, perhaps with the aid of the If sandbox crashes (e.g. with a segfault) you will see message like this:: - test/py/u_boot_spawn.py:171: in expect + test/py/spawn.py:171: in expect c = os.read(self.fd, 1024).decode(errors='replace') E ValueError: U-Boot exited with signal 11 (Signals.SIGSEGV) @@ -515,7 +515,7 @@ U-Boot console, and returns all output from that command. This allows validation or interpretation of the command output. This function validates that certain strings are not seen on the U-Boot console. These include shell error messages and the U-Boot sign-on message (in order to detect unexpected -board resets). See the source of `u_boot_console_base.py` for a complete list of +board resets). See the source of `console_base.py` for a complete list of "bad" strings. Some test scenarios are expected to trigger these strings. Use `ubman.disable_check()` to temporarily disable checking for specific strings. See `test_unknown_cmd.py` for an example. diff --git a/doc/develop/tests_writing.rst b/doc/develop/tests_writing.rst index d5917fe674c..7ea17081def 100644 --- a/doc/develop/tests_writing.rst +++ b/doc/develop/tests_writing.rst @@ -120,7 +120,7 @@ in Python:: """Test that md reads memory as expected, and that memory can be modified using the mw command.""" - ram_base = u_boot_utils.find_ram_base(ubman) + ram_base = utils.find_ram_base(ubman) addr = '%08x' % ram_base val = 'a5f09876' expected_response = addr + ': ' + val diff --git a/test/py/conftest.py b/test/py/conftest.py index 863f56c1152..8d0e786ee5c 100644 --- a/test/py/conftest.py +++ b/test/py/conftest.py @@ -7,7 +7,7 @@ # test, at shutdown etc. These hooks perform functions such as: # - Parsing custom command-line options. # - Pullilng in user-specified board configuration. -# - Creating the U-Boot console test fixture. +# - Creating the ubman test fixture. # - Creating the HTML log file. # - Monitoring each test's results. # - Implementing custom pytest markers. @@ -25,12 +25,12 @@ import re from _pytest.runner import runtestprotocol import subprocess import sys +from spawn import BootFail, Timeout, Unexpected, handle_exception import time -from u_boot_spawn import BootFail, Timeout, Unexpected, handle_exception -# Globals: The HTML log file, and the connection to the U-Boot console. +# Globals: The HTML log file, and the top-level fixture log = None -console = None +ubman_fix = None TEST_PY_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -247,7 +247,7 @@ def pytest_configure(config): ubconfig.buildconfig.update(parser.items('root')) global log - global console + global ubman_fix global ubconfig (board_type, board_type_extra, board_identity, build_dir, build_dir_extra, @@ -343,11 +343,11 @@ def pytest_configure(config): os.environ['U_BOOT_' + v.upper()] = getattr(ubconfig, v) if board_type.startswith('sandbox'): - import u_boot_console_sandbox - console = u_boot_console_sandbox.ConsoleSandbox(log, ubconfig) + import console_sandbox + ubman_fix = console_sandbox.ConsoleSandbox(log, ubconfig) else: - import u_boot_console_exec_attach - console = u_boot_console_exec_attach.ConsoleExecAttach(log, ubconfig) + import console_board + ubman_fix = console_board.ConsoleExecAttach(log, ubconfig) def generate_ut_subtest(metafunc, fixture_name, sym_path): @@ -366,7 +366,7 @@ def generate_ut_subtest(metafunc, fixture_name, sym_path): Returns: Nothing. """ - fn = console.config.build_dir + sym_path + fn = ubman_fix.config.build_dir + sym_path try: with open(fn, 'rt') as f: lines = f.readlines() @@ -407,8 +407,8 @@ def generate_config(metafunc, fixture_name): """ subconfigs = { - 'brd': console.config.brd, - 'env': console.config.env, + 'brd': ubman_fix.config.brd, + 'env': ubman_fix.config.env, } parts = fixture_name.split('__') if len(parts) < 2: @@ -470,7 +470,7 @@ def u_boot_log(request): The fixture value. """ - return console.log + return ubman_fix.log @pytest.fixture(scope='session') def u_boot_config(request): @@ -483,7 +483,7 @@ def u_boot_config(request): The fixture value. """ - return console.config + return ubman_fix.config @pytest.fixture(scope='function') def ubman(request): @@ -499,18 +499,18 @@ def ubman(request): pytest.skip('Cannot get target connection') return None try: - console.ensure_spawned() + ubman_fix.ensure_spawned() except OSError as err: - handle_exception(ubconfig, console, log, err, 'Lab failure', True) + handle_exception(ubconfig, ubman_fix, log, err, 'Lab failure', True) except Timeout as err: - handle_exception(ubconfig, console, log, err, 'Lab timeout', True) + handle_exception(ubconfig, ubman_fix, log, err, 'Lab timeout', True) except BootFail as err: - handle_exception(ubconfig, console, log, err, 'Boot fail', True, - console.get_spawn_output()) + handle_exception(ubconfig, ubman_fix, log, err, 'Boot fail', True, + ubman.get_spawn_output()) except Unexpected: - handle_exception(ubconfig, console, log, err, 'Unexpected test output', + handle_exception(ubconfig, ubman_fix, log, err, 'Unexpected test output', False) - return console + return ubman_fix anchors = {} tests_not_run = [] @@ -623,8 +623,8 @@ def cleanup(): Nothing. """ - if console: - console.close() + if ubman_fix: + ubman_fix.close() if log: with log.section('Status Report', 'status_report'): log.status_pass('%d passed' % len(tests_passed)) @@ -845,7 +845,7 @@ def pytest_runtest_protocol(item, nextitem): test_durations[item.name] = duration if failure_cleanup: - console.drain_console() + ubman_fix.drain_console() test_list.append(item.name) tests_not_run.remove(item.name) @@ -855,7 +855,7 @@ def pytest_runtest_protocol(item, nextitem): except: # If something went wrong with logging, it's better to let the test # process continue, which may report other exceptions that triggered - # the logging issue (e.g. console.log wasn't created). Hence, just + # the logging issue (e.g. ubman_fix.log wasn't created). Hence, just # squash the exception. If the test setup failed due to e.g. syntax # error somewhere else, this won't be seen. However, once that issue # is fixed, if this exception still exists, it will then be logged as @@ -868,6 +868,6 @@ def pytest_runtest_protocol(item, nextitem): log.end_section(item.name) if failure_cleanup: - console.cleanup_spawn() + ubman_fix.cleanup_spawn() return True diff --git a/test/py/console_base.py b/test/py/console_base.py new file mode 100644 index 00000000000..260df773bac --- /dev/null +++ b/test/py/console_base.py @@ -0,0 +1,613 @@ +# SPDX-License-Identifier: GPL-2.0 +# Copyright (c) 2015 Stephen Warren +# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. + +# Common logic to interact with U-Boot via the console. This class provides +# the interface that tests use to execute U-Boot shell commands and wait for +# their results. Sub-classes exist to perform board-type-specific setup +# operations, such as spawning a sub-process for Sandbox, or attaching to the +# serial console of real hardware. + +import multiplexed_log +import os +import pytest +import re +import sys +import spawn +from spawn import BootFail, Timeout, Unexpected, handle_exception + +# Regexes for text we expect U-Boot to send to the console. +pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))') +pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))') +pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ') +pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'') +pattern_error_notification = re.compile('## Error: ') +pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###') +pattern_ready_prompt = re.compile('{lab ready in (.*)s: (.*)}') +pattern_lab_mode = re.compile('{lab mode.*}') + +PAT_ID = 0 +PAT_RE = 1 + +# Timeout before expecting the console to be ready (in milliseconds) +TIMEOUT_MS = 30000 # Standard timeout +TIMEOUT_CMD_MS = 10000 # Command-echo timeout + +# Timeout for board preparation in lab mode. This needs to be enough to build +# U-Boot, write it to the board and then boot the board. Since this process is +# under the control of another program (e.g. Labgrid), it will failure sooner +# if something goes way. So use a very long timeout here to cover all possible +# situations. +TIMEOUT_PREPARE_MS = 3 * 60 * 1000 + +bad_pattern_defs = ( + ('spl_signon', pattern_u_boot_spl_signon), + ('main_signon', pattern_u_boot_main_signon), + ('stop_autoboot_prompt', pattern_stop_autoboot_prompt), + ('unknown_command', pattern_unknown_command), + ('error_notification', pattern_error_notification), + ('error_please_reset', pattern_error_please_reset), +) + +class ConsoleDisableCheck(object): + """Context manager (for Python's with statement) that temporarily disables + the specified console output error check. This is useful when deliberately + executing a command that is known to trigger one of the error checks, in + order to test that the error condition is actually raised. This class is + used internally by ConsoleBase::disable_check(); it is not intended for + direct usage.""" + + def __init__(self, console, check_type): + self.console = console + self.check_type = check_type + + def __enter__(self): + self.console.disable_check_count[self.check_type] += 1 + self.console.eval_bad_patterns() + + def __exit__(self, extype, value, traceback): + self.console.disable_check_count[self.check_type] -= 1 + self.console.eval_bad_patterns() + +class ConsoleEnableCheck(object): + """Context manager (for Python's with statement) that temporarily enables + the specified console output error check. This is useful when executing a + command that might raise an extra bad pattern, beyond the default bad + patterns, in order to validate that the extra bad pattern is actually + detected. This class is used internally by ConsoleBase::enable_check(); it + is not intended for direct usage.""" + + def __init__(self, console, check_type, check_pattern): + self.console = console + self.check_type = check_type + self.check_pattern = check_pattern + + def __enter__(self): + global bad_pattern_defs + self.default_bad_patterns = bad_pattern_defs + bad_pattern_defs += ((self.check_type, self.check_pattern),) + self.console.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs} + self.console.eval_bad_patterns() + + def __exit__(self, extype, value, traceback): + global bad_pattern_defs + bad_pattern_defs = self.default_bad_patterns + self.console.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs} + self.console.eval_bad_patterns() + +class ConsoleSetupTimeout(object): + """Context manager (for Python's with statement) that temporarily sets up + timeout for specific command. This is useful when execution time is greater + then default 30s.""" + + def __init__(self, console, timeout): + self.p = console.p + self.orig_timeout = self.p.timeout + self.p.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, extype, value, traceback): + self.p.timeout = self.orig_timeout + +class ConsoleBase(object): + """The interface through which test functions interact with the U-Boot + console. This primarily involves executing shell commands, capturing their + results, and checking for common error conditions. Some common utilities + are also provided too.""" + + def __init__(self, log, config, max_fifo_fill): + """Initialize a U-Boot console connection. + + Can only usefully be called by sub-classes. + + Args: + log: A multiplexed_log.Logfile object, to which the U-Boot output + will be logged. + config: A configuration data structure, as built by conftest.py. + max_fifo_fill: The maximum number of characters to send to U-Boot + command-line before waiting for U-Boot to echo the characters + back. For UART-based HW without HW flow control, this value + should be set less than the UART RX FIFO size to avoid + overflow, assuming that U-Boot can't keep up with full-rate + traffic at the baud rate. + + Returns: + Nothing. + """ + + self.log = log + self.config = config + self.max_fifo_fill = max_fifo_fill + + self.logstream = self.log.get_stream('console', sys.stdout) + + # Array slice removes leading/trailing quotes + self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1] + self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE) + self.p = None + self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs} + self.eval_bad_patterns() + + self.at_prompt = False + self.at_prompt_logevt = None + self.lab_mode = False + + def get_spawn(self): + # This is not called, ssubclass must define this. + # Return a value to avoid: + # console_base.py:348:12: E1128: Assigning result of a function + # call, where the function returns None (assignment-from-none) + return spawn.Spawn([]) + + + def eval_bad_patterns(self): + self.bad_patterns = [pat[PAT_RE] for pat in bad_pattern_defs \ + if self.disable_check_count[pat[PAT_ID]] == 0] + self.bad_pattern_ids = [pat[PAT_ID] for pat in bad_pattern_defs \ + if self.disable_check_count[pat[PAT_ID]] == 0] + + def close(self): + """Terminate the connection to the U-Boot console. + + This function is only useful once all interaction with U-Boot is + complete. Once this function is called, data cannot be sent to or + received from U-Boot. + + Args: + None. + + Returns: + Nothing. + """ + + if self.p: + self.log.start_section('Stopping U-Boot') + close_type = self.p.close() + self.log.info(f'Close type: {close_type}') + self.log.end_section('Stopping U-Boot') + self.logstream.close() + + def set_lab_mode(self): + """Select lab mode + + This tells us that we will get a 'lab ready' message when the board is + ready for use. We don't need to look for signon messages. + """ + self.log.info(f'test.py: Lab mode is active') + self.p.timeout = TIMEOUT_PREPARE_MS + self.lab_mode = True + + def wait_for_boot_prompt(self, loop_num = 1): + """Wait for the boot up until command prompt. This is for internal use only. + """ + try: + self.log.info('Waiting for U-Boot to be ready') + bcfg = self.config.buildconfig + config_spl_serial = bcfg.get('config_spl_serial', 'n') == 'y' + env_spl_skipped = self.config.env.get('env__spl_skipped', False) + env_spl_banner_times = self.config.env.get('env__spl_banner_times', 1) + + while not self.lab_mode and loop_num > 0: + loop_num -= 1 + while config_spl_serial and not env_spl_skipped and env_spl_banner_times > 0: + m = self.p.expect([pattern_u_boot_spl_signon, + pattern_lab_mode] + self.bad_patterns) + if m == 1: + self.set_lab_mode() + break + elif m != 0: + raise BootFail('Bad pattern found on SPL console: ' + + self.bad_pattern_ids[m - 1]) + env_spl_banner_times -= 1 + + if not self.lab_mode: + m = self.p.expect([pattern_u_boot_main_signon, + pattern_lab_mode] + self.bad_patterns) + if m == 1: + self.set_lab_mode() + elif m != 0: + raise BootFail('Bad pattern found on console: ' + + self.bad_pattern_ids[m - 1]) + if not self.lab_mode: + self.u_boot_version_string = self.p.after + while True: + m = self.p.expect([self.prompt_compiled, pattern_ready_prompt, + pattern_stop_autoboot_prompt] + self.bad_patterns) + if m == 0: + self.log.info(f'Found ready prompt {m}') + break + elif m == 1: + m = pattern_ready_prompt.search(self.p.after) + self.u_boot_version_string = m.group(2) + self.log.info(f'Lab: Board is ready') + self.p.timeout = TIMEOUT_MS + break + if m == 2: + self.log.info(f'Found autoboot prompt {m}') + self.p.send(' ') + continue + if not self.lab_mode: + raise BootFail('Missing prompt / ready message on console: ' + + self.bad_pattern_ids[m - 3]) + self.log.info(f'U-Boot is ready') + + finally: + self.log.timestamp() + + def run_command(self, cmd, wait_for_echo=True, send_nl=True, + wait_for_prompt=True, wait_for_reboot=False): + """Execute a command via the U-Boot console. + + The command is always sent to U-Boot. + + U-Boot echoes any command back to its output, and this function + typically waits for that to occur. The wait can be disabled by setting + wait_for_echo=False, which is useful e.g. when sending CTRL-C to + interrupt a long-running command such as "ums". + + Command execution is typically triggered by sending a newline + character. This can be disabled by setting send_nl=False, which is + also useful when sending CTRL-C. + + This function typically waits for the command to finish executing, and + returns the console output that it generated. This can be disabled by + setting wait_for_prompt=False, which is useful when invoking a long- + running command such as "ums". + + Args: + cmd: The command to send. + wait_for_echo: Boolean indicating whether to wait for U-Boot to + echo the command text back to its output. + send_nl: Boolean indicating whether to send a newline character + after the command string. + wait_for_prompt: Boolean indicating whether to wait for the + command prompt to be sent by U-Boot. This typically occurs + immediately after the command has been executed. + wait_for_reboot: Boolean indication whether to wait for the + reboot U-Boot. If this sets True, wait_for_prompt must also + be True. + + Returns: + If wait_for_prompt == False: + Nothing. + Else: + The output from U-Boot during command execution. In other + words, the text U-Boot emitted between the point it echod the + command string and emitted the subsequent command prompts. + """ + + if self.at_prompt and \ + self.at_prompt_logevt != self.logstream.logfile.cur_evt: + self.logstream.write(self.prompt, implicit=True) + + try: + self.at_prompt = False + if not self.p: + raise BootFail( + f"Lab failure: Connection lost when sending command '{cmd}'") + + if send_nl: + cmd += '\n' + rem = cmd # Remaining to be sent + with self.temporary_timeout(TIMEOUT_CMD_MS): + while rem: + # Limit max outstanding data, so UART FIFOs don't overflow + chunk = rem[:self.max_fifo_fill] + rem = rem[self.max_fifo_fill:] + self.p.send(chunk) + if not wait_for_echo: + continue + chunk = re.escape(chunk) + chunk = chunk.replace('\\\n', '[\r\n]') + m = self.p.expect([chunk] + self.bad_patterns) + if m != 0: + self.at_prompt = False + raise BootFail(f"Failed to get echo on console (cmd '{cmd}':rem '{rem}'): " + + self.bad_pattern_ids[m - 1]) + if not wait_for_prompt: + return + if wait_for_reboot: + self.wait_for_boot_prompt() + else: + m = self.p.expect([self.prompt_compiled] + self.bad_patterns) + if m != 0: + self.at_prompt = False + raise BootFail('Missing prompt on console: ' + + self.bad_pattern_ids[m - 1]) + self.at_prompt = True + self.at_prompt_logevt = self.logstream.logfile.cur_evt + # Only strip \r\n; space/TAB might be significant if testing + # indentation. + return self.p.before.strip('\r\n') + except Timeout as exc: + handle_exception(self.config, self, self.log, exc, + f"Lab failure: Timeout executing '{cmd}'", True) + raise + except BootFail as exc: + handle_exception(self.config, self, self.log, exc, + f"'Boot fail '{cmd}'", + True, self.get_spawn_output()) + raise + finally: + self.log.timestamp() + + def run_command_list(self, cmds): + """Run a list of commands. + + This is a helper function to call run_command() with default arguments + for each command in a list. + + Args: + cmd: List of commands (each a string). + Returns: + A list of output strings from each command, one element for each + command. + """ + output = [] + for cmd in cmds: + output.append(self.run_command(cmd)) + return output + + def ctrlc(self): + """Send a CTRL-C character to U-Boot. + + This is useful in order to stop execution of long-running synchronous + commands such as "ums". + + Args: + None. + + Returns: + Nothing. + """ + + self.log.action('Sending Ctrl-C') + self.run_command(chr(3), wait_for_echo=False, send_nl=False) + + def wait_for(self, text): + """Wait for a pattern to be emitted by U-Boot. + + This is useful when a long-running command such as "dfu" is executing, + and it periodically emits some text that should show up at a specific + location in the log file. + + Args: + text: The text to wait for; either a string (containing raw text, + not a regular expression) or an re object. + + Returns: + Nothing. + """ + + if type(text) == type(''): + text = re.escape(text) + m = self.p.expect([text] + self.bad_patterns) + if m != 0: + raise Unexpected( + "Unexpected pattern found on console (exp '{text}': " + + self.bad_pattern_ids[m - 1]) + + def drain_console(self): + """Read from and log the U-Boot console for a short time. + + U-Boot's console output is only logged when the test code actively + waits for U-Boot to emit specific data. There are cases where tests + can fail without doing this. For example, if a test asks U-Boot to + enable USB device mode, then polls until a host-side device node + exists. In such a case, it is useful to log U-Boot's console output + in case U-Boot printed clues as to why the host-side even did not + occur. This function will do that. + + Args: + None. + + Returns: + Nothing. + """ + + # If we are already not connected to U-Boot, there's nothing to drain. + # This should only happen when a previous call to run_command() or + # wait_for() failed (and hence the output has already been logged), or + # the system is shutting down. + if not self.p: + return + + orig_timeout = self.p.timeout + try: + # Drain the log for a relatively short time. + self.p.timeout = 1000 + # Wait for something U-Boot will likely never send. This will + # cause the console output to be read and logged. + self.p.expect(['This should never match U-Boot output']) + except: + # We expect a timeout, since U-Boot won't print what we waited + # for. Squash it when it happens. + # + # Squash any other exception too. This function is only used to + # drain (and log) the U-Boot console output after a failed test. + # The U-Boot process will be restarted, or target board reset, once + # this function returns. So, we don't care about detecting any + # additional errors, so they're squashed so that the rest of the + # post-test-failure cleanup code can continue operation, and + # correctly terminate any log sections, etc. + pass + finally: + self.p.timeout = orig_timeout + + def ensure_spawned(self, expect_reset=False): + """Ensure a connection to a correctly running U-Boot instance. + + This may require spawning a new Sandbox process or resetting target + hardware, as defined by the implementation sub-class. + + This is an internal function and should not be called directly. + + Args: + expect_reset: Boolean indication whether this boot is expected + to be reset while the 1st boot process after main boot before + prompt. False by default. + + Returns: + Nothing. + """ + + if self.p: + # Reset the console timeout value as some tests may change + # its default value during the execution + if not self.config.gdbserver: + self.p.timeout = TIMEOUT_MS + return + try: + self.log.start_section('Starting U-Boot') + self.at_prompt = False + self.p = self.get_spawn() + # Real targets can take a long time to scroll large amounts of + # text if LCD is enabled. This value may need tweaking in the + # future, possibly per-test to be optimal. This works for 'help' + # on board 'seaboard'. + if not self.config.gdbserver: + self.p.timeout = TIMEOUT_MS + self.p.logfile_read = self.logstream + if self.config.use_running_system: + # Send an empty command to set up the 'expect' logic. This has + # the side effect of ensuring that there was no partial command + # line entered + self.run_command(' ') + else: + if expect_reset: + loop_num = 2 + else: + loop_num = 1 + self.wait_for_boot_prompt(loop_num = loop_num) + self.at_prompt = True + self.at_prompt_logevt = self.logstream.logfile.cur_evt + except Exception as ex: + self.log.error(str(ex)) + self.cleanup_spawn() + raise + finally: + self.log.timestamp() + self.log.end_section('Starting U-Boot') + + def cleanup_spawn(self): + """Shut down all interaction with the U-Boot instance. + + This is used when an error is detected prior to re-establishing a + connection with a fresh U-Boot instance. + + This is an internal function and should not be called directly. + + Args: + None. + + Returns: + Nothing. + """ + + try: + if self.p: + self.p.close() + except: + pass + self.p = None + + def restart_uboot(self, expect_reset=False): + """Shut down and restart U-Boot.""" + self.cleanup_spawn() + self.ensure_spawned(expect_reset) + + def get_spawn_output(self): + """Return the start-up output from U-Boot + + Returns: + The output produced by ensure_spawed(), as a string. + """ + if self.p: + return self.p.get_expect_output() + return None + + def validate_version_string_in_text(self, text): + """Assert that a command's output includes the U-Boot signon message. + + This is primarily useful for validating the "version" command without + duplicating the signon text regex in a test function. + + Args: + text: The command output text to check. + + Returns: + Nothing. An exception is raised if the validation fails. + """ + + assert(self.u_boot_version_string in text) + + def disable_check(self, check_type): + """Temporarily disable an error check of U-Boot's output. + + Create a new context manager (for use with the "with" statement) which + temporarily disables a particular console output error check. + + Args: + check_type: The type of error-check to disable. Valid values may + be found in self.disable_check_count above. + + Returns: + A context manager object. + """ + + return ConsoleDisableCheck(self, check_type) + + def enable_check(self, check_type, check_pattern): + """Temporarily enable an error check of U-Boot's output. + + Create a new context manager (for use with the "with" statement) which + temporarily enables a particular console output error check. The + arguments form a new element of bad_pattern_defs defined above. + + Args: + check_type: The type of error-check or bad pattern to enable. + check_pattern: The regexes for text error pattern or bad pattern + to be checked. + + Returns: + A context manager object. + """ + + return ConsoleEnableCheck(self, check_type, check_pattern) + + def temporary_timeout(self, timeout): + """Temporarily set up different timeout for commands. + + Create a new context manager (for use with the "with" statement) which + temporarily change timeout. + + Args: + timeout: Time in milliseconds. + + Returns: + A context manager object. + """ + + return ConsoleSetupTimeout(self, timeout) diff --git a/test/py/console_board.py b/test/py/console_board.py new file mode 100644 index 00000000000..bacb1e2526c --- /dev/null +++ b/test/py/console_board.py @@ -0,0 +1,85 @@ +# SPDX-License-Identifier: GPL-2.0 +# Copyright (c) 2015 Stephen Warren +# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. + +""" +Logic to interact with U-Boot running on real hardware, typically via a +physical serial port. +""" + +import sys +from spawn import Spawn +from console_base import ConsoleBase + +class ConsoleExecAttach(ConsoleBase): + """Represents a physical connection to a U-Boot console, typically via a + serial port. This implementation executes a sub-process to attach to the + console, expecting that the stdin/out of the sub-process will be forwarded + to/from the physical hardware. This approach isolates the test infra- + structure from the user-/installation-specific details of how to + communicate with, and the identity of, serial ports etc.""" + + def __init__(self, log, config): + """Initialize a U-Boot console connection. + + Args: + log: A multiplexed_log.Logfile instance. + config: A "configuration" object as defined in conftest.py. + + Returns: + Nothing. + """ + + # The max_fifo_fill value might need tweaking per-board/-SoC? + # 1 would be safe anywhere, but is very slow (a pexpect issue?). + # 16 is a common FIFO size. + # HW flow control would mean this could be infinite. + super(ConsoleExecAttach, self).__init__(log, config, max_fifo_fill=16) + + with self.log.section('flash'): + self.log.action('Flashing U-Boot') + cmd = ['u-boot-test-flash', config.board_type, config.board_identity] + runner = self.log.get_runner(cmd[0], sys.stdout) + runner.run(cmd) + runner.close() + self.log.status_pass('OK') + + def get_spawn(self): + """Connect to a fresh U-Boot instance. + + The target board is reset, so that U-Boot begins running from scratch. + + Args: + None. + + Returns: + A spawn.Spawn object that is attached to U-Boot. + """ + + args = [self.config.board_type, self.config.board_identity] + s = Spawn(['u-boot-test-console'] + args) + + if self.config.use_running_system: + self.log.action('Connecting to board without reset') + else: + try: + self.log.action('Resetting board') + cmd = ['u-boot-test-reset'] + args + runner = self.log.get_runner(cmd[0], sys.stdout) + runner.run(cmd) + runner.close() + except: + s.close() + raise + + return s + + def close(self): + super().close() + + self.log.action('Releasing board') + args = [self.config.board_type, self.config.board_identity] + cmd = ['u-boot-test-release'] + args + runner = self.log.get_runner(cmd[0], sys.stdout) + runner.run(cmd) + runner.close() diff --git a/test/py/console_sandbox.py b/test/py/console_sandbox.py new file mode 100644 index 00000000000..da55d2fcc1f --- /dev/null +++ b/test/py/console_sandbox.py @@ -0,0 +1,119 @@ +# SPDX-License-Identifier: GPL-2.0 +# Copyright (c) 2015 Stephen Warren +# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. + +""" +Logic to interact with the sandbox port of U-Boot, running as a sub-process. +""" + +import time +from spawn import Spawn +from console_base import ConsoleBase + +class ConsoleSandbox(ConsoleBase): + """Represents a connection to a sandbox U-Boot console, executed as a sub- + process.""" + + def __init__(self, log, config): + """Initialize a U-Boot console connection. + + Args: + log: A multiplexed_log.Logfile instance. + config: A "configuration" object as defined in conftest.py. + + Returns: + Nothing. + """ + + super(ConsoleSandbox, self).__init__(log, config, max_fifo_fill=1024) + self.sandbox_flags = [] + self.use_dtb = True + + def get_spawn(self): + """Connect to a fresh U-Boot instance. + + A new sandbox process is created, so that U-Boot begins running from + scratch. + + Args: + None. + + Returns: + A spawn.Spawn object that is attached to U-Boot. + """ + + bcfg = self.config.buildconfig + config_spl = bcfg.get('config_spl', 'n') == 'y' + config_vpl = bcfg.get('config_vpl', 'n') == 'y' + if config_vpl: + # Run TPL first, which runs VPL + fname = '/tpl/u-boot-tpl' + else: + fname = '/spl/u-boot-spl' if config_spl else '/u-boot' + print(fname) + cmd = [] + if self.config.gdbserver: + cmd += ['gdbserver', self.config.gdbserver] + cmd += [self.config.build_dir + fname, '-v'] + if self.use_dtb: + cmd += ['-d', self.config.dtb] + cmd += self.sandbox_flags + return Spawn(cmd, cwd=self.config.source_dir, decode_signal=True) + + def restart_uboot_with_flags(self, flags, expect_reset=False, use_dtb=True): + """Run U-Boot with the given command-line flags + + Args: + flags: List of flags to pass, each a string + expect_reset: Boolean indication whether this boot is expected + to be reset while the 1st boot process after main boot before + prompt. False by default. + use_dtb: True to use a device tree file, False to run without one + + Returns: + A spawn.Spawn object that is attached to U-Boot. + """ + + try: + self.sandbox_flags = flags + self.use_dtb = use_dtb + return self.restart_uboot(expect_reset) + finally: + self.sandbox_flags = [] + self.use_dtb = True + + def kill(self, sig): + """Send a specific Unix signal to the sandbox process. + + Args: + sig: The Unix signal to send to the process. + + Returns: + Nothing. + """ + + self.log.action('kill %d' % sig) + self.p.kill(sig) + + def validate_exited(self): + """Determine whether the sandbox process has exited. + + If required, this function waits a reasonable time for the process to + exit. + + Args: + None. + + Returns: + Boolean indicating whether the process has exited. + """ + + p = self.p + self.p = None + for i in range(100): + ret = not p.isalive() + if ret: + break + time.sleep(0.1) + p.close() + return ret diff --git a/test/py/spawn.py b/test/py/spawn.py new file mode 100644 index 00000000000..c703454389d --- /dev/null +++ b/test/py/spawn.py @@ -0,0 +1,345 @@ +# SPDX-License-Identifier: GPL-2.0 +# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. + +""" +Logic to spawn a sub-process and interact with its stdio. +""" + +import io +import os +import re +import pty +import pytest +import signal +import select +import sys +import termios +import time +import traceback + +# Character to send (twice) to exit the terminal +EXIT_CHAR = 0x1d # FS (Ctrl + ]) + +class Timeout(Exception): + """An exception sub-class that indicates that a timeout occurred.""" + +class BootFail(Exception): + """An exception sub-class that indicates that a boot failure occurred. + + This is used when a bad pattern is seen when waiting for the boot prompt. + It is regarded as fatal, to avoid trying to boot the again and again to no + avail. + """ + +class Unexpected(Exception): + """An exception sub-class that indicates that unexpected test was seen.""" + + +def handle_exception(ubconfig, console, log, err, name, fatal, output=''): + """Handle an exception from the console + + Exceptions can occur when there is unexpected output or due to the board + crashing or hanging. Some exceptions are likely fatal, where retrying will + just chew up time to no available. In those cases it is best to cause + further tests be skipped. + + Args: + ubconfig (ArbitraryAttributeContainer): ubconfig object + log (Logfile): Place to log errors + console (ConsoleBase): Console to clean up, if fatal + err (Exception): Exception which was thrown + name (str): Name of problem, to log + fatal (bool): True to abort all tests + output (str): Extra output to report on boot failure. This can show the + target's console output as it tried to boot + """ + msg = f'{name}: ' + if fatal: + msg += 'Marking connection bad - no other tests will run' + else: + msg += 'Assuming that lab is healthy' + print(msg) + log.error(msg) + log.error(f'Error: {err}') + + if output: + msg += f'; output {output}' + + if fatal: + ubconfig.connection_ok = False + console.cleanup_spawn() + pytest.exit(msg) + + +class Spawn: + """Represents the stdio of a freshly created sub-process. Commands may be + sent to the process, and responses waited for. + + Members: + output: accumulated output from expect() + """ + + def __init__(self, args, cwd=None, decode_signal=False): + """Spawn (fork/exec) the sub-process. + + Args: + args: array of processs arguments. argv[0] is the command to + execute. + cwd: the directory to run the process in, or None for no change. + decode_signal (bool): True to indicate the exception number when + something goes wrong + + Returns: + Nothing. + """ + self.decode_signal = decode_signal + self.waited = False + self.exit_code = 0 + self.exit_info = '' + self.buf = '' + self.output = '' + self.logfile_read = None + self.before = '' + self.after = '' + self.timeout = None + # http://stackoverflow.com/questions/7857352/python-regex-to-match-vt100-escape-sequences + self.re_vt100 = re.compile(r'(\x1b\[|\x9b)[^@-_]*[@-_]|\x1b[@-_]', re.I) + + (self.pid, self.fd) = pty.fork() + if self.pid == 0: + try: + # For some reason, SIGHUP is set to SIG_IGN at this point when + # run under "go" (www.go.cd). Perhaps this happens under any + # background (non-interactive) system? + signal.signal(signal.SIGHUP, signal.SIG_DFL) + if cwd: + os.chdir(cwd) + os.execvp(args[0], args) + except: + print('CHILD EXECEPTION:') + traceback.print_exc() + finally: + os._exit(255) + + old = None + try: + isatty = False + try: + isatty = os.isatty(sys.stdout.fileno()) + + # with --capture=tee-sys we cannot call fileno() + except io.UnsupportedOperation as exc: + pass + if isatty: + new = termios.tcgetattr(self.fd) + old = new + new[3] = new[3] & ~(termios.ICANON | termios.ISIG) + new[3] = new[3] & ~termios.ECHO + new[6][termios.VMIN] = 0 + new[6][termios.VTIME] = 0 + termios.tcsetattr(self.fd, termios.TCSANOW, new) + + self.poll = select.poll() + self.poll.register(self.fd, select.POLLIN | select.POLLPRI | select.POLLERR | + select.POLLHUP | select.POLLNVAL) + except: + if old: + termios.tcsetattr(self.fd, termios.TCSANOW, old) + self.close() + raise + + def kill(self, sig): + """Send unix signal "sig" to the child process. + + Args: + sig: The signal number to send. + + Returns: + Nothing. + """ + + os.kill(self.pid, sig) + + def checkalive(self): + """Determine whether the child process is still running. + + Returns: + tuple: + True if process is alive, else False + 0 if process is alive, else exit code of process + string describing what happened ('' or 'status/signal n') + """ + + if self.waited: + return False, self.exit_code, self.exit_info + + w = os.waitpid(self.pid, os.WNOHANG) + if w[0] == 0: + return True, 0, 'running' + status = w[1] + + if os.WIFEXITED(status): + self.exit_code = os.WEXITSTATUS(status) + self.exit_info = 'status %d' % self.exit_code + elif os.WIFSIGNALED(status): + signum = os.WTERMSIG(status) + self.exit_code = -signum + self.exit_info = 'signal %d (%s)' % (signum, signal.Signals(signum).name) + self.waited = True + return False, self.exit_code, self.exit_info + + def isalive(self): + """Determine whether the child process is still running. + + Args: + None. + + Returns: + Boolean indicating whether process is alive. + """ + return self.checkalive()[0] + + def send(self, data): + """Send data to the sub-process's stdin. + + Args: + data: The data to send to the process. + + Returns: + Nothing. + """ + + os.write(self.fd, data.encode(errors='replace')) + + def receive(self, num_bytes): + """Receive data from the sub-process's stdin. + + Args: + num_bytes (int): Maximum number of bytes to read + + Returns: + str: The data received + + Raises: + ValueError if U-Boot died + """ + try: + c = os.read(self.fd, num_bytes).decode(errors='replace') + except OSError as err: + # With sandbox, try to detect when U-Boot exits when it + # shouldn't and explain why. This is much more friendly than + # just dying with an I/O error + if self.decode_signal and err.errno == 5: # I/O error + alive, _, info = self.checkalive() + if alive: + raise err + raise ValueError('U-Boot exited with %s' % info) + raise + return c + + def expect(self, patterns): + """Wait for the sub-process to emit specific data. + + This function waits for the process to emit one pattern from the + supplied list of patterns, or for a timeout to occur. + + Args: + patterns: A list of strings or regex objects that we expect to + see in the sub-process' stdout. + + Returns: + The index within the patterns array of the pattern the process + emitted. + + Notable exceptions: + Timeout, if the process did not emit any of the patterns within + the expected time. + """ + + for pi in range(len(patterns)): + if type(patterns[pi]) == type(''): + patterns[pi] = re.compile(patterns[pi]) + + tstart_s = time.time() + try: + while True: + earliest_m = None + earliest_pi = None + for pi in range(len(patterns)): + pattern = patterns[pi] + m = pattern.search(self.buf) + if not m: + continue + if earliest_m and m.start() >= earliest_m.start(): + continue + earliest_m = m + earliest_pi = pi + if earliest_m: + pos = earliest_m.start() + posafter = earliest_m.end() + self.before = self.buf[:pos] + self.after = self.buf[pos:posafter] + self.output += self.buf[:posafter] + self.buf = self.buf[posafter:] + return earliest_pi + tnow_s = time.time() + if self.timeout: + tdelta_ms = (tnow_s - tstart_s) * 1000 + poll_maxwait = self.timeout - tdelta_ms + if tdelta_ms > self.timeout: + raise Timeout() + else: + poll_maxwait = None + events = self.poll.poll(poll_maxwait) + if not events: + raise Timeout() + c = self.receive(1024) + if self.logfile_read: + self.logfile_read.write(c) + self.buf += c + # count=0 is supposed to be the default, which indicates + # unlimited substitutions, but in practice the version of + # Python in Ubuntu 14.04 appears to default to count=2! + self.buf = self.re_vt100.sub('', self.buf, count=1000000) + finally: + if self.logfile_read: + self.logfile_read.flush() + + def close(self): + """Close the stdio connection to the sub-process. + + This also waits a reasonable time for the sub-process to stop running. + + Args: + None. + + Returns: + str: Type of closure completed + """ + # For Labgrid-sjg, ask it is exit gracefully, so it can transition the + # board to the final state (like 'off') before exiting. + if os.environ.get('USE_LABGRID_SJG'): + self.send(chr(EXIT_CHAR) * 2) + + # Wait about 10 seconds for Labgrid to close and power off the board + for _ in range(100): + if not self.isalive(): + return 'normal' + time.sleep(0.1) + + # That didn't work, so try closing the PTY + os.close(self.fd) + for _ in range(100): + if not self.isalive(): + return 'break' + time.sleep(0.1) + + return 'timeout' + + def get_expect_output(self): + """Return the output read by expect() + + Returns: + The output processed by expect(), as a string. + """ + return self.output diff --git a/test/py/tests/test_android/test_ab.py b/test/py/tests/test_android/test_ab.py index 739b7ce695d..5876a137463 100644 --- a/test/py/tests/test_android/test_ab.py +++ b/test/py/tests/test_android/test_ab.py @@ -5,7 +5,7 @@ import os import pytest -import u_boot_utils +import utils class ABTestDiskImage(object): """Disk Image used by the A/B tests.""" @@ -25,7 +25,7 @@ class ABTestDiskImage(object): persistent = ubman.config.persistent_data_dir + '/' + filename self.path = ubman.config.result_dir + '/' + filename - with u_boot_utils.persistent_file_helper(ubman.log, persistent): + with utils.persistent_file_helper(ubman.log, persistent): if os.path.exists(persistent): ubman.log.action('Disk image file ' + persistent + ' already exists') @@ -35,16 +35,16 @@ class ABTestDiskImage(object): os.ftruncate(fd, 524288) os.close(fd) cmd = ('sgdisk', persistent) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('sgdisk', '--new=1:64:512', '--change-name=1:misc', persistent) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('sgdisk', '--load-backup=' + persistent) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('cp', persistent, self.path) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) di = None @pytest.fixture(scope='function') diff --git a/test/py/tests/test_android/test_abootimg.py b/test/py/tests/test_android/test_abootimg.py index fd3e08fa899..c31fb466ec7 100644 --- a/test/py/tests/test_android/test_abootimg.py +++ b/test/py/tests/test_android/test_abootimg.py @@ -6,7 +6,7 @@ import os import pytest -import u_boot_utils +import utils """ These tests rely on disk image (boot.img), which is automatically created by @@ -122,7 +122,7 @@ class AbootimgTestDiskImage(object): persistent = ubman.config.persistent_data_dir + '/' + filename self.path = ubman.config.result_dir + '/' + filename ubman.log.action('persistent is ' + persistent) - with u_boot_utils.persistent_file_helper(ubman.log, persistent): + with utils.persistent_file_helper(ubman.log, persistent): if os.path.exists(persistent): ubman.log.action('Disk image file ' + persistent + ' already exists') @@ -133,12 +133,12 @@ class AbootimgTestDiskImage(object): f.write(hex_img) f.close() cmd = ('xxd', '-r', '-p', gz_hex, gz) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('gunzip', '-9', gz) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('cp', persistent, self.path) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) gtdi1 = None @pytest.fixture(scope='function') diff --git a/test/py/tests/test_android/test_avb.py b/test/py/tests/test_android/test_avb.py index 451a476da76..1d600b95c6f 100644 --- a/test/py/tests/test_android/test_avb.py +++ b/test/py/tests/test_android/test_avb.py @@ -15,7 +15,7 @@ For configuration verification: """ import pytest -import u_boot_utils as util +import utils as util # defauld mmc id mmc_dev = 1 diff --git a/test/py/tests/test_dfu.py b/test/py/tests/test_dfu.py index 6efb69990d4..7d6f41db7fb 100644 --- a/test/py/tests/test_dfu.py +++ b/test/py/tests/test_dfu.py @@ -9,7 +9,7 @@ import os import os.path import pytest -import u_boot_utils +import utils """ Note: This test relies on: @@ -143,9 +143,9 @@ def test_dfu(ubman, env__usb_dev_port, env__dfu_config): Nothing. """ - u_boot_utils.wait_until_file_open_fails( + utils.wait_until_file_open_fails( env__usb_dev_port['host_usb_dev_node'], True) - fh = u_boot_utils.attempt_to_open_file( + fh = utils.attempt_to_open_file( env__usb_dev_port['host_usb_dev_node']) if fh: fh.close() @@ -164,7 +164,7 @@ def test_dfu(ubman, env__usb_dev_port, env__dfu_config): cmd = 'dfu 0 ' + env__dfu_config['cmd_params'] ubman.run_command(cmd, wait_for_prompt=False) ubman.log.action('Waiting for DFU USB device to appear') - fh = u_boot_utils.wait_until_open_succeeds( + fh = utils.wait_until_open_succeeds( env__usb_dev_port['host_usb_dev_node']) fh.close() @@ -190,7 +190,7 @@ def test_dfu(ubman, env__usb_dev_port, env__dfu_config): ubman.ctrlc() ubman.log.action( 'Waiting for DFU USB device to disappear') - u_boot_utils.wait_until_file_open_fails( + utils.wait_until_file_open_fails( env__usb_dev_port['host_usb_dev_node'], ignore_errors) except: if not ignore_errors: @@ -213,7 +213,7 @@ def test_dfu(ubman, env__usb_dev_port, env__dfu_config): cmd = ['dfu-util', '-a', alt_setting, up_dn_load_arg, fn] if 'host_usb_port_path' in env__usb_dev_port: cmd += ['-p', env__usb_dev_port['host_usb_port_path']] - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) ubman.wait_for('Ctrl+C to exit ...') def dfu_write(alt_setting, fn): @@ -261,7 +261,7 @@ def test_dfu(ubman, env__usb_dev_port, env__dfu_config): Nothing. """ - test_f = u_boot_utils.PersistentRandomFile(ubman, + test_f = utils.PersistentRandomFile(ubman, 'dfu_%d.bin' % size, size) readback_fn = ubman.config.result_dir + '/dfu_readback.bin' @@ -279,7 +279,7 @@ def test_dfu(ubman, env__usb_dev_port, env__dfu_config): ubman.log.action('Comparing written and read data') written_hash = test_f.content_hash - read_back_hash = u_boot_utils.md5sum_file(readback_fn, size) + read_back_hash = utils.md5sum_file(readback_fn, size) assert(written_hash == read_back_hash) # This test may be executed against multiple USB ports. The test takes a @@ -295,7 +295,7 @@ def test_dfu(ubman, env__usb_dev_port, env__dfu_config): else: sizes = [] - dummy_f = u_boot_utils.PersistentRandomFile(ubman, + dummy_f = utils.PersistentRandomFile(ubman, 'dfu_dummy.bin', 1024) alt_setting_test_file = env__dfu_config.get('alt_id_test_file', '0') diff --git a/test/py/tests/test_efi_fit.py b/test/py/tests/test_efi_fit.py index 4a464382126..c836834845e 100644 --- a/test/py/tests/test_efi_fit.py +++ b/test/py/tests/test_efi_fit.py @@ -55,7 +55,7 @@ env__efi_fit_tftp_file = { import os.path import pytest -import u_boot_utils as util +import utils as util # Define the parametrized ITS data to be used for FIT images generation. ITS_DATA = ''' diff --git a/test/py/tests/test_efi_loader.py b/test/py/tests/test_efi_loader.py index ff880ffa527..58f2655191f 100644 --- a/test/py/tests/test_efi_loader.py +++ b/test/py/tests/test_efi_loader.py @@ -53,7 +53,7 @@ env__efi_helloworld_net_http_test_skip = True """ import pytest -import u_boot_utils +import utils PROTO_TFTP, PROTO_HTTP = range(0, 2) @@ -132,7 +132,7 @@ def fetch_file(ubman, env_conf, proto): addr = f.get('addr', None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) fn = f['fn'] if proto == PROTO_TFTP: diff --git a/test/py/tests/test_env.py b/test/py/tests/test_env.py index 6f75e107bbe..376ea7b78b8 100644 --- a/test/py/tests/test_env.py +++ b/test/py/tests/test_env.py @@ -13,7 +13,7 @@ from subprocess import call, CalledProcessError import tempfile import pytest -import u_boot_utils +import utils # FIXME: This might be useful for other tests; # perhaps refactor it into ConsoleBase or some other state object? @@ -187,7 +187,7 @@ def test_env_initial_env_file(ubman): except: pass - u_boot_utils.run_and_log(cons, ['make', builddir, 'u-boot-initial-env']) + utils.run_and_log(cons, ['make', builddir, 'u-boot-initial-env']) assert os.path.exists(envfile) @@ -278,7 +278,7 @@ def test_env_import_checksum_no_size(state_test_env): env import function. """ c = state_test_env.ubman - ram_base = u_boot_utils.find_ram_base(state_test_env.ubman) + ram_base = utils.find_ram_base(state_test_env.ubman) addr = '%08x' % ram_base with c.disable_check('error_notification'): @@ -291,7 +291,7 @@ def test_env_import_whitelist_checksum_no_size(state_test_env): env import function when variables are passed as parameters. """ c = state_test_env.ubman - ram_base = u_boot_utils.find_ram_base(state_test_env.ubman) + ram_base = utils.find_ram_base(state_test_env.ubman) addr = '%08x' % ram_base with c.disable_check('error_notification'): @@ -303,7 +303,7 @@ def test_env_import_whitelist_checksum_no_size(state_test_env): def test_env_import_whitelist(state_test_env): """Test importing only a handful of env variables from an environment.""" c = state_test_env.ubman - ram_base = u_boot_utils.find_ram_base(state_test_env.ubman) + ram_base = utils.find_ram_base(state_test_env.ubman) addr = '%08x' % ram_base set_var(state_test_env, 'foo1', 'bar1') @@ -340,7 +340,7 @@ def test_env_import_whitelist_delete(state_test_env): environment to be imported. """ c = state_test_env.ubman - ram_base = u_boot_utils.find_ram_base(state_test_env.ubman) + ram_base = utils.find_ram_base(state_test_env.ubman) addr = '%08x' % ram_base set_var(state_test_env, 'foo1', 'bar1') @@ -446,16 +446,16 @@ def mk_env_ext4(state_test_env): # Some distributions do not add /sbin to the default PATH, where mkfs.ext4 lives os.environ["PATH"] += os.pathsep + '/sbin' try: - u_boot_utils.run_and_log(c, 'dd if=/dev/zero of=%s bs=1M count=16' % persistent) - u_boot_utils.run_and_log(c, 'mkfs.ext4 %s' % persistent) - sb_content = u_boot_utils.run_and_log(c, 'tune2fs -l %s' % persistent) + utils.run_and_log(c, 'dd if=/dev/zero of=%s bs=1M count=16' % persistent) + utils.run_and_log(c, 'mkfs.ext4 %s' % persistent) + sb_content = utils.run_and_log(c, 'tune2fs -l %s' % persistent) if 'metadata_csum' in sb_content: - u_boot_utils.run_and_log(c, 'tune2fs -O ^metadata_csum %s' % persistent) + utils.run_and_log(c, 'tune2fs -O ^metadata_csum %s' % persistent) except CalledProcessError: call('rm -f %s' % persistent, shell=True) raise - u_boot_utils.run_and_log(c, ['cp', '-f', persistent, fs_img]) + utils.run_and_log(c, ['cp', '-f', persistent, fs_img]) return fs_img @pytest.mark.boardspec('sandbox') @@ -560,7 +560,7 @@ def test_env_text(ubman): fname = os.path.join(path, 'infile') with open(fname, 'w') as inf: print(intext, file=inf) - result = u_boot_utils.run_and_log(cons, ['awk', '-f', script, fname]) + result = utils.run_and_log(cons, ['awk', '-f', script, fname]) if expect_val is not None: expect = '#define CONFIG_EXTRA_ENV_TEXT "%s"\n' % expect_val assert result == expect diff --git a/test/py/tests/test_event_dump.py b/test/py/tests/test_event_dump.py index 3f8ba647848..5a9d39da74c 100644 --- a/test/py/tests/test_event_dump.py +++ b/test/py/tests/test_event_dump.py @@ -4,7 +4,7 @@ import pytest import re -import u_boot_utils as util +import utils as util # This is only a partial test - coverting 64-bit sandbox. It does not test # big-endian images, nor 32-bit images diff --git a/test/py/tests/test_extension.py b/test/py/tests/test_extension.py index d0840f779bc..61223496054 100644 --- a/test/py/tests/test_extension.py +++ b/test/py/tests/test_extension.py @@ -6,7 +6,7 @@ import os import pytest -import u_boot_utils +import utils overlay_addr = 0x1000 diff --git a/test/py/tests/test_fit.py b/test/py/tests/test_fit.py index 459be5af39d..0f2d0940562 100755 --- a/test/py/tests/test_fit.py +++ b/test/py/tests/test_fit.py @@ -6,7 +6,7 @@ import os import pytest import struct -import u_boot_utils as util +import utils as util import fit_util # Define a base ITS which we can adjust using % and a dictionary diff --git a/test/py/tests/test_fit_auto_signed.py b/test/py/tests/test_fit_auto_signed.py index 72f39edacf8..bee93e4f084 100644 --- a/test/py/tests/test_fit_auto_signed.py +++ b/test/py/tests/test_fit_auto_signed.py @@ -17,7 +17,7 @@ The test does not run the sandbox. It only checks the host tool mkimage. import os import pytest -import u_boot_utils as util +import utils as util import binascii from Cryptodome.Hash import SHA1 from Cryptodome.Hash import SHA256 diff --git a/test/py/tests/test_fit_ecdsa.py b/test/py/tests/test_fit_ecdsa.py index 428d3e76a18..7b25c7779c6 100644 --- a/test/py/tests/test_fit_ecdsa.py +++ b/test/py/tests/test_fit_ecdsa.py @@ -12,7 +12,7 @@ This test doesn't run the sandbox. It only checks the host tool 'mkimage' import os import pytest -import u_boot_utils as util +import utils as util from Cryptodome.Hash import SHA256 from Cryptodome.PublicKey import ECC from Cryptodome.Signature import DSS diff --git a/test/py/tests/test_fit_hashes.py b/test/py/tests/test_fit_hashes.py index 7bc24a7c870..0b3c85f8e23 100644 --- a/test/py/tests/test_fit_hashes.py +++ b/test/py/tests/test_fit_hashes.py @@ -12,7 +12,7 @@ This test doesn't run the sandbox. It only checks the host tool 'mkimage' import os import pytest -import u_boot_utils as util +import utils as util kernel_hashes = { "sha512" : "f18c1486a2c29f56360301576cdfce4dfd8e8e932d0ed8e239a1f314b8ae1d77b2a58cd7fe32e4075e69448e623ce53b0b6aa6ce5626d2c189a5beae29a68d93", diff --git a/test/py/tests/test_fpga.py b/test/py/tests/test_fpga.py index 7450b13945f..74cd42b910e 100644 --- a/test/py/tests/test_fpga.py +++ b/test/py/tests/test_fpga.py @@ -8,7 +8,7 @@ import pytest import re import random -import u_boot_utils +import utils """ Note: This test relies on boardenv_* containing configuration values to define @@ -518,7 +518,7 @@ def test_fpga_secure_bit_auth(ubman): addr = f.get('addr', None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['fn'] @@ -546,7 +546,7 @@ def test_fpga_secure_bit_img_auth_kup(ubman): keyaddr = f.get('keyaddr', None) if not keyaddr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' keyfn = f['keyfn'] output = ubman.run_command('tftpboot %x %s' % (keyaddr, keyfn)) @@ -554,7 +554,7 @@ def test_fpga_secure_bit_img_auth_kup(ubman): addr = f.get('addr', None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['enckupfn'] output = ubman.run_command('tftpboot %x %s' % (addr, fn)) diff --git a/test/py/tests/test_fs/conftest.py b/test/py/tests/test_fs/conftest.py index 7bfcf41ed6f..5b259796ebe 100644 --- a/test/py/tests/test_fs/conftest.py +++ b/test/py/tests/test_fs/conftest.py @@ -8,7 +8,7 @@ import pytest import re from subprocess import call, check_call, check_output, CalledProcessError from fstest_defs import * -import u_boot_utils as util +import utils as util # pylint: disable=E0611 from tests import fs_helper diff --git a/test/py/tests/test_gpio.py b/test/py/tests/test_gpio.py index 5c9d0b60bf5..46b674b7653 100644 --- a/test/py/tests/test_gpio.py +++ b/test/py/tests/test_gpio.py @@ -5,7 +5,7 @@ import pytest import time -import u_boot_utils +import utils """ test_gpio_input is intended to test the fix 4dbc107f4683. diff --git a/test/py/tests/test_gpt.py b/test/py/tests/test_gpt.py index 0bd6a21278a..cfc8f1319a9 100644 --- a/test/py/tests/test_gpt.py +++ b/test/py/tests/test_gpt.py @@ -6,7 +6,7 @@ import os import pytest -import u_boot_utils +import utils """ These tests rely on a 4 MB disk image, which is automatically created by @@ -63,7 +63,7 @@ class GptTestDiskImage(object): persistent = ubman.config.persistent_data_dir + '/' + filename self.path = ubman.config.result_dir + '/' + filename - with u_boot_utils.persistent_file_helper(ubman.log, persistent): + with utils.persistent_file_helper(ubman.log, persistent): if os.path.exists(persistent): ubman.log.action('Disk image file ' + persistent + ' already exists') @@ -75,23 +75,23 @@ class GptTestDiskImage(object): cmd = ('sgdisk', '--disk-guid=375a56f7-d6c9-4e81-b5f0-09d41ca89efe', persistent) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) # part1 offset 1MB size 1MB cmd = ('sgdisk', '--new=1:2048:4095', '--change-name=1:part1', '--partition-guid=1:33194895-67f6-4561-8457-6fdeed4f50a3', '-A 1:set:2', persistent) # part2 offset 2MB size 1.5MB - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('sgdisk', '--new=2:4096:7167', '--change-name=2:part2', '--partition-guid=2:cc9c6e4a-6551-4cb5-87be-3210f96c86fb', persistent) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('sgdisk', '--load-backup=' + persistent) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) cmd = ('cp', persistent, self.path) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) @pytest.fixture(scope='function') def state_disk_image(ubman): diff --git a/test/py/tests/test_kconfig.py b/test/py/tests/test_kconfig.py index 9aeffa748c4..b4a28ec7a5a 100644 --- a/test/py/tests/test_kconfig.py +++ b/test/py/tests/test_kconfig.py @@ -4,7 +4,7 @@ import pytest -import u_boot_utils as util +import utils as util # This is needed for Azure, since the default '..' directory is not writeable TMPDIR = '/tmp/test_kconfig' diff --git a/test/py/tests/test_md.py b/test/py/tests/test_md.py index 69e12eb625f..5c7bcbd420b 100644 --- a/test/py/tests/test_md.py +++ b/test/py/tests/test_md.py @@ -3,14 +3,14 @@ # Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. import pytest -import u_boot_utils +import utils @pytest.mark.buildconfigspec('cmd_memory') def test_md(ubman): """Test that md reads memory as expected, and that memory can be modified using the mw command.""" - ram_base = u_boot_utils.find_ram_base(ubman) + ram_base = utils.find_ram_base(ubman) addr = '%08x' % ram_base val = 'a5f09876' expected_response = addr + ': ' + val @@ -26,7 +26,7 @@ def test_md_repeat(ubman): """Test command repeat (via executing an empty command) operates correctly for "md"; the command must repeat and dump an incrementing address.""" - ram_base = u_boot_utils.find_ram_base(ubman) + ram_base = utils.find_ram_base(ubman) addr_base = '%08x' % ram_base words = 0x10 addr_repeat = '%08x' % (ram_base + (words * 4)) diff --git a/test/py/tests/test_mmc.py b/test/py/tests/test_mmc.py index 4ecd999c02c..4916dcd8529 100644 --- a/test/py/tests/test_mmc.py +++ b/test/py/tests/test_mmc.py @@ -4,7 +4,7 @@ import pytest import random import re -import u_boot_utils +import utils """ Note: This test doesn't rely on boardenv_* configuration values but it can @@ -290,7 +290,7 @@ def test_mmc_fatload_fatwrite(ubman): for y in mmc_modes: ubman.run_command('mmc dev %d %d %d' % x, part, y) part_detect = 1 - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) devices[x]['addr_%d' % part] = addr size = random.randint(4, 1 * 1024 * 1024) devices[x]['size_%d' % part] = size @@ -394,7 +394,7 @@ def test_mmc_ext4load_ext4write(ubman): for y in mmc_modes: ubman.run_command('mmc dev %d %d %d' % x, part, y) part_detect = 1 - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) devices[x]['addr_%d' % part] = addr size = random.randint(4, 1 * 1024 * 1024) devices[x]['size_%d' % part] = size @@ -658,7 +658,7 @@ def test_mmc_fat_read_write_files(ubman): for y in mmc_modes: ubman.run_command('mmc dev %d %d %d' % x, part, y) part_detect = 1 - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) count_f = 0 addr_l = [] size_l = [] diff --git a/test/py/tests/test_mmc_rd.py b/test/py/tests/test_mmc_rd.py index 3c8356f872f..cd1e299aa9d 100644 --- a/test/py/tests/test_mmc_rd.py +++ b/test/py/tests/test_mmc_rd.py @@ -7,7 +7,7 @@ import pytest import time -import u_boot_utils +import utils """ This test relies on boardenv_* to containing configuration values to define @@ -241,7 +241,7 @@ def test_mmc_rd(ubman, env__mmc_rd_config): bcfg = ubman.config.buildconfig has_cmd_memory = bcfg.get('config_cmd_memory', 'n') == 'y' has_cmd_crc32 = bcfg.get('config_cmd_crc32', 'n') == 'y' - ram_base = u_boot_utils.find_ram_base(ubman) + ram_base = utils.find_ram_base(ubman) addr = '0x%08x' % ram_base # Select MMC device diff --git a/test/py/tests/test_mmc_wr.py b/test/py/tests/test_mmc_wr.py index 533bae04fd2..41a75f885e1 100644 --- a/test/py/tests/test_mmc_wr.py +++ b/test/py/tests/test_mmc_wr.py @@ -6,7 +6,7 @@ # to the eMMC or SD card, then reads it back and performs a comparison. import pytest -import u_boot_utils +import utils """ This test relies on boardenv_* to containing configuration values to define @@ -61,7 +61,7 @@ def test_mmc_wr(ubman, env__mmc_wr_config): count_bytes = count_sectors * 512 bcfg = ubman.config.buildconfig - ram_base = u_boot_utils.find_ram_base(ubman) + ram_base = utils.find_ram_base(ubman) src_addr = '0x%08x' % ram_base dst_addr = '0x%08x' % (ram_base + count_bytes) diff --git a/test/py/tests/test_net.py b/test/py/tests/test_net.py index 52ecf93d41d..9f4ee0c9ed4 100644 --- a/test/py/tests/test_net.py +++ b/test/py/tests/test_net.py @@ -5,7 +5,7 @@ # tftpboot commands. import pytest -import u_boot_utils +import utils import uuid import datetime import re @@ -315,7 +315,7 @@ def test_net_nfs(ubman): addr = f.get('addr', None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) fn = f['fn'] output = ubman.run_command('nfs %x %s' % (addr, fn)) @@ -411,7 +411,7 @@ def test_net_tftpput(ubman): addr = f.get("addr", None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) sz = f.get("size", None) timeout = f.get("timeout", ubman.p.timeout) diff --git a/test/py/tests/test_net_boot.py b/test/py/tests/test_net_boot.py index d0230808b9f..abf6dfbaf5e 100644 --- a/test/py/tests/test_net_boot.py +++ b/test/py/tests/test_net_boot.py @@ -2,7 +2,7 @@ # (C) Copyright 2023, Advanced Micro Devices, Inc. import pytest -import u_boot_utils +import utils import test_net import re @@ -130,7 +130,7 @@ def setup_tftpboot_boot(ubman): setup_networking(ubman) addr = f.get('addr', None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) fn = f['fn'] timeout = f.get('timeout', 50000) diff --git a/test/py/tests/test_of_migrate.py b/test/py/tests/test_of_migrate.py index 3b0fa2f871b..1692cbc48bb 100644 --- a/test/py/tests/test_of_migrate.py +++ b/test/py/tests/test_of_migrate.py @@ -7,7 +7,7 @@ import os import pytest -import u_boot_utils as util +import utils as util # This is needed for Azure, since the default '..' directory is not writeable TMPDIR1 = '/tmp/test_no_migrate' diff --git a/test/py/tests/test_ofplatdata.py b/test/py/tests/test_ofplatdata.py index 47d6ddc8a50..d414a910d9f 100644 --- a/test/py/tests/test_ofplatdata.py +++ b/test/py/tests/test_ofplatdata.py @@ -2,7 +2,7 @@ # Copyright (c) 2016 Google, Inc import pytest -import u_boot_utils as util +import utils as util @pytest.mark.boardspec('sandbox_spl') @pytest.mark.buildconfigspec('spl_of_platdata') diff --git a/test/py/tests/test_optee_rpmb.py b/test/py/tests/test_optee_rpmb.py index d20d4bc4828..1ef1c117d27 100644 --- a/test/py/tests/test_optee_rpmb.py +++ b/test/py/tests/test_optee_rpmb.py @@ -7,7 +7,7 @@ This tests optee_rpmb cmd in U-Boot """ import pytest -import u_boot_utils as util +import utils as util @pytest.mark.buildconfigspec('cmd_optee_rpmb') def test_optee_rpmb_read_write(ubman): diff --git a/test/py/tests/test_pinmux.py b/test/py/tests/test_pinmux.py index f8c3f0dfbef..ee79e843341 100644 --- a/test/py/tests/test_pinmux.py +++ b/test/py/tests/test_pinmux.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: GPL-2.0 import pytest -import u_boot_utils +import utils @pytest.mark.buildconfigspec('cmd_pinmux') def test_pinmux_usage_1(ubman): diff --git a/test/py/tests/test_pstore.py b/test/py/tests/test_pstore.py index 69b13fb0e4b..70e07503ad3 100644 --- a/test/py/tests/test_pstore.py +++ b/test/py/tests/test_pstore.py @@ -3,7 +3,7 @@ # Author: Frédéric Danis import pytest -import u_boot_utils +import utils import os import tempfile import shutil diff --git a/test/py/tests/test_sandbox_opts.py b/test/py/tests/test_sandbox_opts.py index a6dc394e554..16901409172 100644 --- a/test/py/tests/test_sandbox_opts.py +++ b/test/py/tests/test_sandbox_opts.py @@ -4,7 +4,7 @@ import pytest -import u_boot_utils as util +import utils as util # This is needed for Azure, since the default '..' directory is not writeable TMPDIR = '/tmp/test_cmdline' diff --git a/test/py/tests/test_scp03.py b/test/py/tests/test_scp03.py index 799754ac54b..296d7c7d953 100644 --- a/test/py/tests/test_scp03.py +++ b/test/py/tests/test_scp03.py @@ -11,7 +11,7 @@ For additional details check doc/usage/scp03.rst """ import pytest -import u_boot_utils as util +import utils as util @pytest.mark.buildconfigspec('cmd_scp03') def test_scp03(ubman): diff --git a/test/py/tests/test_sf.py b/test/py/tests/test_sf.py index 9c3b8927c0f..5b4ba80f18b 100644 --- a/test/py/tests/test_sf.py +++ b/test/py/tests/test_sf.py @@ -5,7 +5,7 @@ import re import pytest import random -import u_boot_utils +import utils """ Note: This test relies on boardenv_* containing configuration values to define @@ -57,7 +57,7 @@ def sf_prepare(ubman, env__sf_config): """ sf_params = {} - sf_params['ram_base'] = u_boot_utils.find_ram_base(ubman) + sf_params['ram_base'] = utils.find_ram_base(ubman) probe_id = env__sf_config.get('id', 0) speed = env__sf_config.get('speed', 0) @@ -123,14 +123,14 @@ def sf_read(ubman, env__sf_config, sf_params): cmd = 'mw.b %08x %02x %x' % (addr, pattern, count) ubman.run_command(cmd) - crc_pattern = u_boot_utils.crc32(ubman, addr, count) + crc_pattern = utils.crc32(ubman, addr, count) if crc_expected: assert crc_pattern != crc_expected cmd = 'sf read %08x %08x %x' % (addr, offset, count) response = ubman.run_command(cmd) assert 'Read: OK' in response, 'Read operation failed' - crc_readback = u_boot_utils.crc32(ubman, addr, count) + crc_readback = utils.crc32(ubman, addr, count) assert crc_pattern != crc_readback, 'sf read did not update RAM content.' if crc_expected: assert crc_readback == crc_expected @@ -156,7 +156,7 @@ def sf_update(ubman, env__sf_config, sf_params): cmd = 'mw.b %08x %02x %x' % (addr, pattern, count) ubman.run_command(cmd) - crc_pattern = u_boot_utils.crc32(ubman, addr, count) + crc_pattern = utils.crc32(ubman, addr, count) cmd = 'sf update %08x %08x %x' % (addr, offset, count) ubman.run_command(cmd) @@ -201,7 +201,7 @@ def test_sf_erase(ubman, env__sf_config): cmd = 'mw.b %08x ff %x' % (addr, count) ubman.run_command(cmd) - crc_ffs = u_boot_utils.crc32(ubman, addr, count) + crc_ffs = utils.crc32(ubman, addr, count) crc_read = sf_read(ubman, env__sf_config, sf_params) assert crc_ffs == crc_read, 'Unexpected CRC32 after erase operation.' diff --git a/test/py/tests/test_source.py b/test/py/tests/test_source.py index 30160d06dc6..60013e438ba 100644 --- a/test/py/tests/test_source.py +++ b/test/py/tests/test_source.py @@ -3,7 +3,7 @@ import os import pytest -import u_boot_utils as util +import utils as util @pytest.mark.boardspec('sandbox') @pytest.mark.buildconfigspec('cmd_echo') diff --git a/test/py/tests/test_spi.py b/test/py/tests/test_spi.py index 5e61ef1162d..dd767528dbf 100644 --- a/test/py/tests/test_spi.py +++ b/test/py/tests/test_spi.py @@ -51,7 +51,7 @@ env__spi_lock_unlock = { import random import re import pytest -import u_boot_utils +import utils SPI_DATA = {} EXPECTED_ERASE = 'Erased: OK' @@ -198,7 +198,7 @@ def test_spi_erase_block(ubman): def spi_write_twice(ubman, page_size, erase_size, total_size, timeout): ''' Random write till page size, random till size and full size ''' - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) old_size = 0 for size in ( @@ -271,7 +271,7 @@ def test_spi_write_twice(ubman): def spi_write_continues(ubman, page_size, erase_size, total_size, timeout): ''' Write with random size of data to continue SPI write case ''' spi_erase_block(ubman, erase_size, total_size) - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) output = ubman.run_command(f'crc32 {hex(addr + 0x10000)} {hex(total_size)}') m = re.search('==> (.+?)$', output) @@ -327,7 +327,7 @@ def spi_read_twice(ubman, page_size, total_size, timeout): ''' Read the whole SPI flash twice, random_size till full flash size, random till page size ''' for size in random.randint(4, page_size), random.randint(4, total_size), total_size: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) size = size & ~3 with ubman.temporary_timeout(timeout): output = ubman.run_command( @@ -451,13 +451,13 @@ def protect_ops(ubman, lock_addr, lock_size, ops="unlock"): def erase_write_ops(ubman, start, size): ''' Basic erase and write operation for flash ''' - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) flash_ops(ubman, 'erase', start, size, 0, 0, EXPECTED_ERASE) flash_ops(ubman, 'write', start, size, addr, 0, EXPECTED_WRITE) def spi_lock_unlock(ubman, lock_addr, lock_size): ''' Lock unlock operations for SPI family flash ''' - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) erase_size = get_erase_size() # Find the protected/un-protected region @@ -612,7 +612,7 @@ def test_spi_negative(ubman): total_size = get_total_size() erase_size = get_erase_size() page_size = get_page_size() - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) i = 0 while i < loop: # Erase negative test diff --git a/test/py/tests/test_tpm2.py b/test/py/tests/test_tpm2.py index 47e0e7a1b6d..ade102a387e 100644 --- a/test/py/tests/test_tpm2.py +++ b/test/py/tests/test_tpm2.py @@ -4,7 +4,7 @@ import os.path import pytest -import u_boot_utils +import utils import re import time @@ -197,7 +197,7 @@ def test_tpm2_get_capability(ubman): tpm2_sandbox_init(ubman) force_init(ubman) - ram = u_boot_utils.find_ram_base(ubman) + ram = utils.find_ram_base(ubman) read_cap = ubman.run_command('tpm2 get_capability 0x6 0x20e 0x200 1') #0x%x 1' % ram) output = ubman.run_command('echo $?') @@ -220,7 +220,7 @@ def test_tpm2_dam_parameters(ubman): if is_sandbox(ubman): tpm2_sandbox_init(ubman) force_init(ubman) - ram = u_boot_utils.find_ram_base(ubman) + ram = utils.find_ram_base(ubman) # Set the DAM parameters to known values ubman.run_command('tpm2 dam_parameters 3 10 0') @@ -245,7 +245,7 @@ def test_tpm2_pcr_read(ubman): tpm2_sandbox_init(ubman) force_init(ubman) - ram = u_boot_utils.find_ram_base(ubman) + ram = utils.find_ram_base(ubman) read_pcr = ubman.run_command('tpm2 pcr_read 10 0x%x' % ram) output = ubman.run_command('echo $?') @@ -273,7 +273,7 @@ def test_tpm2_pcr_extend(ubman): if is_sandbox(ubman): tpm2_sandbox_init(ubman) force_init(ubman) - ram = u_boot_utils.find_ram_base(ubman) + ram = utils.find_ram_base(ubman) read_pcr = ubman.run_command('tpm2 pcr_read 10 0x%x' % (ram + 0x20)) output = ubman.run_command('echo $?') diff --git a/test/py/tests/test_trace.py b/test/py/tests/test_trace.py index d008f688571..6fa4229233b 100644 --- a/test/py/tests/test_trace.py +++ b/test/py/tests/test_trace.py @@ -6,7 +6,7 @@ import os import pytest import re -import u_boot_utils as util +import utils as util # This is needed for Azure, since the default '..' directory is not writeable TMPDIR = '/tmp/test_trace' diff --git a/test/py/tests/test_ums.py b/test/py/tests/test_ums.py index fa13a393fc6..caf6c0a7270 100644 --- a/test/py/tests/test_ums.py +++ b/test/py/tests/test_ums.py @@ -11,7 +11,7 @@ import os.path import pytest import re import time -import u_boot_utils +import utils """ Note: This test relies on: @@ -113,8 +113,7 @@ def test_ums(ubman, env__usb_dev_port, env__block_devs): mount_subdir = env__block_devs[0]['writable_fs_subdir'] part_num = env__block_devs[0]['writable_fs_partition'] host_ums_part_node = '%s-part%d' % (host_ums_dev_node, part_num) - test_f = u_boot_utils.PersistentRandomFile(ubman, 'ums.bin', - 1024 * 1024); + test_f = utils.PersistentRandomFile(ubman, 'ums.bin', 1024 * 1024); mounted_test_fn = mount_point + '/' + mount_subdir + test_f.fn else: host_ums_part_node = host_ums_dev_node @@ -136,7 +135,7 @@ def test_ums(ubman, env__usb_dev_port, env__block_devs): cmd = 'ums %s %s %s' % (tgt_usb_ctlr, tgt_dev_type, tgt_dev_id) ubman.run_command(cmd, wait_for_prompt=False) ubman.wait_for(re.compile('UMS: LUN.*[\r\n]')) - fh = u_boot_utils.wait_until_open_succeeds(host_ums_part_node) + fh = utils.wait_until_open_succeeds(host_ums_part_node) ubman.log.action('Reading raw data from UMS device') fh.read(4096) fh.close() @@ -153,7 +152,7 @@ def test_ums(ubman, env__usb_dev_port, env__block_devs): ubman.log.action('Mounting exported UMS device') cmd = ('/bin/mount', host_ums_part_node) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) def umount(ignore_errors): """Unmount the block device that U-Boot exports. @@ -170,7 +169,7 @@ def test_ums(ubman, env__usb_dev_port, env__block_devs): ubman.log.action('Unmounting UMS device') cmd = ('/bin/umount', host_ums_part_node) - u_boot_utils.run_and_log(ubman, cmd, ignore_errors) + utils.run_and_log(ubman, cmd, ignore_errors) def stop_ums(ignore_errors): """Stop U-Boot's ums shell command from executing. @@ -191,7 +190,7 @@ def test_ums(ubman, env__usb_dev_port, env__block_devs): ubman.log.action( 'Stopping long-running U-Boot ums shell command') ubman.ctrlc() - u_boot_utils.wait_until_file_open_fails(host_ums_part_node, + utils.wait_until_file_open_fails(host_ums_part_node, ignore_errors) ignore_cleanup_errors = True @@ -202,11 +201,11 @@ def test_ums(ubman, env__usb_dev_port, env__block_devs): mount() ubman.log.action('Writing test file via UMS') cmd = ('rm', '-f', mounted_test_fn) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) if os.path.exists(mounted_test_fn): raise Exception('Could not rm target UMS test file') cmd = ('cp', test_f.abs_fn, mounted_test_fn) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) ignore_cleanup_errors = False finally: umount(ignore_errors=ignore_cleanup_errors) @@ -219,9 +218,9 @@ def test_ums(ubman, env__usb_dev_port, env__block_devs): try: mount() ubman.log.action('Reading test file back via UMS') - read_back_hash = u_boot_utils.md5sum_file(mounted_test_fn) + read_back_hash = utils.md5sum_file(mounted_test_fn) cmd = ('rm', '-f', mounted_test_fn) - u_boot_utils.run_and_log(ubman, cmd) + utils.run_and_log(ubman, cmd) ignore_cleanup_errors = False finally: umount(ignore_errors=ignore_cleanup_errors) diff --git a/test/py/tests/test_upl.py b/test/py/tests/test_upl.py index ca06dffd0bf..c8eeaa024e5 100644 --- a/test/py/tests/test_upl.py +++ b/test/py/tests/test_upl.py @@ -6,7 +6,7 @@ import os import pytest -import u_boot_utils +import utils @pytest.mark.boardspec('sandbox_vpl') def test_upl_handoff(ubman): diff --git a/test/py/tests/test_usb.py b/test/py/tests/test_usb.py index 6247f9211df..1dcd0834f55 100644 --- a/test/py/tests/test_usb.py +++ b/test/py/tests/test_usb.py @@ -4,7 +4,7 @@ import pytest import random import re -import u_boot_utils +import utils """ Note: This test doesn't rely on boardenv_* configuration values but it can @@ -296,7 +296,7 @@ def test_usb_fatls_fatinfo(ubman): pytest.skip('No %s partition detected' % fs.upper()) def usb_fatload_fatwrite(ubman, fs, x, part): - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) size = random.randint(4, 1 * 1024 * 1024) output = ubman.run_command('crc32 %x %x' % (addr, size)) m = re.search('==> (.+?)', output) @@ -391,7 +391,7 @@ def test_usb_ext4ls(ubman): pytest.skip('No %s partition detected' % fs.upper()) def usb_ext4load_ext4write(ubman, fs, x, part): - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) size = random.randint(4, 1 * 1024 * 1024) output = ubman.run_command('crc32 %x %x' % (addr, size)) m = re.search('==> (.+?)', output) @@ -505,7 +505,7 @@ def test_usb_ext2load(ubman): part_detect = 1 file, size, expected_crc32 = \ usb_ext4load_ext4write(ubman, fs, x, part) - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) offset = random.randrange(128, 1024, 128) output = ubman.run_command( @@ -572,7 +572,7 @@ def test_usb_load(ubman): for part in partitions: part_detect = 1 - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) if fs == 'fat': file, size, expected_crc32 = \ @@ -618,7 +618,7 @@ def test_usb_save(ubman): for part in partitions: part_detect = 1 - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) size = random.randint(4, 1 * 1024 * 1024) file = '%s_%d' % ('uboot_test', size) diff --git a/test/py/tests/test_ut.py b/test/py/tests/test_ut.py index 3bcdc7ac954..d7cf95d461b 100644 --- a/test/py/tests/test_ut.py +++ b/test/py/tests/test_ut.py @@ -13,7 +13,7 @@ import os import os.path import pytest -import u_boot_utils +import utils # pylint: disable=E0611 from tests import fs_helper from test_android import test_abootimg @@ -52,8 +52,8 @@ def setup_image(cons, devnum, part_type, img_size=20, second_part=False, if second_part: spec += '\ntype=c' - u_boot_utils.run_and_log(cons, f'qemu-img create {fname} 20M') - u_boot_utils.run_and_log(cons, f'sfdisk {fname}', + utils.run_and_log(cons, f'qemu-img create {fname} 20M') + utils.run_and_log(cons, f'sfdisk {fname}', stdin=spec.encode('utf-8')) return fname, mnt @@ -149,12 +149,12 @@ booti ${kernel_addr_r} ${ramdisk_addr_r} ${fdt_addr_r} infname = os.path.join(cons.config.source_dir, 'test/py/tests/bootstd/armbian.bmp.xz') bmp_file = os.path.join(bootdir, 'boot.bmp') - u_boot_utils.run_and_log( + utils.run_and_log( cons, ['sh', '-c', f'xz -dc {infname} >{bmp_file}']) mkimage = cons.config.build_dir + '/tools/mkimage' - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'{mkimage} -C none -A arm -T script -d {cmd_fname} {scr_fname}') kernel = 'vmlinuz-5.15.63-rockchip64' @@ -165,16 +165,16 @@ booti ${kernel_addr_r} ${ramdisk_addr_r} ${fdt_addr_r} symlink = os.path.join(bootdir, 'Image') if os.path.exists(symlink): os.remove(symlink) - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'echo here {kernel} {symlink}') os.symlink(kernel, symlink) fsfile = 'ext18M.img' - u_boot_utils.run_and_log(cons, f'fallocate -l 18M {fsfile}') - u_boot_utils.run_and_log(cons, f'mkfs.ext4 {fsfile} -d {mnt}') - u_boot_utils.run_and_log(cons, f'dd if={fsfile} of={fname} bs=1M seek=1') - u_boot_utils.run_and_log(cons, f'rm -rf {mnt}') - u_boot_utils.run_and_log(cons, f'rm -f {fsfile}') + utils.run_and_log(cons, f'fallocate -l 18M {fsfile}') + utils.run_and_log(cons, f'mkfs.ext4 {fsfile} -d {mnt}') + utils.run_and_log(cons, f'dd if={fsfile} of={fname} bs=1M seek=1') + utils.run_and_log(cons, f'rm -rf {mnt}') + utils.run_and_log(cons, f'rm -f {fsfile}') def setup_bootflow_image(cons): """Create a 20MB disk image with a single FAT partition""" @@ -208,7 +208,7 @@ label Fedora-Workstation-armhfp-31-1.9 (5.3.7-301.fc31.armv7hl) with open(inf, 'wb') as fd: fd.write(gzip.compress(b'vmlinux')) mkimage = cons.config.build_dir + '/tools/mkimage' - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'{mkimage} -f auto -d {inf} {os.path.join(mnt, vmlinux)}') with open(os.path.join(mnt, initrd), 'w', encoding='ascii') as fd: @@ -217,16 +217,16 @@ label Fedora-Workstation-armhfp-31-1.9 (5.3.7-301.fc31.armv7hl) mkdir_cond(os.path.join(mnt, dtbdir)) dtb_file = os.path.join(mnt, f'{dtbdir}/sandbox.dtb') - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'dtc -o {dtb_file}', stdin=b'/dts-v1/; / {};') fsfile = 'vfat18M.img' - u_boot_utils.run_and_log(cons, f'fallocate -l 18M {fsfile}') - u_boot_utils.run_and_log(cons, f'mkfs.vfat {fsfile}') - u_boot_utils.run_and_log(cons, ['sh', '-c', f'mcopy -i {fsfile} {mnt}/* ::/']) - u_boot_utils.run_and_log(cons, f'dd if={fsfile} of={fname} bs=1M seek=1') - u_boot_utils.run_and_log(cons, f'rm -rf {mnt}') - u_boot_utils.run_and_log(cons, f'rm -f {fsfile}') + utils.run_and_log(cons, f'fallocate -l 18M {fsfile}') + utils.run_and_log(cons, f'mkfs.vfat {fsfile}') + utils.run_and_log(cons, ['sh', '-c', f'mcopy -i {fsfile} {mnt}/* ::/']) + utils.run_and_log(cons, f'dd if={fsfile} of={fname} bs=1M seek=1') + utils.run_and_log(cons, f'rm -rf {mnt}') + utils.run_and_log(cons, f'rm -f {fsfile}') def setup_cros_image(cons): """Create a 20MB disk image with ChromiumOS partitions""" @@ -248,7 +248,7 @@ def setup_cros_image(cons): """ kern_part = os.path.join(cons.config.result_dir, f'kern-part-{arch}.bin') - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'futility vbutil_kernel --pack {kern_part} ' '--keyblock doc/chromium/files/devkeys/kernel.keyblock ' @@ -276,8 +276,8 @@ def setup_cros_image(cons): mmc_dev = 5 fname = os.path.join(cons.config.source_dir, f'mmc{mmc_dev}.img') - u_boot_utils.run_and_log(cons, f'qemu-img create {fname} 20M') - u_boot_utils.run_and_log(cons, f'cgpt create {fname}') + utils.run_and_log(cons, f'qemu-img create {fname} 20M') + utils.run_and_log(cons, f'cgpt create {fname}') uuid_state = 'ebd0a0a2-b9e5-4433-87c0-68b6b72699c7' uuid_kern = 'fe3a2a5d-4f32-41a7-b725-accc3285a309' @@ -316,13 +316,13 @@ def setup_cros_image(cons): size = int(size_str[:-1]) * sect_1mb else: size = int(size_str) - u_boot_utils.run_and_log( + utils.run_and_log( cons, f"cgpt add -i {part['num']} -b {ptr} -s {size} -t {part['type']} {fname}") ptr += size - u_boot_utils.run_and_log(cons, f'cgpt boot -p {fname}') - out = u_boot_utils.run_and_log(cons, f'cgpt show -q {fname}') + utils.run_and_log(cons, f'cgpt boot -p {fname}') + out = utils.run_and_log(cons, f'cgpt show -q {fname}') # We expect something like this: # 8239 2048 1 Basic data @@ -389,8 +389,8 @@ def setup_android_image(cons): mmc_dev = 7 fname = os.path.join(cons.config.source_dir, f'mmc{mmc_dev}.img') - u_boot_utils.run_and_log(cons, f'qemu-img create {fname} 20M') - u_boot_utils.run_and_log(cons, f'cgpt create {fname}') + utils.run_and_log(cons, f'qemu-img create {fname} 20M') + utils.run_and_log(cons, f'cgpt create {fname}') ptr = 40 @@ -412,13 +412,13 @@ def setup_android_image(cons): size = int(size_str[:-1]) * sect_1mb else: size = int(size_str) - u_boot_utils.run_and_log( + utils.run_and_log( cons, f"cgpt add -i {part['num']} -b {ptr} -s {size} -l {part['label']} -t basicdata {fname}") ptr += size - u_boot_utils.run_and_log(cons, f'cgpt boot -p {fname}') - out = u_boot_utils.run_and_log(cons, f'cgpt show -q {fname}') + utils.run_and_log(cons, f'cgpt boot -p {fname}') + out = utils.run_and_log(cons, f'cgpt show -q {fname}') # Create a dict (indexed by partition number) containing the above info for line in out.splitlines(): @@ -445,8 +445,8 @@ def setup_android_image(cons): mmc_dev = 8 fname = os.path.join(cons.config.source_dir, f'mmc{mmc_dev}.img') - u_boot_utils.run_and_log(cons, f'qemu-img create {fname} 20M') - u_boot_utils.run_and_log(cons, f'cgpt create {fname}') + utils.run_and_log(cons, f'qemu-img create {fname} 20M') + utils.run_and_log(cons, f'cgpt create {fname}') ptr = 40 @@ -466,13 +466,13 @@ def setup_android_image(cons): size = int(size_str[:-1]) * sect_1mb else: size = int(size_str) - u_boot_utils.run_and_log( + utils.run_and_log( cons, f"cgpt add -i {part['num']} -b {ptr} -s {size} -l {part['label']} -t basicdata {fname}") ptr += size - u_boot_utils.run_and_log(cons, f'cgpt boot -p {fname}') - out = u_boot_utils.run_and_log(cons, f'cgpt show -q {fname}') + utils.run_and_log(cons, f'cgpt boot -p {fname}') + out = utils.run_and_log(cons, f'cgpt show -q {fname}') # Create a dict (indexed by partition number) containing the above info for line in out.splitlines(): @@ -502,7 +502,7 @@ def setup_cedit_file(cons): 'test/boot/files/expo_ids.h') expo_tool = os.path.join(cons.config.source_dir, 'tools/expo.py') outfname = 'cedit.dtb' - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'{expo_tool} -e {inhname} -l {infname} -o {outfname}') @pytest.mark.buildconfigspec('ut_dm') @@ -528,7 +528,7 @@ def test_ut_dm_init(ubman): data = b'\x00' * (2 * 1024 * 1024) with open(fn, 'wb') as fh: fh.write(data) - u_boot_utils.run_and_log( + utils.run_and_log( ubman, f'sfdisk {fn}', stdin=b'type=83') fs_helper.mk_fs(ubman.config, 'ext2', 0x200000, '2MB', None) @@ -559,12 +559,12 @@ def setup_efi_image(cons): with open(efi_dst, 'wb') as outf: outf.write(inf.read()) fsfile = 'vfat18M.img' - u_boot_utils.run_and_log(cons, f'fallocate -l 18M {fsfile}') - u_boot_utils.run_and_log(cons, f'mkfs.vfat {fsfile}') - u_boot_utils.run_and_log(cons, ['sh', '-c', f'mcopy -vs -i {fsfile} {mnt}/* ::/']) - u_boot_utils.run_and_log(cons, f'dd if={fsfile} of={fname} bs=1M seek=1') - u_boot_utils.run_and_log(cons, f'rm -rf {mnt}') - u_boot_utils.run_and_log(cons, f'rm -f {fsfile}') + utils.run_and_log(cons, f'fallocate -l 18M {fsfile}') + utils.run_and_log(cons, f'mkfs.vfat {fsfile}') + utils.run_and_log(cons, ['sh', '-c', f'mcopy -vs -i {fsfile} {mnt}/* ::/']) + utils.run_and_log(cons, f'dd if={fsfile} of={fname} bs=1M seek=1') + utils.run_and_log(cons, f'rm -rf {mnt}') + utils.run_and_log(cons, f'rm -f {fsfile}') @pytest.mark.buildconfigspec('cmd_bootflow') @pytest.mark.buildconfigspec('sandbox') diff --git a/test/py/tests/test_vbe_vpl.py b/test/py/tests/test_vbe_vpl.py index 11389176335..317a324281e 100644 --- a/test/py/tests/test_vbe_vpl.py +++ b/test/py/tests/test_vbe_vpl.py @@ -6,7 +6,7 @@ import os import pytest -import u_boot_utils +import utils @pytest.mark.boardspec('sandbox_vpl') @pytest.mark.requiredtool('dtc') @@ -19,13 +19,13 @@ def test_vbe_vpl(ubman): # Enable firmware1 and the mmc that it uses. These are needed for the full # VBE flow. - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'fdtput -t s {fdt} /bootstd/firmware0 status disabled') - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'fdtput -t s {fdt} /bootstd/firmware1 status okay') - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'fdtput -t s {fdt} /mmc3 status okay') - u_boot_utils.run_and_log( + utils.run_and_log( cons, f'fdtput -t s {fdt} /mmc3 filename {image_fname}') # Remove any existing RAM file, so we don't have old data present diff --git a/test/py/tests/test_vboot.py b/test/py/tests/test_vboot.py index 724833dc976..691c6e6b839 100644 --- a/test/py/tests/test_vboot.py +++ b/test/py/tests/test_vboot.py @@ -42,7 +42,7 @@ import os import shutil import struct import pytest -import u_boot_utils as util +import utils as util import vboot_forge import vboot_evil diff --git a/test/py/tests/test_zynq_secure.py b/test/py/tests/test_zynq_secure.py index 0261d62a307..f066a03b182 100644 --- a/test/py/tests/test_zynq_secure.py +++ b/test/py/tests/test_zynq_secure.py @@ -3,7 +3,7 @@ import pytest import re -import u_boot_utils +import utils import test_net """ @@ -68,7 +68,7 @@ def test_zynq_aes_image(ubman): srcaddr = f.get('srcaddr', None) if not srcaddr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['fn'] @@ -96,7 +96,7 @@ def test_zynq_aes_bitstream(ubman): srcaddr = f.get('srcaddr', None) if not srcaddr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['fnbit'] @@ -124,7 +124,7 @@ def test_zynq_aes_partial_bitstream(ubman): srcaddr = f.get('srcaddr', None) if not srcaddr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['fnpbit'] @@ -150,7 +150,7 @@ def test_zynq_rsa_image(ubman): srcaddr = f.get('srcaddr', None) if not srcaddr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['fn'] @@ -176,7 +176,7 @@ def test_zynq_rsa_image_invalid(ubman): srcaddr = f.get('srcaddr', None) if not srcaddr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fninvalid = f['fninvalid'] diff --git a/test/py/tests/test_zynqmp_secure.py b/test/py/tests/test_zynqmp_secure.py index 7549e2cc39f..c057e36383f 100644 --- a/test/py/tests/test_zynqmp_secure.py +++ b/test/py/tests/test_zynqmp_secure.py @@ -3,7 +3,7 @@ import pytest import re -import u_boot_utils +import utils import test_net """ @@ -45,7 +45,7 @@ def test_zynqmp_secure_boot_image(ubman): addr = f.get('addr', None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['fn'] @@ -78,7 +78,7 @@ def test_zynqmp_secure_boot_img_kup(ubman): keyaddr = f.get('keyaddr', None) if not keyaddr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' keyfn = f['keyfn'] output = ubman.run_command('tftpboot %x %s' % (keyaddr, keyfn)) @@ -86,7 +86,7 @@ def test_zynqmp_secure_boot_img_kup(ubman): addr = f.get('addr', None) if not addr: - addr = u_boot_utils.find_ram_base(ubman) + addr = utils.find_ram_base(ubman) expected_tftp = 'Bytes transferred = ' fn = f['enckupfn'] output = ubman.run_command('tftpboot %x %s' % (addr, fn)) diff --git a/test/py/u_boot_console_base.py b/test/py/u_boot_console_base.py deleted file mode 100644 index 7eaceb39d9d..00000000000 --- a/test/py/u_boot_console_base.py +++ /dev/null @@ -1,613 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0 -# Copyright (c) 2015 Stephen Warren -# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. - -# Common logic to interact with U-Boot via the console. This class provides -# the interface that tests use to execute U-Boot shell commands and wait for -# their results. Sub-classes exist to perform board-type-specific setup -# operations, such as spawning a sub-process for Sandbox, or attaching to the -# serial console of real hardware. - -import multiplexed_log -import os -import pytest -import re -import sys -import u_boot_spawn -from u_boot_spawn import BootFail, Timeout, Unexpected, handle_exception - -# Regexes for text we expect U-Boot to send to the console. -pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))') -pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))') -pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ') -pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'') -pattern_error_notification = re.compile('## Error: ') -pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###') -pattern_ready_prompt = re.compile('{lab ready in (.*)s: (.*)}') -pattern_lab_mode = re.compile('{lab mode.*}') - -PAT_ID = 0 -PAT_RE = 1 - -# Timeout before expecting the console to be ready (in milliseconds) -TIMEOUT_MS = 30000 # Standard timeout -TIMEOUT_CMD_MS = 10000 # Command-echo timeout - -# Timeout for board preparation in lab mode. This needs to be enough to build -# U-Boot, write it to the board and then boot the board. Since this process is -# under the control of another program (e.g. Labgrid), it will failure sooner -# if something goes way. So use a very long timeout here to cover all possible -# situations. -TIMEOUT_PREPARE_MS = 3 * 60 * 1000 - -bad_pattern_defs = ( - ('spl_signon', pattern_u_boot_spl_signon), - ('main_signon', pattern_u_boot_main_signon), - ('stop_autoboot_prompt', pattern_stop_autoboot_prompt), - ('unknown_command', pattern_unknown_command), - ('error_notification', pattern_error_notification), - ('error_please_reset', pattern_error_please_reset), -) - -class ConsoleDisableCheck(object): - """Context manager (for Python's with statement) that temporarily disables - the specified console output error check. This is useful when deliberately - executing a command that is known to trigger one of the error checks, in - order to test that the error condition is actually raised. This class is - used internally by ConsoleBase::disable_check(); it is not intended for - direct usage.""" - - def __init__(self, console, check_type): - self.console = console - self.check_type = check_type - - def __enter__(self): - self.console.disable_check_count[self.check_type] += 1 - self.console.eval_bad_patterns() - - def __exit__(self, extype, value, traceback): - self.console.disable_check_count[self.check_type] -= 1 - self.console.eval_bad_patterns() - -class ConsoleEnableCheck(object): - """Context manager (for Python's with statement) that temporarily enables - the specified console output error check. This is useful when executing a - command that might raise an extra bad pattern, beyond the default bad - patterns, in order to validate that the extra bad pattern is actually - detected. This class is used internally by ConsoleBase::enable_check(); it - is not intended for direct usage.""" - - def __init__(self, console, check_type, check_pattern): - self.console = console - self.check_type = check_type - self.check_pattern = check_pattern - - def __enter__(self): - global bad_pattern_defs - self.default_bad_patterns = bad_pattern_defs - bad_pattern_defs += ((self.check_type, self.check_pattern),) - self.console.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs} - self.console.eval_bad_patterns() - - def __exit__(self, extype, value, traceback): - global bad_pattern_defs - bad_pattern_defs = self.default_bad_patterns - self.console.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs} - self.console.eval_bad_patterns() - -class ConsoleSetupTimeout(object): - """Context manager (for Python's with statement) that temporarily sets up - timeout for specific command. This is useful when execution time is greater - then default 30s.""" - - def __init__(self, console, timeout): - self.p = console.p - self.orig_timeout = self.p.timeout - self.p.timeout = timeout - - def __enter__(self): - return self - - def __exit__(self, extype, value, traceback): - self.p.timeout = self.orig_timeout - -class ConsoleBase(object): - """The interface through which test functions interact with the U-Boot - console. This primarily involves executing shell commands, capturing their - results, and checking for common error conditions. Some common utilities - are also provided too.""" - - def __init__(self, log, config, max_fifo_fill): - """Initialize a U-Boot console connection. - - Can only usefully be called by sub-classes. - - Args: - log: A multiplexed_log.Logfile object, to which the U-Boot output - will be logged. - config: A configuration data structure, as built by conftest.py. - max_fifo_fill: The maximum number of characters to send to U-Boot - command-line before waiting for U-Boot to echo the characters - back. For UART-based HW without HW flow control, this value - should be set less than the UART RX FIFO size to avoid - overflow, assuming that U-Boot can't keep up with full-rate - traffic at the baud rate. - - Returns: - Nothing. - """ - - self.log = log - self.config = config - self.max_fifo_fill = max_fifo_fill - - self.logstream = self.log.get_stream('console', sys.stdout) - - # Array slice removes leading/trailing quotes - self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1] - self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE) - self.p = None - self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs} - self.eval_bad_patterns() - - self.at_prompt = False - self.at_prompt_logevt = None - self.lab_mode = False - - def get_spawn(self): - # This is not called, ssubclass must define this. - # Return a value to avoid: - # u_boot_console_base.py:348:12: E1128: Assigning result of a function - # call, where the function returns None (assignment-from-none) - return u_boot_spawn.Spawn([]) - - - def eval_bad_patterns(self): - self.bad_patterns = [pat[PAT_RE] for pat in bad_pattern_defs \ - if self.disable_check_count[pat[PAT_ID]] == 0] - self.bad_pattern_ids = [pat[PAT_ID] for pat in bad_pattern_defs \ - if self.disable_check_count[pat[PAT_ID]] == 0] - - def close(self): - """Terminate the connection to the U-Boot console. - - This function is only useful once all interaction with U-Boot is - complete. Once this function is called, data cannot be sent to or - received from U-Boot. - - Args: - None. - - Returns: - Nothing. - """ - - if self.p: - self.log.start_section('Stopping U-Boot') - close_type = self.p.close() - self.log.info(f'Close type: {close_type}') - self.log.end_section('Stopping U-Boot') - self.logstream.close() - - def set_lab_mode(self): - """Select lab mode - - This tells us that we will get a 'lab ready' message when the board is - ready for use. We don't need to look for signon messages. - """ - self.log.info(f'test.py: Lab mode is active') - self.p.timeout = TIMEOUT_PREPARE_MS - self.lab_mode = True - - def wait_for_boot_prompt(self, loop_num = 1): - """Wait for the boot up until command prompt. This is for internal use only. - """ - try: - self.log.info('Waiting for U-Boot to be ready') - bcfg = self.config.buildconfig - config_spl_serial = bcfg.get('config_spl_serial', 'n') == 'y' - env_spl_skipped = self.config.env.get('env__spl_skipped', False) - env_spl_banner_times = self.config.env.get('env__spl_banner_times', 1) - - while not self.lab_mode and loop_num > 0: - loop_num -= 1 - while config_spl_serial and not env_spl_skipped and env_spl_banner_times > 0: - m = self.p.expect([pattern_u_boot_spl_signon, - pattern_lab_mode] + self.bad_patterns) - if m == 1: - self.set_lab_mode() - break - elif m != 0: - raise BootFail('Bad pattern found on SPL console: ' + - self.bad_pattern_ids[m - 1]) - env_spl_banner_times -= 1 - - if not self.lab_mode: - m = self.p.expect([pattern_u_boot_main_signon, - pattern_lab_mode] + self.bad_patterns) - if m == 1: - self.set_lab_mode() - elif m != 0: - raise BootFail('Bad pattern found on console: ' + - self.bad_pattern_ids[m - 1]) - if not self.lab_mode: - self.u_boot_version_string = self.p.after - while True: - m = self.p.expect([self.prompt_compiled, pattern_ready_prompt, - pattern_stop_autoboot_prompt] + self.bad_patterns) - if m == 0: - self.log.info(f'Found ready prompt {m}') - break - elif m == 1: - m = pattern_ready_prompt.search(self.p.after) - self.u_boot_version_string = m.group(2) - self.log.info(f'Lab: Board is ready') - self.p.timeout = TIMEOUT_MS - break - if m == 2: - self.log.info(f'Found autoboot prompt {m}') - self.p.send(' ') - continue - if not self.lab_mode: - raise BootFail('Missing prompt / ready message on console: ' + - self.bad_pattern_ids[m - 3]) - self.log.info(f'U-Boot is ready') - - finally: - self.log.timestamp() - - def run_command(self, cmd, wait_for_echo=True, send_nl=True, - wait_for_prompt=True, wait_for_reboot=False): - """Execute a command via the U-Boot console. - - The command is always sent to U-Boot. - - U-Boot echoes any command back to its output, and this function - typically waits for that to occur. The wait can be disabled by setting - wait_for_echo=False, which is useful e.g. when sending CTRL-C to - interrupt a long-running command such as "ums". - - Command execution is typically triggered by sending a newline - character. This can be disabled by setting send_nl=False, which is - also useful when sending CTRL-C. - - This function typically waits for the command to finish executing, and - returns the console output that it generated. This can be disabled by - setting wait_for_prompt=False, which is useful when invoking a long- - running command such as "ums". - - Args: - cmd: The command to send. - wait_for_echo: Boolean indicating whether to wait for U-Boot to - echo the command text back to its output. - send_nl: Boolean indicating whether to send a newline character - after the command string. - wait_for_prompt: Boolean indicating whether to wait for the - command prompt to be sent by U-Boot. This typically occurs - immediately after the command has been executed. - wait_for_reboot: Boolean indication whether to wait for the - reboot U-Boot. If this sets True, wait_for_prompt must also - be True. - - Returns: - If wait_for_prompt == False: - Nothing. - Else: - The output from U-Boot during command execution. In other - words, the text U-Boot emitted between the point it echod the - command string and emitted the subsequent command prompts. - """ - - if self.at_prompt and \ - self.at_prompt_logevt != self.logstream.logfile.cur_evt: - self.logstream.write(self.prompt, implicit=True) - - try: - self.at_prompt = False - if not self.p: - raise BootFail( - f"Lab failure: Connection lost when sending command '{cmd}'") - - if send_nl: - cmd += '\n' - rem = cmd # Remaining to be sent - with self.temporary_timeout(TIMEOUT_CMD_MS): - while rem: - # Limit max outstanding data, so UART FIFOs don't overflow - chunk = rem[:self.max_fifo_fill] - rem = rem[self.max_fifo_fill:] - self.p.send(chunk) - if not wait_for_echo: - continue - chunk = re.escape(chunk) - chunk = chunk.replace('\\\n', '[\r\n]') - m = self.p.expect([chunk] + self.bad_patterns) - if m != 0: - self.at_prompt = False - raise BootFail(f"Failed to get echo on console (cmd '{cmd}':rem '{rem}'): " + - self.bad_pattern_ids[m - 1]) - if not wait_for_prompt: - return - if wait_for_reboot: - self.wait_for_boot_prompt() - else: - m = self.p.expect([self.prompt_compiled] + self.bad_patterns) - if m != 0: - self.at_prompt = False - raise BootFail('Missing prompt on console: ' + - self.bad_pattern_ids[m - 1]) - self.at_prompt = True - self.at_prompt_logevt = self.logstream.logfile.cur_evt - # Only strip \r\n; space/TAB might be significant if testing - # indentation. - return self.p.before.strip('\r\n') - except Timeout as exc: - handle_exception(self.config, self, self.log, exc, - f"Lab failure: Timeout executing '{cmd}'", True) - raise - except BootFail as exc: - handle_exception(self.config, self, self.log, exc, - f"'Boot fail '{cmd}'", - True, self.get_spawn_output()) - raise - finally: - self.log.timestamp() - - def run_command_list(self, cmds): - """Run a list of commands. - - This is a helper function to call run_command() with default arguments - for each command in a list. - - Args: - cmd: List of commands (each a string). - Returns: - A list of output strings from each command, one element for each - command. - """ - output = [] - for cmd in cmds: - output.append(self.run_command(cmd)) - return output - - def ctrlc(self): - """Send a CTRL-C character to U-Boot. - - This is useful in order to stop execution of long-running synchronous - commands such as "ums". - - Args: - None. - - Returns: - Nothing. - """ - - self.log.action('Sending Ctrl-C') - self.run_command(chr(3), wait_for_echo=False, send_nl=False) - - def wait_for(self, text): - """Wait for a pattern to be emitted by U-Boot. - - This is useful when a long-running command such as "dfu" is executing, - and it periodically emits some text that should show up at a specific - location in the log file. - - Args: - text: The text to wait for; either a string (containing raw text, - not a regular expression) or an re object. - - Returns: - Nothing. - """ - - if type(text) == type(''): - text = re.escape(text) - m = self.p.expect([text] + self.bad_patterns) - if m != 0: - raise Unexpected( - "Unexpected pattern found on console (exp '{text}': " + - self.bad_pattern_ids[m - 1]) - - def drain_console(self): - """Read from and log the U-Boot console for a short time. - - U-Boot's console output is only logged when the test code actively - waits for U-Boot to emit specific data. There are cases where tests - can fail without doing this. For example, if a test asks U-Boot to - enable USB device mode, then polls until a host-side device node - exists. In such a case, it is useful to log U-Boot's console output - in case U-Boot printed clues as to why the host-side even did not - occur. This function will do that. - - Args: - None. - - Returns: - Nothing. - """ - - # If we are already not connected to U-Boot, there's nothing to drain. - # This should only happen when a previous call to run_command() or - # wait_for() failed (and hence the output has already been logged), or - # the system is shutting down. - if not self.p: - return - - orig_timeout = self.p.timeout - try: - # Drain the log for a relatively short time. - self.p.timeout = 1000 - # Wait for something U-Boot will likely never send. This will - # cause the console output to be read and logged. - self.p.expect(['This should never match U-Boot output']) - except: - # We expect a timeout, since U-Boot won't print what we waited - # for. Squash it when it happens. - # - # Squash any other exception too. This function is only used to - # drain (and log) the U-Boot console output after a failed test. - # The U-Boot process will be restarted, or target board reset, once - # this function returns. So, we don't care about detecting any - # additional errors, so they're squashed so that the rest of the - # post-test-failure cleanup code can continue operation, and - # correctly terminate any log sections, etc. - pass - finally: - self.p.timeout = orig_timeout - - def ensure_spawned(self, expect_reset=False): - """Ensure a connection to a correctly running U-Boot instance. - - This may require spawning a new Sandbox process or resetting target - hardware, as defined by the implementation sub-class. - - This is an internal function and should not be called directly. - - Args: - expect_reset: Boolean indication whether this boot is expected - to be reset while the 1st boot process after main boot before - prompt. False by default. - - Returns: - Nothing. - """ - - if self.p: - # Reset the console timeout value as some tests may change - # its default value during the execution - if not self.config.gdbserver: - self.p.timeout = TIMEOUT_MS - return - try: - self.log.start_section('Starting U-Boot') - self.at_prompt = False - self.p = self.get_spawn() - # Real targets can take a long time to scroll large amounts of - # text if LCD is enabled. This value may need tweaking in the - # future, possibly per-test to be optimal. This works for 'help' - # on board 'seaboard'. - if not self.config.gdbserver: - self.p.timeout = TIMEOUT_MS - self.p.logfile_read = self.logstream - if self.config.use_running_system: - # Send an empty command to set up the 'expect' logic. This has - # the side effect of ensuring that there was no partial command - # line entered - self.run_command(' ') - else: - if expect_reset: - loop_num = 2 - else: - loop_num = 1 - self.wait_for_boot_prompt(loop_num = loop_num) - self.at_prompt = True - self.at_prompt_logevt = self.logstream.logfile.cur_evt - except Exception as ex: - self.log.error(str(ex)) - self.cleanup_spawn() - raise - finally: - self.log.timestamp() - self.log.end_section('Starting U-Boot') - - def cleanup_spawn(self): - """Shut down all interaction with the U-Boot instance. - - This is used when an error is detected prior to re-establishing a - connection with a fresh U-Boot instance. - - This is an internal function and should not be called directly. - - Args: - None. - - Returns: - Nothing. - """ - - try: - if self.p: - self.p.close() - except: - pass - self.p = None - - def restart_uboot(self, expect_reset=False): - """Shut down and restart U-Boot.""" - self.cleanup_spawn() - self.ensure_spawned(expect_reset) - - def get_spawn_output(self): - """Return the start-up output from U-Boot - - Returns: - The output produced by ensure_spawed(), as a string. - """ - if self.p: - return self.p.get_expect_output() - return None - - def validate_version_string_in_text(self, text): - """Assert that a command's output includes the U-Boot signon message. - - This is primarily useful for validating the "version" command without - duplicating the signon text regex in a test function. - - Args: - text: The command output text to check. - - Returns: - Nothing. An exception is raised if the validation fails. - """ - - assert(self.u_boot_version_string in text) - - def disable_check(self, check_type): - """Temporarily disable an error check of U-Boot's output. - - Create a new context manager (for use with the "with" statement) which - temporarily disables a particular console output error check. - - Args: - check_type: The type of error-check to disable. Valid values may - be found in self.disable_check_count above. - - Returns: - A context manager object. - """ - - return ConsoleDisableCheck(self, check_type) - - def enable_check(self, check_type, check_pattern): - """Temporarily enable an error check of U-Boot's output. - - Create a new context manager (for use with the "with" statement) which - temporarily enables a particular console output error check. The - arguments form a new element of bad_pattern_defs defined above. - - Args: - check_type: The type of error-check or bad pattern to enable. - check_pattern: The regexes for text error pattern or bad pattern - to be checked. - - Returns: - A context manager object. - """ - - return ConsoleEnableCheck(self, check_type, check_pattern) - - def temporary_timeout(self, timeout): - """Temporarily set up different timeout for commands. - - Create a new context manager (for use with the "with" statement) which - temporarily change timeout. - - Args: - timeout: Time in milliseconds. - - Returns: - A context manager object. - """ - - return ConsoleSetupTimeout(self, timeout) diff --git a/test/py/u_boot_console_exec_attach.py b/test/py/u_boot_console_exec_attach.py deleted file mode 100644 index 8b253b4451d..00000000000 --- a/test/py/u_boot_console_exec_attach.py +++ /dev/null @@ -1,85 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0 -# Copyright (c) 2015 Stephen Warren -# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. - -""" -Logic to interact with U-Boot running on real hardware, typically via a -physical serial port. -""" - -import sys -from u_boot_spawn import Spawn -from u_boot_console_base import ConsoleBase - -class ConsoleExecAttach(ConsoleBase): - """Represents a physical connection to a U-Boot console, typically via a - serial port. This implementation executes a sub-process to attach to the - console, expecting that the stdin/out of the sub-process will be forwarded - to/from the physical hardware. This approach isolates the test infra- - structure from the user-/installation-specific details of how to - communicate with, and the identity of, serial ports etc.""" - - def __init__(self, log, config): - """Initialize a U-Boot console connection. - - Args: - log: A multiplexed_log.Logfile instance. - config: A "configuration" object as defined in conftest.py. - - Returns: - Nothing. - """ - - # The max_fifo_fill value might need tweaking per-board/-SoC? - # 1 would be safe anywhere, but is very slow (a pexpect issue?). - # 16 is a common FIFO size. - # HW flow control would mean this could be infinite. - super(ConsoleExecAttach, self).__init__(log, config, max_fifo_fill=16) - - with self.log.section('flash'): - self.log.action('Flashing U-Boot') - cmd = ['u-boot-test-flash', config.board_type, config.board_identity] - runner = self.log.get_runner(cmd[0], sys.stdout) - runner.run(cmd) - runner.close() - self.log.status_pass('OK') - - def get_spawn(self): - """Connect to a fresh U-Boot instance. - - The target board is reset, so that U-Boot begins running from scratch. - - Args: - None. - - Returns: - A u_boot_spawn.Spawn object that is attached to U-Boot. - """ - - args = [self.config.board_type, self.config.board_identity] - s = Spawn(['u-boot-test-console'] + args) - - if self.config.use_running_system: - self.log.action('Connecting to board without reset') - else: - try: - self.log.action('Resetting board') - cmd = ['u-boot-test-reset'] + args - runner = self.log.get_runner(cmd[0], sys.stdout) - runner.run(cmd) - runner.close() - except: - s.close() - raise - - return s - - def close(self): - super().close() - - self.log.action('Releasing board') - args = [self.config.board_type, self.config.board_identity] - cmd = ['u-boot-test-release'] + args - runner = self.log.get_runner(cmd[0], sys.stdout) - runner.run(cmd) - runner.close() diff --git a/test/py/u_boot_console_sandbox.py b/test/py/u_boot_console_sandbox.py deleted file mode 100644 index 7bc44c78b8b..00000000000 --- a/test/py/u_boot_console_sandbox.py +++ /dev/null @@ -1,119 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0 -# Copyright (c) 2015 Stephen Warren -# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. - -""" -Logic to interact with the sandbox port of U-Boot, running as a sub-process. -""" - -import time -from u_boot_spawn import Spawn -from u_boot_console_base import ConsoleBase - -class ConsoleSandbox(ConsoleBase): - """Represents a connection to a sandbox U-Boot console, executed as a sub- - process.""" - - def __init__(self, log, config): - """Initialize a U-Boot console connection. - - Args: - log: A multiplexed_log.Logfile instance. - config: A "configuration" object as defined in conftest.py. - - Returns: - Nothing. - """ - - super(ConsoleSandbox, self).__init__(log, config, max_fifo_fill=1024) - self.sandbox_flags = [] - self.use_dtb = True - - def get_spawn(self): - """Connect to a fresh U-Boot instance. - - A new sandbox process is created, so that U-Boot begins running from - scratch. - - Args: - None. - - Returns: - A u_boot_spawn.Spawn object that is attached to U-Boot. - """ - - bcfg = self.config.buildconfig - config_spl = bcfg.get('config_spl', 'n') == 'y' - config_vpl = bcfg.get('config_vpl', 'n') == 'y' - if config_vpl: - # Run TPL first, which runs VPL - fname = '/tpl/u-boot-tpl' - else: - fname = '/spl/u-boot-spl' if config_spl else '/u-boot' - print(fname) - cmd = [] - if self.config.gdbserver: - cmd += ['gdbserver', self.config.gdbserver] - cmd += [self.config.build_dir + fname, '-v'] - if self.use_dtb: - cmd += ['-d', self.config.dtb] - cmd += self.sandbox_flags - return Spawn(cmd, cwd=self.config.source_dir, decode_signal=True) - - def restart_uboot_with_flags(self, flags, expect_reset=False, use_dtb=True): - """Run U-Boot with the given command-line flags - - Args: - flags: List of flags to pass, each a string - expect_reset: Boolean indication whether this boot is expected - to be reset while the 1st boot process after main boot before - prompt. False by default. - use_dtb: True to use a device tree file, False to run without one - - Returns: - A u_boot_spawn.Spawn object that is attached to U-Boot. - """ - - try: - self.sandbox_flags = flags - self.use_dtb = use_dtb - return self.restart_uboot(expect_reset) - finally: - self.sandbox_flags = [] - self.use_dtb = True - - def kill(self, sig): - """Send a specific Unix signal to the sandbox process. - - Args: - sig: The Unix signal to send to the process. - - Returns: - Nothing. - """ - - self.log.action('kill %d' % sig) - self.p.kill(sig) - - def validate_exited(self): - """Determine whether the sandbox process has exited. - - If required, this function waits a reasonable time for the process to - exit. - - Args: - None. - - Returns: - Boolean indicating whether the process has exited. - """ - - p = self.p - self.p = None - for i in range(100): - ret = not p.isalive() - if ret: - break - time.sleep(0.1) - p.close() - return ret diff --git a/test/py/u_boot_spawn.py b/test/py/u_boot_spawn.py deleted file mode 100644 index c703454389d..00000000000 --- a/test/py/u_boot_spawn.py +++ /dev/null @@ -1,345 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0 -# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. - -""" -Logic to spawn a sub-process and interact with its stdio. -""" - -import io -import os -import re -import pty -import pytest -import signal -import select -import sys -import termios -import time -import traceback - -# Character to send (twice) to exit the terminal -EXIT_CHAR = 0x1d # FS (Ctrl + ]) - -class Timeout(Exception): - """An exception sub-class that indicates that a timeout occurred.""" - -class BootFail(Exception): - """An exception sub-class that indicates that a boot failure occurred. - - This is used when a bad pattern is seen when waiting for the boot prompt. - It is regarded as fatal, to avoid trying to boot the again and again to no - avail. - """ - -class Unexpected(Exception): - """An exception sub-class that indicates that unexpected test was seen.""" - - -def handle_exception(ubconfig, console, log, err, name, fatal, output=''): - """Handle an exception from the console - - Exceptions can occur when there is unexpected output or due to the board - crashing or hanging. Some exceptions are likely fatal, where retrying will - just chew up time to no available. In those cases it is best to cause - further tests be skipped. - - Args: - ubconfig (ArbitraryAttributeContainer): ubconfig object - log (Logfile): Place to log errors - console (ConsoleBase): Console to clean up, if fatal - err (Exception): Exception which was thrown - name (str): Name of problem, to log - fatal (bool): True to abort all tests - output (str): Extra output to report on boot failure. This can show the - target's console output as it tried to boot - """ - msg = f'{name}: ' - if fatal: - msg += 'Marking connection bad - no other tests will run' - else: - msg += 'Assuming that lab is healthy' - print(msg) - log.error(msg) - log.error(f'Error: {err}') - - if output: - msg += f'; output {output}' - - if fatal: - ubconfig.connection_ok = False - console.cleanup_spawn() - pytest.exit(msg) - - -class Spawn: - """Represents the stdio of a freshly created sub-process. Commands may be - sent to the process, and responses waited for. - - Members: - output: accumulated output from expect() - """ - - def __init__(self, args, cwd=None, decode_signal=False): - """Spawn (fork/exec) the sub-process. - - Args: - args: array of processs arguments. argv[0] is the command to - execute. - cwd: the directory to run the process in, or None for no change. - decode_signal (bool): True to indicate the exception number when - something goes wrong - - Returns: - Nothing. - """ - self.decode_signal = decode_signal - self.waited = False - self.exit_code = 0 - self.exit_info = '' - self.buf = '' - self.output = '' - self.logfile_read = None - self.before = '' - self.after = '' - self.timeout = None - # http://stackoverflow.com/questions/7857352/python-regex-to-match-vt100-escape-sequences - self.re_vt100 = re.compile(r'(\x1b\[|\x9b)[^@-_]*[@-_]|\x1b[@-_]', re.I) - - (self.pid, self.fd) = pty.fork() - if self.pid == 0: - try: - # For some reason, SIGHUP is set to SIG_IGN at this point when - # run under "go" (www.go.cd). Perhaps this happens under any - # background (non-interactive) system? - signal.signal(signal.SIGHUP, signal.SIG_DFL) - if cwd: - os.chdir(cwd) - os.execvp(args[0], args) - except: - print('CHILD EXECEPTION:') - traceback.print_exc() - finally: - os._exit(255) - - old = None - try: - isatty = False - try: - isatty = os.isatty(sys.stdout.fileno()) - - # with --capture=tee-sys we cannot call fileno() - except io.UnsupportedOperation as exc: - pass - if isatty: - new = termios.tcgetattr(self.fd) - old = new - new[3] = new[3] & ~(termios.ICANON | termios.ISIG) - new[3] = new[3] & ~termios.ECHO - new[6][termios.VMIN] = 0 - new[6][termios.VTIME] = 0 - termios.tcsetattr(self.fd, termios.TCSANOW, new) - - self.poll = select.poll() - self.poll.register(self.fd, select.POLLIN | select.POLLPRI | select.POLLERR | - select.POLLHUP | select.POLLNVAL) - except: - if old: - termios.tcsetattr(self.fd, termios.TCSANOW, old) - self.close() - raise - - def kill(self, sig): - """Send unix signal "sig" to the child process. - - Args: - sig: The signal number to send. - - Returns: - Nothing. - """ - - os.kill(self.pid, sig) - - def checkalive(self): - """Determine whether the child process is still running. - - Returns: - tuple: - True if process is alive, else False - 0 if process is alive, else exit code of process - string describing what happened ('' or 'status/signal n') - """ - - if self.waited: - return False, self.exit_code, self.exit_info - - w = os.waitpid(self.pid, os.WNOHANG) - if w[0] == 0: - return True, 0, 'running' - status = w[1] - - if os.WIFEXITED(status): - self.exit_code = os.WEXITSTATUS(status) - self.exit_info = 'status %d' % self.exit_code - elif os.WIFSIGNALED(status): - signum = os.WTERMSIG(status) - self.exit_code = -signum - self.exit_info = 'signal %d (%s)' % (signum, signal.Signals(signum).name) - self.waited = True - return False, self.exit_code, self.exit_info - - def isalive(self): - """Determine whether the child process is still running. - - Args: - None. - - Returns: - Boolean indicating whether process is alive. - """ - return self.checkalive()[0] - - def send(self, data): - """Send data to the sub-process's stdin. - - Args: - data: The data to send to the process. - - Returns: - Nothing. - """ - - os.write(self.fd, data.encode(errors='replace')) - - def receive(self, num_bytes): - """Receive data from the sub-process's stdin. - - Args: - num_bytes (int): Maximum number of bytes to read - - Returns: - str: The data received - - Raises: - ValueError if U-Boot died - """ - try: - c = os.read(self.fd, num_bytes).decode(errors='replace') - except OSError as err: - # With sandbox, try to detect when U-Boot exits when it - # shouldn't and explain why. This is much more friendly than - # just dying with an I/O error - if self.decode_signal and err.errno == 5: # I/O error - alive, _, info = self.checkalive() - if alive: - raise err - raise ValueError('U-Boot exited with %s' % info) - raise - return c - - def expect(self, patterns): - """Wait for the sub-process to emit specific data. - - This function waits for the process to emit one pattern from the - supplied list of patterns, or for a timeout to occur. - - Args: - patterns: A list of strings or regex objects that we expect to - see in the sub-process' stdout. - - Returns: - The index within the patterns array of the pattern the process - emitted. - - Notable exceptions: - Timeout, if the process did not emit any of the patterns within - the expected time. - """ - - for pi in range(len(patterns)): - if type(patterns[pi]) == type(''): - patterns[pi] = re.compile(patterns[pi]) - - tstart_s = time.time() - try: - while True: - earliest_m = None - earliest_pi = None - for pi in range(len(patterns)): - pattern = patterns[pi] - m = pattern.search(self.buf) - if not m: - continue - if earliest_m and m.start() >= earliest_m.start(): - continue - earliest_m = m - earliest_pi = pi - if earliest_m: - pos = earliest_m.start() - posafter = earliest_m.end() - self.before = self.buf[:pos] - self.after = self.buf[pos:posafter] - self.output += self.buf[:posafter] - self.buf = self.buf[posafter:] - return earliest_pi - tnow_s = time.time() - if self.timeout: - tdelta_ms = (tnow_s - tstart_s) * 1000 - poll_maxwait = self.timeout - tdelta_ms - if tdelta_ms > self.timeout: - raise Timeout() - else: - poll_maxwait = None - events = self.poll.poll(poll_maxwait) - if not events: - raise Timeout() - c = self.receive(1024) - if self.logfile_read: - self.logfile_read.write(c) - self.buf += c - # count=0 is supposed to be the default, which indicates - # unlimited substitutions, but in practice the version of - # Python in Ubuntu 14.04 appears to default to count=2! - self.buf = self.re_vt100.sub('', self.buf, count=1000000) - finally: - if self.logfile_read: - self.logfile_read.flush() - - def close(self): - """Close the stdio connection to the sub-process. - - This also waits a reasonable time for the sub-process to stop running. - - Args: - None. - - Returns: - str: Type of closure completed - """ - # For Labgrid-sjg, ask it is exit gracefully, so it can transition the - # board to the final state (like 'off') before exiting. - if os.environ.get('USE_LABGRID_SJG'): - self.send(chr(EXIT_CHAR) * 2) - - # Wait about 10 seconds for Labgrid to close and power off the board - for _ in range(100): - if not self.isalive(): - return 'normal' - time.sleep(0.1) - - # That didn't work, so try closing the PTY - os.close(self.fd) - for _ in range(100): - if not self.isalive(): - return 'break' - time.sleep(0.1) - - return 'timeout' - - def get_expect_output(self): - """Return the output read by expect() - - Returns: - The output processed by expect(), as a string. - """ - return self.output diff --git a/test/py/u_boot_utils.py b/test/py/u_boot_utils.py deleted file mode 100644 index ca80e4b0b0a..00000000000 --- a/test/py/u_boot_utils.py +++ /dev/null @@ -1,382 +0,0 @@ -# SPDX-License-Identifier: GPL-2.0 -# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. - -""" -Utility code shared across multiple tests. -""" - -import hashlib -import inspect -import os -import os.path -import pathlib -import signal -import sys -import time -import re -import pytest - -def md5sum_data(data): - """Calculate the MD5 hash of some data. - - Args: - data: The data to hash. - - Returns: - The hash of the data, as a binary string. - """ - - h = hashlib.md5() - h.update(data) - return h.digest() - -def md5sum_file(fn, max_length=None): - """Calculate the MD5 hash of the contents of a file. - - Args: - fn: The filename of the file to hash. - max_length: The number of bytes to hash. If the file has more - bytes than this, they will be ignored. If None or omitted, the - entire file will be hashed. - - Returns: - The hash of the file content, as a binary string. - """ - - with open(fn, 'rb') as fh: - if max_length: - params = [max_length] - else: - params = [] - data = fh.read(*params) - return md5sum_data(data) - -class PersistentRandomFile: - """Generate and store information about a persistent file containing - random data.""" - - def __init__(self, ubman, fn, size): - """Create or process the persistent file. - - If the file does not exist, it is generated. - - If the file does exist, its content is hashed for later comparison. - - These files are always located in the "persistent data directory" of - the current test run. - - Args: - ubman: A console connection to U-Boot. - fn: The filename (without path) to create. - size: The desired size of the file in bytes. - - Returns: - Nothing. - """ - - self.fn = fn - - self.abs_fn = ubman.config.persistent_data_dir + '/' + fn - - if os.path.exists(self.abs_fn): - ubman.log.action('Persistent data file ' + self.abs_fn + - ' already exists') - self.content_hash = md5sum_file(self.abs_fn) - else: - ubman.log.action('Generating ' + self.abs_fn + - ' (random, persistent, %d bytes)' % size) - data = os.urandom(size) - with open(self.abs_fn, 'wb') as fh: - fh.write(data) - self.content_hash = md5sum_data(data) - -def attempt_to_open_file(fn): - """Attempt to open a file, without throwing exceptions. - - Any errors (exceptions) that occur during the attempt to open the file - are ignored. This is useful in order to test whether a file (in - particular, a device node) exists and can be successfully opened, in order - to poll for e.g. USB enumeration completion. - - Args: - fn: The filename to attempt to open. - - Returns: - An open file handle to the file, or None if the file could not be - opened. - """ - - try: - return open(fn, 'rb') - except: - return None - -def wait_until_open_succeeds(fn): - """Poll until a file can be opened, or a timeout occurs. - - Continually attempt to open a file, and return when this succeeds, or - raise an exception after a timeout. - - Args: - fn: The filename to attempt to open. - - Returns: - An open file handle to the file. - """ - - for i in range(100): - fh = attempt_to_open_file(fn) - if fh: - return fh - time.sleep(0.1) - raise Exception('File could not be opened') - -def wait_until_file_open_fails(fn, ignore_errors): - """Poll until a file cannot be opened, or a timeout occurs. - - Continually attempt to open a file, and return when this fails, or - raise an exception after a timeout. - - Args: - fn: The filename to attempt to open. - ignore_errors: Indicate whether to ignore timeout errors. If True, the - function will simply return if a timeout occurs, otherwise an - exception will be raised. - - Returns: - Nothing. - """ - - for _ in range(100): - fh = attempt_to_open_file(fn) - if not fh: - return - fh.close() - time.sleep(0.1) - if ignore_errors: - return - raise Exception('File can still be opened') - -def run_and_log(ubman, cmd, ignore_errors=False, stdin=None, env=None): - """Run a command and log its output. - - Args: - ubman: A console connection to U-Boot. - cmd: The command to run, as an array of argv[], or a string. - If a string, note that it is split up so that quoted spaces - will not be preserved. E.g. "fred and" becomes ['"fred', 'and"'] - ignore_errors: Indicate whether to ignore errors. If True, the function - will simply return if the command cannot be executed or exits with - an error code, otherwise an exception will be raised if such - problems occur. - stdin: Input string to pass to the command as stdin (or None) - env: Environment to use, or None to use the current one - - Returns: - The output as a string. - """ - if isinstance(cmd, str): - cmd = cmd.split() - runner = ubman.log.get_runner(cmd[0], sys.stdout) - output = runner.run(cmd, ignore_errors=ignore_errors, stdin=stdin, env=env) - runner.close() - return output - -def run_and_log_expect_exception(ubman, cmd, retcode, msg): - """Run a command that is expected to fail. - - This runs a command and checks that it fails with the expected return code - and exception method. If not, an exception is raised. - - Args: - ubman: A console connection to U-Boot. - cmd: The command to run, as an array of argv[]. - retcode: Expected non-zero return code from the command. - msg: String that should be contained within the command's output. - """ - try: - runner = ubman.log.get_runner(cmd[0], sys.stdout) - runner.run(cmd) - except Exception: - assert retcode == runner.exit_status - assert msg in runner.output - else: - raise Exception("Expected an exception with retcode %d message '%s'," - "but it was not raised" % (retcode, msg)) - finally: - runner.close() - -ram_base = None -def find_ram_base(ubman): - """Find the running U-Boot's RAM location. - - Probe the running U-Boot to determine the address of the first bank - of RAM. This is useful for tests that test reading/writing RAM, or - load/save files that aren't associated with some standard address - typically represented in an environment variable such as - ${kernel_addr_r}. The value is cached so that it only needs to be - actively read once. - - Args: - ubman: A console connection to U-Boot. - - Returns: - The address of U-Boot's first RAM bank, as an integer. - """ - - global ram_base - if ubman.config.buildconfig.get('config_cmd_bdi', 'n') != 'y': - pytest.skip('bdinfo command not supported') - if ram_base == -1: - pytest.skip('Previously failed to find RAM bank start') - if ram_base is not None: - return ram_base - - with ubman.log.section('find_ram_base'): - response = ubman.run_command('bdinfo') - for l in response.split('\n'): - if '-> start' in l or 'memstart =' in l: - ram_base = int(l.split('=')[1].strip(), 16) - break - if ram_base is None: - ram_base = -1 - raise Exception('Failed to find RAM bank start in `bdinfo`') - - # We don't want ram_base to be zero as some functions test if the given - # address is NULL (0). Besides, on some RISC-V targets the low memory - # is protected that prevents S-mode U-Boot from access. - # Let's add 2MiB then (size of an ARM LPAE/v8 section). - - ram_base += 1024 * 1024 * 2 - - return ram_base - -class PersistentFileHelperCtxMgr(object): - """A context manager for Python's "with" statement, which ensures that any - generated file is deleted (and hence regenerated) if its mtime is older - than the mtime of the Python module which generated it, and gets an mtime - newer than the mtime of the Python module which generated after it is - generated. Objects of this type should be created by factory function - persistent_file_helper rather than directly.""" - - def __init__(self, log, filename): - """Initialize a new object. - - Args: - log: The Logfile object to log to. - filename: The filename of the generated file. - - Returns: - Nothing. - """ - - self.log = log - self.filename = filename - - def __enter__(self): - frame = inspect.stack()[1] - module = inspect.getmodule(frame[0]) - self.module_filename = module.__file__ - self.module_timestamp = os.path.getmtime(self.module_filename) - - if os.path.exists(self.filename): - filename_timestamp = os.path.getmtime(self.filename) - if filename_timestamp < self.module_timestamp: - self.log.action('Removing stale generated file ' + - self.filename) - pathlib.Path(self.filename).unlink() - - def __exit__(self, extype, value, traceback): - if extype: - try: - pathlib.Path(self.filename).unlink() - except Exception: - pass - return - logged = False - for _ in range(20): - filename_timestamp = os.path.getmtime(self.filename) - if filename_timestamp > self.module_timestamp: - break - if not logged: - self.log.action( - 'Waiting for generated file timestamp to increase') - logged = True - os.utime(self.filename) - time.sleep(0.1) - -def persistent_file_helper(u_boot_log, filename): - """Manage the timestamps and regeneration of a persistent generated - file. This function creates a context manager for Python's "with" - statement - - Usage: - with persistent_file_helper(ubman.log, filename): - code to generate the file, if it's missing. - - Args: - u_boot_log: ubman.log. - filename: The filename of the generated file. - - Returns: - A context manager object. - """ - - return PersistentFileHelperCtxMgr(u_boot_log, filename) - -def crc32(ubman, address, count): - """Helper function used to compute the CRC32 value of a section of RAM. - - Args: - ubman: A U-Boot console connection. - address: Address where data starts. - count: Amount of data to use for calculation. - - Returns: - CRC32 value - """ - - bcfg = ubman.config.buildconfig - has_cmd_crc32 = bcfg.get('config_cmd_crc32', 'n') == 'y' - assert has_cmd_crc32, 'Cannot compute crc32 without CONFIG_CMD_CRC32.' - output = ubman.run_command('crc32 %08x %x' % (address, count)) - - m = re.search('==> ([0-9a-fA-F]{8})$', output) - assert m, 'CRC32 operation failed.' - - return m.group(1) - -def waitpid(pid, timeout=60, kill=False): - """Wait a process to terminate by its PID - - This is an alternative to a os.waitpid(pid, 0) call that works on - processes that aren't children of the python process. - - Args: - pid: PID of a running process. - timeout: Time in seconds to wait. - kill: Whether to forcibly kill the process after timeout. - - Returns: - True, if the process ended on its own. - False, if the process was killed by this function. - - Raises: - TimeoutError, if the process is still running after timeout. - """ - try: - for _ in range(timeout): - os.kill(pid, 0) - time.sleep(1) - - if kill: - os.kill(pid, signal.SIGKILL) - return False - - except ProcessLookupError: - return True - - raise TimeoutError( - "Process with PID {} did not terminate after {} seconds." - .format(pid, timeout) - ) diff --git a/test/py/utils.py b/test/py/utils.py new file mode 100644 index 00000000000..ca80e4b0b0a --- /dev/null +++ b/test/py/utils.py @@ -0,0 +1,382 @@ +# SPDX-License-Identifier: GPL-2.0 +# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. + +""" +Utility code shared across multiple tests. +""" + +import hashlib +import inspect +import os +import os.path +import pathlib +import signal +import sys +import time +import re +import pytest + +def md5sum_data(data): + """Calculate the MD5 hash of some data. + + Args: + data: The data to hash. + + Returns: + The hash of the data, as a binary string. + """ + + h = hashlib.md5() + h.update(data) + return h.digest() + +def md5sum_file(fn, max_length=None): + """Calculate the MD5 hash of the contents of a file. + + Args: + fn: The filename of the file to hash. + max_length: The number of bytes to hash. If the file has more + bytes than this, they will be ignored. If None or omitted, the + entire file will be hashed. + + Returns: + The hash of the file content, as a binary string. + """ + + with open(fn, 'rb') as fh: + if max_length: + params = [max_length] + else: + params = [] + data = fh.read(*params) + return md5sum_data(data) + +class PersistentRandomFile: + """Generate and store information about a persistent file containing + random data.""" + + def __init__(self, ubman, fn, size): + """Create or process the persistent file. + + If the file does not exist, it is generated. + + If the file does exist, its content is hashed for later comparison. + + These files are always located in the "persistent data directory" of + the current test run. + + Args: + ubman: A console connection to U-Boot. + fn: The filename (without path) to create. + size: The desired size of the file in bytes. + + Returns: + Nothing. + """ + + self.fn = fn + + self.abs_fn = ubman.config.persistent_data_dir + '/' + fn + + if os.path.exists(self.abs_fn): + ubman.log.action('Persistent data file ' + self.abs_fn + + ' already exists') + self.content_hash = md5sum_file(self.abs_fn) + else: + ubman.log.action('Generating ' + self.abs_fn + + ' (random, persistent, %d bytes)' % size) + data = os.urandom(size) + with open(self.abs_fn, 'wb') as fh: + fh.write(data) + self.content_hash = md5sum_data(data) + +def attempt_to_open_file(fn): + """Attempt to open a file, without throwing exceptions. + + Any errors (exceptions) that occur during the attempt to open the file + are ignored. This is useful in order to test whether a file (in + particular, a device node) exists and can be successfully opened, in order + to poll for e.g. USB enumeration completion. + + Args: + fn: The filename to attempt to open. + + Returns: + An open file handle to the file, or None if the file could not be + opened. + """ + + try: + return open(fn, 'rb') + except: + return None + +def wait_until_open_succeeds(fn): + """Poll until a file can be opened, or a timeout occurs. + + Continually attempt to open a file, and return when this succeeds, or + raise an exception after a timeout. + + Args: + fn: The filename to attempt to open. + + Returns: + An open file handle to the file. + """ + + for i in range(100): + fh = attempt_to_open_file(fn) + if fh: + return fh + time.sleep(0.1) + raise Exception('File could not be opened') + +def wait_until_file_open_fails(fn, ignore_errors): + """Poll until a file cannot be opened, or a timeout occurs. + + Continually attempt to open a file, and return when this fails, or + raise an exception after a timeout. + + Args: + fn: The filename to attempt to open. + ignore_errors: Indicate whether to ignore timeout errors. If True, the + function will simply return if a timeout occurs, otherwise an + exception will be raised. + + Returns: + Nothing. + """ + + for _ in range(100): + fh = attempt_to_open_file(fn) + if not fh: + return + fh.close() + time.sleep(0.1) + if ignore_errors: + return + raise Exception('File can still be opened') + +def run_and_log(ubman, cmd, ignore_errors=False, stdin=None, env=None): + """Run a command and log its output. + + Args: + ubman: A console connection to U-Boot. + cmd: The command to run, as an array of argv[], or a string. + If a string, note that it is split up so that quoted spaces + will not be preserved. E.g. "fred and" becomes ['"fred', 'and"'] + ignore_errors: Indicate whether to ignore errors. If True, the function + will simply return if the command cannot be executed or exits with + an error code, otherwise an exception will be raised if such + problems occur. + stdin: Input string to pass to the command as stdin (or None) + env: Environment to use, or None to use the current one + + Returns: + The output as a string. + """ + if isinstance(cmd, str): + cmd = cmd.split() + runner = ubman.log.get_runner(cmd[0], sys.stdout) + output = runner.run(cmd, ignore_errors=ignore_errors, stdin=stdin, env=env) + runner.close() + return output + +def run_and_log_expect_exception(ubman, cmd, retcode, msg): + """Run a command that is expected to fail. + + This runs a command and checks that it fails with the expected return code + and exception method. If not, an exception is raised. + + Args: + ubman: A console connection to U-Boot. + cmd: The command to run, as an array of argv[]. + retcode: Expected non-zero return code from the command. + msg: String that should be contained within the command's output. + """ + try: + runner = ubman.log.get_runner(cmd[0], sys.stdout) + runner.run(cmd) + except Exception: + assert retcode == runner.exit_status + assert msg in runner.output + else: + raise Exception("Expected an exception with retcode %d message '%s'," + "but it was not raised" % (retcode, msg)) + finally: + runner.close() + +ram_base = None +def find_ram_base(ubman): + """Find the running U-Boot's RAM location. + + Probe the running U-Boot to determine the address of the first bank + of RAM. This is useful for tests that test reading/writing RAM, or + load/save files that aren't associated with some standard address + typically represented in an environment variable such as + ${kernel_addr_r}. The value is cached so that it only needs to be + actively read once. + + Args: + ubman: A console connection to U-Boot. + + Returns: + The address of U-Boot's first RAM bank, as an integer. + """ + + global ram_base + if ubman.config.buildconfig.get('config_cmd_bdi', 'n') != 'y': + pytest.skip('bdinfo command not supported') + if ram_base == -1: + pytest.skip('Previously failed to find RAM bank start') + if ram_base is not None: + return ram_base + + with ubman.log.section('find_ram_base'): + response = ubman.run_command('bdinfo') + for l in response.split('\n'): + if '-> start' in l or 'memstart =' in l: + ram_base = int(l.split('=')[1].strip(), 16) + break + if ram_base is None: + ram_base = -1 + raise Exception('Failed to find RAM bank start in `bdinfo`') + + # We don't want ram_base to be zero as some functions test if the given + # address is NULL (0). Besides, on some RISC-V targets the low memory + # is protected that prevents S-mode U-Boot from access. + # Let's add 2MiB then (size of an ARM LPAE/v8 section). + + ram_base += 1024 * 1024 * 2 + + return ram_base + +class PersistentFileHelperCtxMgr(object): + """A context manager for Python's "with" statement, which ensures that any + generated file is deleted (and hence regenerated) if its mtime is older + than the mtime of the Python module which generated it, and gets an mtime + newer than the mtime of the Python module which generated after it is + generated. Objects of this type should be created by factory function + persistent_file_helper rather than directly.""" + + def __init__(self, log, filename): + """Initialize a new object. + + Args: + log: The Logfile object to log to. + filename: The filename of the generated file. + + Returns: + Nothing. + """ + + self.log = log + self.filename = filename + + def __enter__(self): + frame = inspect.stack()[1] + module = inspect.getmodule(frame[0]) + self.module_filename = module.__file__ + self.module_timestamp = os.path.getmtime(self.module_filename) + + if os.path.exists(self.filename): + filename_timestamp = os.path.getmtime(self.filename) + if filename_timestamp < self.module_timestamp: + self.log.action('Removing stale generated file ' + + self.filename) + pathlib.Path(self.filename).unlink() + + def __exit__(self, extype, value, traceback): + if extype: + try: + pathlib.Path(self.filename).unlink() + except Exception: + pass + return + logged = False + for _ in range(20): + filename_timestamp = os.path.getmtime(self.filename) + if filename_timestamp > self.module_timestamp: + break + if not logged: + self.log.action( + 'Waiting for generated file timestamp to increase') + logged = True + os.utime(self.filename) + time.sleep(0.1) + +def persistent_file_helper(u_boot_log, filename): + """Manage the timestamps and regeneration of a persistent generated + file. This function creates a context manager for Python's "with" + statement + + Usage: + with persistent_file_helper(ubman.log, filename): + code to generate the file, if it's missing. + + Args: + u_boot_log: ubman.log. + filename: The filename of the generated file. + + Returns: + A context manager object. + """ + + return PersistentFileHelperCtxMgr(u_boot_log, filename) + +def crc32(ubman, address, count): + """Helper function used to compute the CRC32 value of a section of RAM. + + Args: + ubman: A U-Boot console connection. + address: Address where data starts. + count: Amount of data to use for calculation. + + Returns: + CRC32 value + """ + + bcfg = ubman.config.buildconfig + has_cmd_crc32 = bcfg.get('config_cmd_crc32', 'n') == 'y' + assert has_cmd_crc32, 'Cannot compute crc32 without CONFIG_CMD_CRC32.' + output = ubman.run_command('crc32 %08x %x' % (address, count)) + + m = re.search('==> ([0-9a-fA-F]{8})$', output) + assert m, 'CRC32 operation failed.' + + return m.group(1) + +def waitpid(pid, timeout=60, kill=False): + """Wait a process to terminate by its PID + + This is an alternative to a os.waitpid(pid, 0) call that works on + processes that aren't children of the python process. + + Args: + pid: PID of a running process. + timeout: Time in seconds to wait. + kill: Whether to forcibly kill the process after timeout. + + Returns: + True, if the process ended on its own. + False, if the process was killed by this function. + + Raises: + TimeoutError, if the process is still running after timeout. + """ + try: + for _ in range(timeout): + os.kill(pid, 0) + time.sleep(1) + + if kill: + os.kill(pid, signal.SIGKILL) + return False + + except ProcessLookupError: + return True + + raise TimeoutError( + "Process with PID {} did not terminate after {} seconds." + .format(pid, timeout) + ) -- cgit v1.3.1