diff --git a/meson_options.txt b/meson_options.txt index f1978d17df..53ec80b0e8 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -77,6 +77,12 @@ option( value : false, description: 'Run tests against real hardware' ) +option( + 'nvme-bin', + type : 'string', + value : '', + description: 'Path to nvme binary used for end-to-end tests (default: use the binary from the build tree)' +) option( 'tests', type : 'boolean', diff --git a/tests/README b/tests/README index 1ca4bd6db3..a4c6d2fb75 100644 --- a/tests/README +++ b/tests/README @@ -49,3 +49,21 @@ Running testcases with framework 2. Running all the testcases (in the build root directory) :- $ meson test -C .build + +Selecting the nvme binary +-------------------------- + By default the tests use the nvme binary built in the current build tree. + To test against a different version (e.g. a distribution-provided binary) + two mechanisms are available: + + a. Environment variable (highest priority, overrides config.json): + $ NVME_BIN=/usr/bin/nvme python3 tests/tap_runner.py --start-dir tests nvme_id_ctrl_test + + b. Meson option (for meson-driven test runs): + $ meson setup .build -Dnvme-bin=/usr/bin/nvme + $ meson test -C .build + + c. config.json entry (lower priority than NVME_BIN): + Add "nvme_bin": "/usr/bin/nvme" to tests/config.json. + + Priority order: NVME_BIN env var > config.json nvme_bin > build-tree binary diff --git a/tests/meson.build b/tests/meson.build index 12c1501920..4ba987adac 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -41,6 +41,18 @@ python_module = import('python') python = python_module.find_installation('python3') +# Determine which nvme binary to expose to the end-to-end tests. +# The -Dnvme-bin option lets callers point at a distribution-provided (or +# any other) binary without touching config.json: +# meson setup .build -Dnvme-bin=/usr/bin/nvme +# When the option is not set the binary produced by the current build is used. +nvme_bin_opt = get_option('nvme-bin') +if nvme_bin_opt != '' + nvme_bin_for_test = nvme_bin_opt +else + nvme_bin_for_test = meson.project_build_root() / 'nvme' +endif + foreach t : tests t_name = t.split('.')[0] test( @@ -51,7 +63,10 @@ foreach t : tests '--start-dir', meson.current_source_dir(), t_name, ], - env: ['PATH=' + meson.project_build_root() + ':/usr/bin:/usr/sbin'], + env: [ + 'PATH=' + meson.project_build_root() + ':/usr/bin:/usr/sbin', + 'NVME_BIN=' + nvme_bin_for_test, + ], timeout: 500, protocol: 'tap', is_parallel: false, diff --git a/tests/nvme_copy_test.py b/tests/nvme_copy_test.py index f63df24647..415efc6315 100644 --- a/tests/nvme_copy_test.py +++ b/tests/nvme_copy_test.py @@ -22,8 +22,6 @@ """ -import json - from nvme_test import TestNVMe, to_decimal @@ -101,15 +99,21 @@ def _get_current_ns_pif(self): result = self.run_cmd(id_ns_cmd) if result.returncode != 0: return 0 - flbas = int(json.loads(result.stdout).get("flbas", 0)) + id_ns_data = self.parse_json_output(result.stdout, "nvme id-ns") + flbas = int(id_ns_data.get("flbas", 0)) lbaf_idx = (flbas & 0xF) | (((flbas >> 5) & 0x3) << 4) nvm_id_ns_cmd = f"{self.nvme_bin} nvm-id-ns {self.ns1} --output-format=json" result = self.run_cmd(nvm_id_ns_cmd) if result.returncode != 0: return 0 - elbafs = json.loads(result.stdout).get("elbafs", []) + nvm_id_ns_data = self.parse_json_output(result.stdout, "nvme nvm-id-ns") + elbafs = nvm_id_ns_data.get("elbafs", []) + self.assertIsInstance(elbafs, list, + f"ERROR : nvm-id-ns returned invalid elbafs type: {type(elbafs).__name__}") if lbaf_idx < len(elbafs): + self.assertIsInstance(elbafs[lbaf_idx], dict, + f"ERROR : invalid elbaf entry: {elbafs[lbaf_idx]!r}") return elbafs[lbaf_idx].get("pif", 0) return 0 @@ -144,8 +148,13 @@ def _find_64b_guard_lbaf_index(self): result = self.run_cmd(nvm_id_ns_cmd) if result.returncode != 0: return None - elbafs = json.loads(result.stdout).get("elbafs", []) + nvm_id_ns_data = self.parse_json_output(result.stdout, "nvme nvm-id-ns") + elbafs = nvm_id_ns_data.get("elbafs", []) + self.assertIsInstance(elbafs, list, + f"ERROR : nvm-id-ns returned invalid elbafs type: {type(elbafs).__name__}") for i, elbaf in enumerate(elbafs): + self.assertIsInstance(elbaf, dict, + f"ERROR : invalid elbaf entry: {elbaf!r}") if elbaf.get("pif", 0) == 2: # NVME_NVM_PIF_64B_GUARD = 2 return i return None @@ -215,7 +224,7 @@ def _enable_cdfe_for_format(self, desc_format): result = self.run_cmd(get_features_cmd) self.assertEqual(result.returncode, 0, "ERROR : nvme feat host-behavior-support failed") - data = json.loads(result.stdout) + data = self.parse_json_output(result.stdout, "nvme feat host-behavior-support") fields = data.get("Feature: 0x16", [{}])[0] current_cdfe = ( (0x4 if fields.get("Copy Descriptor Format 2h Enable (CDF2E)") == "True" else 0) | diff --git a/tests/nvme_format_test.py b/tests/nvme_format_test.py index 82d31731fb..1f32f45d4a 100644 --- a/tests/nvme_format_test.py +++ b/tests/nvme_format_test.py @@ -37,7 +37,6 @@ - Delete Namespace. """ -import json import logging import math @@ -111,8 +110,10 @@ def attach_detach_primary_ns(self): f"--namespace-id={self.default_nsid} --output-format=json" result = self.run_cmd(id_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : nvme id-ns failed") - json_output = json.loads(result.stdout) - self.lba_format_list = json_output['lbafs'] + json_output = self.parse_json_output(result.stdout, "nvme id-ns") + self.lba_format_list = json_output.get('lbafs', []) + self.assertIsInstance(self.lba_format_list, list, + f"ERROR : lbafs must be a list, got {type(self.lba_format_list).__name__}") self.assertTrue(len(self.lba_format_list) > 0, "ERROR : nvme id-ns could not find any lba formats") self.assertEqual(self.detach_ns(self.ctrl_id, self.default_nsid), 0) @@ -127,6 +128,12 @@ def test_format_ns(self): print("##### Testing lba formats:") # iterate through all supported format for flbas, lba_format in enumerate(self.lba_format_list): + self.assertIsInstance(lba_format, dict, + f"ERROR : lba format entry must be dict, got {type(lba_format).__name__}: {lba_format!r}") + self.assertIn('ds', lba_format, + f"ERROR : lba format entry missing ds: {lba_format!r}") + self.assertIn('ms', lba_format, + f"ERROR : lba format entry missing ms: {lba_format!r}") ds = lba_format['ds'] ms = lba_format['ms'] print(f"\nlba format {str(flbas)}" diff --git a/tests/nvme_test.py b/tests/nvme_test.py index 98f6b2daea..ba0938ade7 100644 --- a/tests/nvme_test.py +++ b/tests/nvme_test.py @@ -168,6 +168,16 @@ def load_config(self): if not logging.getLogger().handlers: logging.basicConfig(format='%(message)s', stream=sys.stdout) logging.getLogger().setLevel(log_level) + + # Environment variable takes the highest priority, overriding both + # the default and any value from config.json. This allows running + # the tests against a distribution-provided (or any other) binary + # without modifying config.json: + # NVME_BIN=/usr/bin/nvme python3 tests/tap_runner.py ... + env_nvme_bin = os.environ.get('NVME_BIN') + if env_nvme_bin: + self.nvme_bin = env_nvme_bin + logger.debug("Using nvme binary '%s'", self.nvme_bin) if self.clear_log_dir is True: @@ -208,6 +218,34 @@ def run_cmd(self, cmd, stdin_data=None): logger.debug(result.stderr) return result + def parse_json_output(self, output, context, expected_type=dict): + """Parse JSON output and fail test clearly on malformed or wrong-typed data. + + context should identify the command/action that produced output. + Pass expected_type=None to skip type validation. + """ + try: + data = json.loads(output) + except (TypeError, json.JSONDecodeError) as exc: + self.fail(f"ERROR : invalid JSON from {context}: {exc}; output={output!r}") + + if expected_type is not None and not isinstance(data, expected_type): + self.fail( + "ERROR : unexpected JSON type from " + f"{context}: expected {expected_type.__name__}, got {type(data).__name__}" + ) + return data + + def json_get(self, data, key, default=None, context="JSON output", required=False): + """Return key from JSON dict and optionally fail if key is missing.""" + if not isinstance(data, dict): + self.fail( + f"ERROR : expected JSON object for {context}, got {type(data).__name__}" + ) + if required and key not in data: + self.fail(f"ERROR : missing key '{key}' in {context}: {data!r}") + return data.get(key, default) + def exec_cmd(self, cmd): """ Wrapper for executing a shell command and return the result. """ return self.run_cmd(cmd).returncode @@ -237,10 +275,18 @@ def get_ctrl_id(self): "--output-format=json" result = self.run_cmd(get_ctrl_id) self.assertEqual(result.returncode, 0, "ERROR : nvme list-ctrl failed") - json_output = json.loads(result.stdout) - self.assertTrue(len(json_output['ctrl_list']) > 0, + json_output = self.parse_json_output(result.stdout, "nvme list-ctrl") + ctrl_list = self.json_get(json_output, 'ctrl_list', context="nvme list-ctrl", required=True) + self.assertIsInstance(ctrl_list, list, + "ERROR : nvme list-ctrl returned invalid ctrl_list type") + self.assertTrue(len(ctrl_list) > 0, "ERROR : nvme list-ctrl could not find ctrl") - return str(json_output['ctrl_list'][0]['ctrl_id']) + first_ctrl = ctrl_list[0] + self.assertIsInstance(first_ctrl, dict, + "ERROR : nvme list-ctrl returned invalid controller entry") + self.assertIn('ctrl_id', first_ctrl, + f"ERROR : nvme list-ctrl missing ctrl_id: {first_ctrl!r}") + return str(first_ctrl['ctrl_id']) def get_ns_mgmt_support(self): """ @@ -278,9 +324,16 @@ def get_nsid_list(self): "--output-format=json" result = self.run_cmd(ns_list_cmd) self.assertEqual(result.returncode, 0, "ERROR : nvme list namespace failed") - json_output = json.loads(result.stdout) - - for ns in json_output['nsid_list']: + json_output = self.parse_json_output(result.stdout, "nvme list-ns") + + nsid_list = self.json_get(json_output, 'nsid_list', context="nvme list-ns", required=True) + self.assertIsInstance(nsid_list, list, + "ERROR : nvme list-ns returned invalid nsid_list type") + for ns in nsid_list: + self.assertIsInstance(ns, dict, + f"ERROR : nvme list-ns returned invalid namespace entry: {ns!r}") + self.assertIn('nsid', ns, + f"ERROR : nvme list-ns entry missing nsid: {ns!r}") ns_list.append(ns['nsid']) return ns_list @@ -296,8 +349,10 @@ def get_max_ns(self): "--output-format=json" result = self.run_cmd(max_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading maximum namespace count failed") - json_output = json.loads(result.stdout) - return int(json_output['nn']) + json_output = self.parse_json_output(result.stdout, "nvme id-ctrl") + nn = self.json_get(json_output, 'nn', context="nvme id-ctrl", required=True) + self.assertIsNotNone(nn, "ERROR : reading maximum namespace count failed") + return int(nn) def get_lba_status_supported(self): """ Check if 'Get LBA Status' command is supported by the device @@ -320,9 +375,13 @@ def _get_active_lbaf_index(self): "--output-format=json" result = self.run_cmd(nvme_id_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading id-ns") - json_output = json.loads(result.stdout) + json_output = self.parse_json_output(result.stdout, "nvme id-ns") for lbaf in json_output.get('lbafs', []): + self.assertIsInstance(lbaf, dict, + f"ERROR : id-ns returned invalid lbaf entry: {lbaf!r}") if lbaf.get('in_use') == 1: + self.assertIn('lbaf', lbaf, + f"ERROR : id-ns lbaf entry missing lbaf index: {lbaf!r}") return int(lbaf['lbaf']) return 0 @@ -339,7 +398,7 @@ def _get_ns_dps(self): "--output-format=json" result = self.run_cmd(nvme_id_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading id-ns") - json_output = json.loads(result.stdout) + json_output = self.parse_json_output(result.stdout, "nvme id-ns") return int(json_output.get('dps', 0)) def _get_pif(self): @@ -361,7 +420,7 @@ def _get_pif(self): "--output-format=json" result = self.run_cmd(nvme_id_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading id-ns") - json_output = json.loads(result.stdout) + json_output = self.parse_json_output(result.stdout, "nvme id-ns") dps = int(json_output.get('dps', 0)) return (dps >> 3) & 0x7 @@ -375,7 +434,7 @@ def _is_metadata_ext(self): "--output-format=json" result = self.run_cmd(nvme_id_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading id-ns") - json_output = json.loads(result.stdout) + json_output = self.parse_json_output(result.stdout, "nvme id-ns") flbas = int(json_output.get('flbas', 0)) return bool(flbas & (1 << 4)) @@ -390,10 +449,17 @@ def get_lba_format_size(self): "--output-format=json" result = self.run_cmd(nvme_id_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading id-ns") - json_output = json.loads(result.stdout) - self.assertTrue(len(json_output['lbafs']) > self.flbas, - "Error : could not match the given flbas to an existing lbaf") - lbaf_json = json_output['lbafs'][int(self.flbas)] + json_output = self.parse_json_output(result.stdout, "nvme id-ns") + lbafs = self.json_get(json_output, 'lbafs', context="nvme id-ns", required=True) + self.assertIsInstance(lbafs, list, + f"ERROR : id-ns returned invalid lbafs type, expected list, got {type(lbafs).__name__}") + self.assertTrue(len(lbafs) > self.flbas, + "ERROR : could not match the given flbas to an existing lbaf") + lbaf_json = lbafs[int(self.flbas)] + self.assertIsInstance(lbaf_json, dict, + f"ERROR : id-ns returned invalid lbaf entry, expected dict, got {type(lbaf_json).__name__}") + self.assertIn('ms', lbaf_json, "ERROR : id-ns lbaf missing 'ms'") + self.assertIn('ds', lbaf_json, "ERROR : id-ns lbaf missing 'ds'") ms = int(lbaf_json['ms']) ds_expo = int(lbaf_json['ds']) ds = (1 << ds_expo) if ds_expo > 0 else 0 @@ -419,7 +485,7 @@ def get_id_ctrl_field_value(self, field): "--output-format=json" result = self.run_cmd(id_ctrl_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading id-ctrl failed") - json_output = json.loads(result.stdout) + json_output = self.parse_json_output(result.stdout, "nvme id-ctrl") self.assertTrue(field in json_output, f"ERROR : reading field '{field}' failed") return str(json_output[field]) @@ -435,7 +501,7 @@ def get_id_ns_field_value(self, field): "--output-format=json" result = self.run_cmd(id_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : reading id-ns failed") - json_output = json.loads(result.stdout) + json_output = self.parse_json_output(result.stdout, "nvme id-ns") self.assertTrue(field in json_output, f"ERROR : reading field '{field}' failed") return str(json_output[field]) @@ -463,8 +529,11 @@ def delete_all_ns(self): "--output-format=json" result = self.run_cmd(list_ns_cmd) self.assertEqual(result.returncode, 0, "ERROR : nvme list-ns failed") - json_output = json.loads(result.stdout) - self.assertEqual(len(json_output['nsid_list']), 0, + json_output = self.parse_json_output(result.stdout, "nvme list-ns") + nsid_list = self.json_get(json_output, 'nsid_list', context="nvme list-ns", required=True) + self.assertIsInstance(nsid_list, list, + "ERROR : nvme list-ns returned invalid nsid_list type") + self.assertEqual(len(nsid_list), 0, "ERROR : deleting all namespace failed") def create_ns(self, nsze, ncap, flbas, dps): @@ -483,6 +552,39 @@ def create_ns(self, nsze, ncap, flbas, dps): result = self.run_cmd(create_ns_cmd) return result.returncode, result.stdout + def _get_created_nsid(self, stdout): + """Extract namespace id from create-ns output (JSON or legacy text). + + Returns: + int: Namespace ID from `{"nsid": ...}` JSON output or from + legacy text output like "created nsid: X". + """ + try: + json_output = json.loads(stdout) + except json.JSONDecodeError: + match = re.search( + r"created nsid:\s*(\d+)", stdout, re.IGNORECASE + ) + self.assertIsNotNone( + match, + "ERROR : create-ns output missing nsid " + "(expected format: 'created nsid: '), " + f"got: {stdout!r}", + ) + return int(match.group(1)) + + self.assertIsInstance( + json_output, + dict, + f"ERROR : unexpected create-ns JSON output type: {type(json_output).__name__}", + ) + self.assertIn( + 'nsid', + json_output, + f"ERROR : create-ns JSON output missing nsid field: {json_output}", + ) + return int(json_output['nsid']) + def create_and_validate_ns(self, nsid, nsze, ncap, flbas, dps): """ Wrapper for creating and validating a namespace. - Args: @@ -496,8 +598,8 @@ def create_and_validate_ns(self, nsid, nsze, ncap, flbas, dps): """ err, stdout = self.create_ns(nsze, ncap, flbas, dps) if err == 0: - json_output = json.loads(stdout) - self.assertEqual(int(json_output['nsid']), nsid, + created_nsid = self._get_created_nsid(stdout) + self.assertEqual(created_nsid, nsid, "ERROR : create namespace failed") id_ns_cmd = f"{self.nvme_bin} id-ns {self.ctrl} " + \ f"--namespace-id={str(nsid)}"