Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions tests/nvme_copy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

"""

import json

from nvme_test import TestNVMe, to_decimal


Expand Down Expand Up @@ -101,15 +99,21 @@ def _get_current_ns_pif(self):
result = self.run_cmd(id_ns_cmd)
if result.returncode != 0:
return 0
flbas = int(json.loads(result.stdout).get("flbas", 0))
id_ns_data = self.parse_json_output(result.stdout, "nvme id-ns")
flbas = int(id_ns_data.get("flbas", 0))
lbaf_idx = (flbas & 0xF) | (((flbas >> 5) & 0x3) << 4)

nvm_id_ns_cmd = f"{self.nvme_bin} nvm-id-ns {self.ns1} --output-format=json"
result = self.run_cmd(nvm_id_ns_cmd)
if result.returncode != 0:
return 0
elbafs = json.loads(result.stdout).get("elbafs", [])
nvm_id_ns_data = self.parse_json_output(result.stdout, "nvme nvm-id-ns")
elbafs = nvm_id_ns_data.get("elbafs", [])
self.assertIsInstance(elbafs, list,
f"ERROR : nvm-id-ns returned invalid elbafs type: {type(elbafs).__name__}")
if lbaf_idx < len(elbafs):
self.assertIsInstance(elbafs[lbaf_idx], dict,
f"ERROR : invalid elbaf entry: {elbafs[lbaf_idx]!r}")
return elbafs[lbaf_idx].get("pif", 0)
return 0

Expand Down Expand Up @@ -144,8 +148,13 @@ def _find_64b_guard_lbaf_index(self):
result = self.run_cmd(nvm_id_ns_cmd)
if result.returncode != 0:
return None
elbafs = json.loads(result.stdout).get("elbafs", [])
nvm_id_ns_data = self.parse_json_output(result.stdout, "nvme nvm-id-ns")
elbafs = nvm_id_ns_data.get("elbafs", [])
self.assertIsInstance(elbafs, list,
f"ERROR : nvm-id-ns returned invalid elbafs type: {type(elbafs).__name__}")
for i, elbaf in enumerate(elbafs):
self.assertIsInstance(elbaf, dict,
f"ERROR : invalid elbaf entry: {elbaf!r}")
if elbaf.get("pif", 0) == 2: # NVME_NVM_PIF_64B_GUARD = 2
return i
return None
Expand Down Expand Up @@ -215,7 +224,7 @@ def _enable_cdfe_for_format(self, desc_format):
result = self.run_cmd(get_features_cmd)
self.assertEqual(result.returncode, 0,
"ERROR : nvme feat host-behavior-support failed")
data = json.loads(result.stdout)
data = self.parse_json_output(result.stdout, "nvme feat host-behavior-support")
fields = data.get("Feature: 0x16", [{}])[0]
current_cdfe = (
(0x4 if fields.get("Copy Descriptor Format 2h Enable (CDF2E)") == "True" else 0) |
Expand Down
13 changes: 10 additions & 3 deletions tests/nvme_format_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
- Delete Namespace.
"""

import json
import logging
import math

Expand Down Expand Up @@ -111,8 +110,10 @@ def attach_detach_primary_ns(self):
f"--namespace-id={self.default_nsid} --output-format=json"
result = self.run_cmd(id_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : nvme id-ns failed")
json_output = json.loads(result.stdout)
self.lba_format_list = json_output['lbafs']
json_output = self.parse_json_output(result.stdout, "nvme id-ns")
self.lba_format_list = json_output.get('lbafs', [])
self.assertIsInstance(self.lba_format_list, list,
f"ERROR : lbafs must be a list, got {type(self.lba_format_list).__name__}")
self.assertTrue(len(self.lba_format_list) > 0,
"ERROR : nvme id-ns could not find any lba formats")
self.assertEqual(self.detach_ns(self.ctrl_id, self.default_nsid), 0)
Expand All @@ -127,6 +128,12 @@ def test_format_ns(self):
print("##### Testing lba formats:")
# iterate through all supported format
for flbas, lba_format in enumerate(self.lba_format_list):
self.assertIsInstance(lba_format, dict,
f"ERROR : lba format entry must be dict, got {type(lba_format).__name__}: {lba_format!r}")
self.assertIn('ds', lba_format,
f"ERROR : lba format entry missing ds: {lba_format!r}")
self.assertIn('ms', lba_format,
f"ERROR : lba format entry missing ms: {lba_format!r}")
ds = lba_format['ds']
ms = lba_format['ms']
print(f"\nlba format {str(flbas)}"
Expand Down
104 changes: 82 additions & 22 deletions tests/nvme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,34 @@ def run_cmd(self, cmd, stdin_data=None):
logger.debug(result.stderr)
return result

def parse_json_output(self, output, context, expected_type=dict):
"""Parse JSON output and fail test clearly on malformed or wrong-typed data.

context should identify the command/action that produced output.
Pass expected_type=None to skip type validation.
"""
try:
data = json.loads(output)
except (TypeError, json.JSONDecodeError) as exc:
self.fail(f"ERROR : invalid JSON from {context}: {exc}; output={output!r}")

if expected_type is not None and not isinstance(data, expected_type):
self.fail(
"ERROR : unexpected JSON type from "
f"{context}: expected {expected_type.__name__}, got {type(data).__name__}"
)
return data

def json_get(self, data, key, default=None, context="JSON output", required=False):
"""Return key from JSON dict and optionally fail if key is missing."""
if not isinstance(data, dict):
self.fail(
f"ERROR : expected JSON object for {context}, got {type(data).__name__}"
)
if required and key not in data:
self.fail(f"ERROR : missing key '{key}' in {context}: {data!r}")
return data.get(key, default)

def exec_cmd(self, cmd):
""" Wrapper for executing a shell command and return the result. """
return self.run_cmd(cmd).returncode
Expand Down Expand Up @@ -237,10 +265,18 @@ def get_ctrl_id(self):
"--output-format=json"
result = self.run_cmd(get_ctrl_id)
self.assertEqual(result.returncode, 0, "ERROR : nvme list-ctrl failed")
json_output = json.loads(result.stdout)
self.assertTrue(len(json_output['ctrl_list']) > 0,
json_output = self.parse_json_output(result.stdout, "nvme list-ctrl")
ctrl_list = self.json_get(json_output, 'ctrl_list', context="nvme list-ctrl", required=True)
self.assertIsInstance(ctrl_list, list,
"ERROR : nvme list-ctrl returned invalid ctrl_list type")
self.assertTrue(len(ctrl_list) > 0,
"ERROR : nvme list-ctrl could not find ctrl")
return str(json_output['ctrl_list'][0]['ctrl_id'])
first_ctrl = ctrl_list[0]
self.assertIsInstance(first_ctrl, dict,
"ERROR : nvme list-ctrl returned invalid controller entry")
self.assertIn('ctrl_id', first_ctrl,
f"ERROR : nvme list-ctrl missing ctrl_id: {first_ctrl!r}")
return str(first_ctrl['ctrl_id'])

def get_ns_mgmt_support(self):
"""
Expand Down Expand Up @@ -278,9 +314,16 @@ def get_nsid_list(self):
"--output-format=json"
result = self.run_cmd(ns_list_cmd)
self.assertEqual(result.returncode, 0, "ERROR : nvme list namespace failed")
json_output = json.loads(result.stdout)

for ns in json_output['nsid_list']:
json_output = self.parse_json_output(result.stdout, "nvme list-ns")

nsid_list = self.json_get(json_output, 'nsid_list', context="nvme list-ns", required=True)
self.assertIsInstance(nsid_list, list,
"ERROR : nvme list-ns returned invalid nsid_list type")
for ns in nsid_list:
self.assertIsInstance(ns, dict,
f"ERROR : nvme list-ns returned invalid namespace entry: {ns!r}")
self.assertIn('nsid', ns,
f"ERROR : nvme list-ns entry missing nsid: {ns!r}")
ns_list.append(ns['nsid'])

return ns_list
Expand All @@ -296,8 +339,10 @@ def get_max_ns(self):
"--output-format=json"
result = self.run_cmd(max_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading maximum namespace count failed")
json_output = json.loads(result.stdout)
return int(json_output['nn'])
json_output = self.parse_json_output(result.stdout, "nvme id-ctrl")
nn = self.json_get(json_output, 'nn', context="nvme id-ctrl", required=True)
self.assertIsNotNone(nn, "ERROR : reading maximum namespace count failed")
return int(nn)

def get_lba_status_supported(self):
""" Check if 'Get LBA Status' command is supported by the device
Expand All @@ -320,9 +365,13 @@ def _get_active_lbaf_index(self):
"--output-format=json"
result = self.run_cmd(nvme_id_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading id-ns")
json_output = json.loads(result.stdout)
json_output = self.parse_json_output(result.stdout, "nvme id-ns")
for lbaf in json_output.get('lbafs', []):
self.assertIsInstance(lbaf, dict,
f"ERROR : id-ns returned invalid lbaf entry: {lbaf!r}")
if lbaf.get('in_use') == 1:
self.assertIn('lbaf', lbaf,
f"ERROR : id-ns lbaf entry missing lbaf index: {lbaf!r}")
return int(lbaf['lbaf'])
return 0

Expand All @@ -339,7 +388,7 @@ def _get_ns_dps(self):
"--output-format=json"
result = self.run_cmd(nvme_id_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading id-ns")
json_output = json.loads(result.stdout)
json_output = self.parse_json_output(result.stdout, "nvme id-ns")
return int(json_output.get('dps', 0))

def _get_pif(self):
Expand All @@ -361,7 +410,7 @@ def _get_pif(self):
"--output-format=json"
result = self.run_cmd(nvme_id_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading id-ns")
json_output = json.loads(result.stdout)
json_output = self.parse_json_output(result.stdout, "nvme id-ns")
dps = int(json_output.get('dps', 0))
return (dps >> 3) & 0x7

Expand All @@ -375,7 +424,7 @@ def _is_metadata_ext(self):
"--output-format=json"
result = self.run_cmd(nvme_id_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading id-ns")
json_output = json.loads(result.stdout)
json_output = self.parse_json_output(result.stdout, "nvme id-ns")
flbas = int(json_output.get('flbas', 0))
return bool(flbas & (1 << 4))

Expand All @@ -390,10 +439,17 @@ def get_lba_format_size(self):
"--output-format=json"
result = self.run_cmd(nvme_id_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading id-ns")
json_output = json.loads(result.stdout)
self.assertTrue(len(json_output['lbafs']) > self.flbas,
"Error : could not match the given flbas to an existing lbaf")
lbaf_json = json_output['lbafs'][int(self.flbas)]
json_output = self.parse_json_output(result.stdout, "nvme id-ns")
lbafs = self.json_get(json_output, 'lbafs', context="nvme id-ns", required=True)
self.assertIsInstance(lbafs, list,
f"ERROR : id-ns returned invalid lbafs type, expected list, got {type(lbafs).__name__}")
self.assertTrue(len(lbafs) > self.flbas,
"ERROR : could not match the given flbas to an existing lbaf")
lbaf_json = lbafs[int(self.flbas)]
self.assertIsInstance(lbaf_json, dict,
f"ERROR : id-ns returned invalid lbaf entry, expected dict, got {type(lbaf_json).__name__}")
self.assertIn('ms', lbaf_json, "ERROR : id-ns lbaf missing 'ms'")
self.assertIn('ds', lbaf_json, "ERROR : id-ns lbaf missing 'ds'")
ms = int(lbaf_json['ms'])
ds_expo = int(lbaf_json['ds'])
ds = (1 << ds_expo) if ds_expo > 0 else 0
Expand All @@ -419,7 +475,7 @@ def get_id_ctrl_field_value(self, field):
"--output-format=json"
result = self.run_cmd(id_ctrl_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading id-ctrl failed")
json_output = json.loads(result.stdout)
json_output = self.parse_json_output(result.stdout, "nvme id-ctrl")
self.assertTrue(field in json_output,
f"ERROR : reading field '{field}' failed")
return str(json_output[field])
Expand All @@ -435,7 +491,7 @@ def get_id_ns_field_value(self, field):
"--output-format=json"
result = self.run_cmd(id_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : reading id-ns failed")
json_output = json.loads(result.stdout)
json_output = self.parse_json_output(result.stdout, "nvme id-ns")
self.assertTrue(field in json_output,
f"ERROR : reading field '{field}' failed")
return str(json_output[field])
Expand Down Expand Up @@ -463,8 +519,11 @@ def delete_all_ns(self):
"--output-format=json"
result = self.run_cmd(list_ns_cmd)
self.assertEqual(result.returncode, 0, "ERROR : nvme list-ns failed")
json_output = json.loads(result.stdout)
self.assertEqual(len(json_output['nsid_list']), 0,
json_output = self.parse_json_output(result.stdout, "nvme list-ns")
nsid_list = self.json_get(json_output, 'nsid_list', context="nvme list-ns", required=True)
self.assertIsInstance(nsid_list, list,
"ERROR : nvme list-ns returned invalid nsid_list type")
self.assertEqual(len(nsid_list), 0,
"ERROR : deleting all namespace failed")

def create_ns(self, nsze, ncap, flbas, dps):
Expand Down Expand Up @@ -496,8 +555,9 @@ def create_and_validate_ns(self, nsid, nsze, ncap, flbas, dps):
"""
err, stdout = self.create_ns(nsze, ncap, flbas, dps)
if err == 0:
json_output = json.loads(stdout)
self.assertEqual(int(json_output['nsid']), nsid,
json_output = self.parse_json_output(stdout, "nvme create-ns")
created_nsid = self.json_get(json_output, "nsid", "nvme create-ns", required=True)
self.assertEqual(int(created_nsid), nsid,
"ERROR : create namespace failed")
id_ns_cmd = f"{self.nvme_bin} id-ns {self.ctrl} " + \
f"--namespace-id={str(nsid)}"
Expand Down
Loading