diff --git a/google/cloud/bigtable/row.py b/google/cloud/bigtable/row.py index 031e2b6b9..ab24a4ff1 100644 --- a/google/cloud/bigtable/row.py +++ b/google/cloud/bigtable/row.py @@ -14,6 +14,8 @@ """User-friendly container for Google Cloud Bigtable Row.""" +from google.api_core.exceptions import GoogleAPICallError + from google.cloud._helpers import _datetime_from_microseconds # type: ignore from google.cloud._helpers import _microseconds_from_datetime # type: ignore from google.cloud._helpers import _to_bytes # type: ignore @@ -21,6 +23,9 @@ from google.cloud.bigtable.data import mutations from google.cloud.bigtable.data import read_modify_write_rules as rmw_rules +from google.rpc import code_pb2 +from google.rpc import status_pb2 + MAX_MUTATIONS = 100000 """The maximum number of mutations that a row can accumulate.""" @@ -440,7 +445,9 @@ def delete_cells(self, column_family_id, columns, time_range=None): def commit(self): """Makes a ``MutateRow`` API request. - If no mutations have been created in the row, no request is made. + If no mutations have been created in the row, no request is made and a + :class:`~google.rpc.status_pb2.Status` with code INVALID_ARGUMENT is returned + instead. Mutations are applied atomically and in order, meaning that earlier mutations can be masked / negated by later ones. Cells already present @@ -459,14 +466,27 @@ def commit(self): :rtype: :class:`~google.rpc.status_pb2.Status` :returns: A response status (`google.rpc.status_pb2.Status`) representing success or failure of the row committed. - :raises: :exc:`~.table.TooManyMutationsError` if the number of - mutations is greater than 100,000. """ - response = self._table.mutate_rows([self]) - - self.clear() - - return response[0] + try: + self._table._table_impl.mutate_row(self.row_key, self._get_mutations()) + return status_pb2.Status(code=code_pb2.OK) + except GoogleAPICallError as e: + # If the RPC call returns an error, extract the error into a status object, if possible. + return status_pb2.Status( + code=e.grpc_status_code.value[0] + if e.grpc_status_code is not None + else code_pb2.UNKNOWN, + message=e.message, + details=e.details, + ) + except ValueError as e: + # _table_impl.mutate_row raises a ValueError if invalid arguments are provided. + return status_pb2.Status( + code=code_pb2.INVALID_ARGUMENT, + message=str(e), + ) + finally: + self.clear() def clear(self): """Removes all currently accumulated mutations on the current row. diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index e6a15d95e..1e267c058 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -219,6 +219,37 @@ def test_table_read_rows_filter_millis(data_table): row_data.consume_all() +def test_table_direct_row_commit(data_table, rows_to_delete): + from google.rpc import code_pb2 + + row = data_table.direct_row(ROW_KEY) + + # Test set cell + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME2, CELL_VAL1) + status = row.commit() + rows_to_delete.append(row) + assert status.code == code_pb2.Code.OK + row_data = data_table.read_row(ROW_KEY) + assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL1 + assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME2][0].value == CELL_VAL1 + + # Test delete cell + row.delete_cell(COLUMN_FAMILY_ID1, COL_NAME1) + status = row.commit() + assert status.code == code_pb2.Code.OK + row_data = data_table.read_row(ROW_KEY) + assert COL_NAME1 not in row_data.cells[COLUMN_FAMILY_ID1] + assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME2][0].value == CELL_VAL1 + + # Test delete row + row.delete() + status = row.commit() + assert status.code == code_pb2.Code.OK + row_data = data_table.read_row(ROW_KEY) + assert row_data is None + + def test_table_mutate_rows(data_table, rows_to_delete): row1 = data_table.direct_row(ROW_KEY) row1.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) @@ -1028,7 +1059,6 @@ def test_table_sample_row_keys(data_table, skip_on_emulator): def test_table_direct_row_input_errors(data_table, rows_to_delete): - from google.api_core.exceptions import InvalidArgument from google.cloud.bigtable.row import MAX_MUTATIONS row = data_table.direct_row(ROW_KEY) @@ -1054,20 +1084,18 @@ def test_table_direct_row_input_errors(data_table, rows_to_delete): with pytest.raises(TypeError): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL) - # Can't have more than MAX_MUTATIONS mutations, but only enforced after - # a row.commit + # Can't have more than MAX_MUTATIONS mutations, enforced on server side now. row.clear() for _ in range(0, MAX_MUTATIONS + 1): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - with pytest.raises(ValueError): - row.commit() + resp = row.commit() + assert resp.code == StatusCode.INVALID_ARGUMENT.value[0] - # Not having any mutations gives a server error (InvalidArgument), not - # enforced on the client side. + # Not having any mutations gives an INVALID_ARGUMENT row.clear() - with pytest.raises(InvalidArgument): - row.commit() + resp = row.commit() + assert resp.code == StatusCode.INVALID_ARGUMENT.value[0] def test_table_conditional_row_input_errors(data_table, rows_to_delete): diff --git a/tests/unit/v2_client/test_row.py b/tests/unit/v2_client/test_row.py index d5b80d6f8..47a521cd1 100644 --- a/tests/unit/v2_client/test_row.py +++ b/tests/unit/v2_client/test_row.py @@ -368,44 +368,138 @@ def test_direct_row_delete_cells_with_string_columns(): def test_direct_row_commit(): + from google.cloud.bigtable_v2.services.bigtable import BigtableClient + from google.rpc import code_pb2, status_pb2 + project_id = "project-id" row_key = b"row_key" table_name = "projects/more-stuff" + app_profile_id = "app_profile_id" column_family_id = "column_family_id" column = b"column" credentials = _make_credentials() client = _make_client(project=project_id, credentials=credentials, admin=True) - table = _Table(table_name, client=client) + table = _Table(table_name, client=client, app_profile_id=app_profile_id) row = _make_direct_row(row_key, table) value = b"bytes-value" + # Set mock + api = mock.create_autospec(BigtableClient) + response_pb = _MutateRowResponsePB() + api.mutate_row.side_effect = [response_pb] + client.table_data_client + client._table_data_client._gapic_client = api + # Perform the method and check the result. row.set_cell(column_family_id, column, value) - row.commit() - assert table.mutated_rows == [row] + response = row.commit() + assert row._mutations == [] + assert response == status_pb2.Status(code=code_pb2.OK) + call_args = api.mutate_row.call_args + assert app_profile_id == call_args.app_profile_id[0] def test_direct_row_commit_with_exception(): - from google.rpc import status_pb2 + from google.api_core.exceptions import InternalServerError + from google.cloud.bigtable_v2.services.bigtable import BigtableClient + from google.rpc import code_pb2, status_pb2 project_id = "project-id" row_key = b"row_key" table_name = "projects/more-stuff" + app_profile_id = "app_profile_id" column_family_id = "column_family_id" column = b"column" credentials = _make_credentials() client = _make_client(project=project_id, credentials=credentials, admin=True) - table = _Table(table_name, client=client) + table = _Table(table_name, client=client, app_profile_id=app_profile_id) row = _make_direct_row(row_key, table) value = b"bytes-value" + # Set mock + api = mock.create_autospec(BigtableClient) + exception_message = "Boom!" + exception = InternalServerError(exception_message) + api.mutate_row.side_effect = [exception] + client.table_data_client + client._table_data_client._gapic_client = api + # Perform the method and check the result. row.set_cell(column_family_id, column, value) result = row.commit() - expected = status_pb2.Status(code=0) - assert result == expected + assert row._mutations == [] + assert result == status_pb2.Status( + code=code_pb2.Code.INTERNAL, message=exception_message + ) + call_args = api.mutate_row.call_args + assert app_profile_id == call_args.app_profile_id[0] + + +def test_direct_row_commit_with_unknown_exception(): + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.bigtable_v2.services.bigtable import BigtableClient + from google.rpc import code_pb2, status_pb2 + + project_id = "project-id" + row_key = b"row_key" + table_name = "projects/more-stuff" + app_profile_id = "app_profile_id" + column_family_id = "column_family_id" + column = b"column" + + credentials = _make_credentials() + client = _make_client(project=project_id, credentials=credentials, admin=True) + table = _Table(table_name, client=client, app_profile_id=app_profile_id) + row = _make_direct_row(row_key, table) + value = b"bytes-value" + + # Set mock + api = mock.create_autospec(BigtableClient) + exception_message = "Boom!" + exception = GoogleAPICallError(message=exception_message) + api.mutate_row.side_effect = [exception] + client.table_data_client + client._table_data_client._gapic_client = api + + # Perform the method and check the result. + row.set_cell(column_family_id, column, value) + result = row.commit() + assert row._mutations == [] + assert result == status_pb2.Status( + code=code_pb2.Code.UNKNOWN, message=exception_message + ) + call_args = api.mutate_row.call_args + assert app_profile_id == call_args.app_profile_id[0] + + +def test_direct_row_commit_with_invalid_argument(): + from google.cloud.bigtable_v2.services.bigtable import BigtableClient + from google.rpc import code_pb2, status_pb2 + + project_id = "project-id" + row_key = b"row_key" + table_name = "projects/more-stuff" + app_profile_id = "app_profile_id" + + credentials = _make_credentials() + client = _make_client(project=project_id, credentials=credentials, admin=True) + table = _Table(table_name, client=client, app_profile_id=app_profile_id) + row = _make_direct_row(row_key, table) + + # Set mock + api = mock.create_autospec(BigtableClient) + client.table_data_client + client._table_data_client._gapic_client = api + + # Perform the method and check the result. + result = row.commit() + assert row._mutations == [] + assert result == status_pb2.Status( + code=code_pb2.Code.INVALID_ARGUMENT, message="No mutations provided" + ) + api.mutate_row.assert_not_called() def _make_conditional_row(*args, **kwargs): @@ -729,6 +823,12 @@ def test__parse_rmw_row_response(): assert expected_output == _parse_rmw_row_response(sample_input) +def _MutateRowResponsePB(): + from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 + + return messages_v2_pb2.MutateRowResponse() + + def _CheckAndMutateRowResponsePB(*args, **kw): from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 @@ -778,16 +878,9 @@ def __init__(self, name, client=None, app_profile_id=None): self._instance = _Instance(client) self._app_profile_id = app_profile_id self.client = client - self.mutated_rows = [] self._table_impl = self._instance._client._veneer_data_client.get_table( _INSTANCE_ID, self.name, app_profile_id=self._app_profile_id, ) - - def mutate_rows(self, rows): - from google.rpc import status_pb2 - - self.mutated_rows.extend(rows) - return [status_pb2.Status(code=0)]