diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 1595799c..43d56ee0 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -1,7 +1,10 @@ package org.hypertrace.core.documentstore; import static org.hypertrace.core.documentstore.utils.Utils.readFileFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -10,11 +13,22 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import org.hypertrace.core.documentstore.expression.impl.ConstantExpression; +import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression; +import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; +import org.hypertrace.core.documentstore.expression.operators.RelationalOperator; +import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; +import org.hypertrace.core.documentstore.model.options.UpdateOptions; +import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; +import org.hypertrace.core.documentstore.model.subdoc.UpdateOperator; import org.hypertrace.core.documentstore.postgres.PostgresDatastore; +import org.hypertrace.core.documentstore.query.Query; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -322,6 +336,256 @@ void testUpdateWithCondition() { } } + @Nested + @DisplayName("SubDocument Update Operations") + class SubDocUpdateTests { + + @Nested + @DisplayName("SET Operator Tests") + class SetOperatorTests { + + @Test + @DisplayName("Should update top-level column with SET operator") + void testUpdateTopLevelColumn() throws Exception { + // Update the price of item with id = 1 + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 999)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + assertEquals(999, resultJson.get("price").asInt()); + + // Verify in database + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"price\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(999, rs.getInt("price")); + } + } + + @Test + @DisplayName("Should update multiple top-level columns in single update") + void testUpdateMultipleColumns() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("2"))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("price", 555), SubDocumentUpdate.of("quantity", 100)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + assertEquals(555, resultJson.get("price").asInt()); + assertEquals(100, resultJson.get("quantity").asInt()); + } + + @Test + @DisplayName("Should update nested path in JSONB column") + void testUpdateNestedJsonbPath() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("3"))) + .build(); + + // Update props.brand nested path + List updates = + List.of(SubDocumentUpdate.of("props.brand", "UpdatedBrand")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + assertNotNull(resultJson.get("props")); + assertEquals("UpdatedBrand", resultJson.get("props").get("brand").asText()); + } + + @Test + @DisplayName("Should return BEFORE_UPDATE document") + void testUpdateReturnsBeforeDocument() throws Exception { + // First get the current price + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("4"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 777)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isPresent()); + JsonNode resultJson = OBJECT_MAPPER.readTree(result.get().toJson()); + // Should return the old price (5 from initial data), not the new one (777) + assertEquals(5, resultJson.get("price").asInt()); + + // But database should have the new value + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"price\" FROM \"%s\" WHERE \"id\" = '4'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(777, rs.getInt("price")); + } + } + } + + @Test + @DisplayName("Should return empty when no document matches query") + void testUpdateNoMatch() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("9999"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 100)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Optional result = flatCollection.update(query, updates, options); + + assertTrue(result.isEmpty()); + } + + @Test + @DisplayName("Should throw IOException when column does not exist") + void testUpdateNonExistentColumn() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("_id"), + RelationalOperator.EQ, + ConstantExpression.of(1))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("nonexistent_column", "value")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows(IOException.class, () -> flatCollection.update(query, updates, options)); + } + + @Test + @DisplayName("Should throw IOException when nested path on non-JSONB column") + void testUpdateNestedPathOnNonJsonbColumn() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("_id"), + RelationalOperator.EQ, + ConstantExpression.of(1))) + .build(); + + // "item" is TEXT, not JSONB - nested path should fail + List updates = List.of(SubDocumentUpdate.of("item.nested", "value")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows(IOException.class, () -> flatCollection.update(query, updates, options)); + } + + @Test + @DisplayName("Should throw IOException for unsupported operator") + void testUpdateUnsupportedOperator() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("_id"), + RelationalOperator.EQ, + ConstantExpression.of(1))) + .build(); + + // UNSET is not supported yet + List updates = + List.of( + SubDocumentUpdate.builder() + .subDocument("price") + .operator(UpdateOperator.UNSET) + .build()); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows(IOException.class, () -> flatCollection.update(query, updates, options)); + } + + @Test + @DisplayName("Should throw UnsupportedOperationException for bulkUpdate") + void testBulkUpdate() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("price"), + RelationalOperator.GT, + ConstantExpression.of(5))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 100)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows( + UnsupportedOperationException.class, + () -> flatCollection.bulkUpdate(query, updates, options)); + } + } + @Nested @DisplayName("Drop Operations") class DropTests { diff --git a/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json b/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json index 2ca68949..895a951c 100644 --- a/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json +++ b/document-store/src/integrationTest/resources/query/pg_flat_collection_insert.json @@ -1,14 +1,14 @@ { "statements": [ - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n1, 'Soap', 10, 2, '2014-03-01T08:00:00Z', true,\n'{\"hygiene\", \"personal-care\", \"premium\"}',\n'{\"Hygiene\", \"PersonalCare\"}',\n'{\"colors\": [\"Blue\", \"Green\"], \"brand\": \"Dettol\", \"size\": \"M\", \"product-code\": \"SOAP-DET-001\", \"source-loc\": [\"warehouse-A\", \"store-1\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{1, 2, 3}',\n'{4.5, 9.2}',\n'{true, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n2, 'Mirror', 20, 1, '2014-03-01T09:00:00Z', true,\n'{\"home-decor\", \"reflective\", \"glass\"}',\n'{\"HomeDecor\"}',\nNULL,\nNULL,\n'{10, 20}',\n'{1.5, 2.5, 3.5}',\n'{false, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n3, 'Shampoo', 5, 10, '2014-03-15T09:00:00Z', true,\n'{\"hair-care\", \"personal-care\", \"premium\", \"herbal\"}',\n'{\"HairCare\", \"PersonalCare\"}',\n'{\"colors\": [\"Black\"], \"brand\": \"Sunsilk\", \"size\": \"L\", \"product-code\": \"SHAMP-SUN-003\", \"source-loc\": [\"warehouse-B\", \"store-2\", \"online\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{5, 10, 15}',\n'{3.14, 2.71}',\n'{true, false, true}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n4, 'Shampoo', 5, 20, '2014-04-04T11:21:39.736Z', false,\n'{\"hair-care\", \"budget\", \"bulk\"}',\n'{\"HairCare\"}',\nNULL,\nNULL,\n'{1, 2}',\n'{5.0, 10.0}',\n'{true, true}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n5, 'Soap', 20, 5, '2014-04-04T21:23:13.331Z', true,\n'{\"hygiene\", \"antibacterial\", \"family-pack\"}',\n'{\"Hygiene\"}',\n'{\"colors\": [\"Orange\", \"Blue\"], \"brand\": \"Lifebuoy\", \"size\": \"S\", \"product-code\": \"SOAP-LIF-005\", \"source-loc\": [\"warehouse-C\"], \"seller\": {\"name\": \"Hans and Co.\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{3, 6, 9}',\n'{7.5}',\n'{false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n6, 'Comb', 7.5, 5, '2015-06-04T05:08:13Z', true,\n'{\"grooming\", \"plastic\", \"essential\"}',\n'{\"Grooming\"}',\nNULL,\nNULL,\n'{20, 30}',\n'{6.0, 8.0}',\n'{true, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n7, 'Comb', 7.5, 10, '2015-09-10T08:43:00Z', false,\n'{\"grooming\", \"bulk\", \"wholesale\"}',\n'{\"Grooming\"}',\n'{\"colors\": [], \"product-code\": null, \"source-loc\": null, \"seller\": {\"name\": \"Go Go Plastics\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{10}',\n'{3.0}',\n'{false, false, false}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n8, 'Soap', 10, 5, '2016-02-06T20:20:13Z', true,\n'{\"hygiene\", \"budget\", \"basic\"}',\n'{\"Hygiene\"}',\nNULL,\nNULL,\n'{1, 10, 20}',\n'{2.5, 5.0}',\n'{true}'\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n9, 'Bottle', 15, 3, '2016-03-01T10:00:00Z', false,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)", - "INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n10, 'Cup', 8, 2, '2016-04-01T10:00:00Z', true,\n'{}',\n'{}',\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)" + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'1', 'Soap', 10, 2, '2014-03-01T08:00:00Z', true,\n'{\"hygiene\", \"personal-care\", \"premium\"}',\n'{\"Hygiene\", \"PersonalCare\"}',\n'{\"colors\": [\"Blue\", \"Green\"], \"brand\": \"Dettol\", \"size\": \"M\", \"product-code\": \"SOAP-DET-001\", \"source-loc\": [\"warehouse-A\", \"store-1\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{1, 2, 3}',\n'{4.5, 9.2}',\n'{true, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'2', 'Mirror', 20, 1, '2014-03-01T09:00:00Z', true,\n'{\"home-decor\", \"reflective\", \"glass\"}',\n'{\"HomeDecor\"}',\nNULL,\nNULL,\n'{10, 20}',\n'{1.5, 2.5, 3.5}',\n'{false, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'3', 'Shampoo', 5, 10, '2014-03-15T09:00:00Z', true,\n'{\"hair-care\", \"personal-care\", \"premium\", \"herbal\"}',\n'{\"HairCare\", \"PersonalCare\"}',\n'{\"colors\": [\"Black\"], \"brand\": \"Sunsilk\", \"size\": \"L\", \"product-code\": \"SHAMP-SUN-003\", \"source-loc\": [\"warehouse-B\", \"store-2\", \"online\"], \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL,\n'{5, 10, 15}',\n'{3.14, 2.71}',\n'{true, false, true}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'4', 'Shampoo', 5, 20, '2014-04-04T11:21:39.736Z', false,\n'{\"hair-care\", \"budget\", \"bulk\"}',\n'{\"HairCare\"}',\nNULL,\nNULL,\n'{1, 2}',\n'{5.0, 10.0}',\n'{true, true}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'5', 'Soap', 20, 5, '2014-04-04T21:23:13.331Z', true,\n'{\"hygiene\", \"antibacterial\", \"family-pack\"}',\n'{\"Hygiene\"}',\n'{\"colors\": [\"Orange\", \"Blue\"], \"brand\": \"Lifebuoy\", \"size\": \"S\", \"product-code\": \"SOAP-LIF-005\", \"source-loc\": [\"warehouse-C\"], \"seller\": {\"name\": \"Hans and Co.\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{3, 6, 9}',\n'{7.5}',\n'{false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'6', 'Comb', 7.5, 5, '2015-06-04T05:08:13Z', true,\n'{\"grooming\", \"plastic\", \"essential\"}',\n'{\"Grooming\"}',\nNULL,\nNULL,\n'{20, 30}',\n'{6.0, 8.0}',\n'{true, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'7', 'Comb', 7.5, 10, '2015-09-10T08:43:00Z', false,\n'{\"grooming\", \"bulk\", \"wholesale\"}',\n'{\"Grooming\"}',\n'{\"colors\": [], \"product-code\": null, \"source-loc\": null, \"seller\": {\"name\": \"Go Go Plastics\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL,\n'{10}',\n'{3.0}',\n'{false, false, false}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'8', 'Soap', 10, 5, '2016-02-06T20:20:13Z', true,\n'{\"hygiene\", \"budget\", \"basic\"}',\n'{\"Hygiene\"}',\nNULL,\nNULL,\n'{1, 10, 20}',\n'{2.5, 5.0}',\n'{true}'\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'9', 'Bottle', 15, 3, '2016-03-01T10:00:00Z', false,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)", + "INSERT INTO \"myTestFlat\" (\n\"id\", \"item\", \"price\", \"quantity\", \"date\", \"in_stock\", \"tags\", \"categoryTags\", \"props\", \"sales\", \"numbers\", \"scores\", \"flags\"\n) VALUES (\n'10', 'Cup', 8, 2, '2016-04-01T10:00:00Z', true,\n'{}',\n'{}',\nNULL,\nNULL,\nNULL,\nNULL,\nNULL\n)" ] } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 747b63bd..e4ead453 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -1,6 +1,17 @@ package org.hypertrace.core.documentstore.postgres; +import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.AFTER_UPDATE; +import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.BEFORE_UPDATE; +import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.NONE; +import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.SET; + import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -12,14 +23,22 @@ import org.hypertrace.core.documentstore.CloseableIterator; import org.hypertrace.core.documentstore.CreateResult; import org.hypertrace.core.documentstore.Document; +import org.hypertrace.core.documentstore.DocumentType; import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; import org.hypertrace.core.documentstore.model.options.QueryOptions; +import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; +import org.hypertrace.core.documentstore.model.subdoc.UpdateOperator; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer; +import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; +import org.hypertrace.core.documentstore.postgres.update.parser.FlatSetOperatorParser; +import org.hypertrace.core.documentstore.postgres.update.parser.FlatUpdateOperatorParser; import org.hypertrace.core.documentstore.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +56,9 @@ public class FlatPostgresCollection extends PostgresCollection { private static final String WRITE_NOT_SUPPORTED = "Write operations are not supported for flat collections yet!"; + private static final Map OPERATOR_PARSERS = + Map.of(SET, new FlatSetOperatorParser()); + private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry; FlatPostgresCollection( @@ -174,10 +196,163 @@ public UpdateResult update(Key key, Document document, Filter condition) throws @Override public Optional update( org.hypertrace.core.documentstore.query.Query query, - java.util.Collection updates, + Collection updates, UpdateOptions updateOptions) throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + + if (updates == null || updates.isEmpty()) { + throw new IOException("Updates collection cannot be null or empty"); + } + + String tableName = tableIdentifier.getTableName(); + + try (Connection connection = client.getTransactionalConnection()) { + try { + // 1. Validate all columns exist and operators are supported + validateUpdates(updates, tableName); + + // 2. Get before-document if needed + Optional beforeDoc = Optional.empty(); + ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); + if (returnType == BEFORE_UPDATE || returnType == AFTER_UPDATE) { + beforeDoc = selectFirstDocument(connection, query); + } + + if (beforeDoc.isEmpty() && returnType != NONE) { + connection.commit(); + return Optional.empty(); + } + + // 3. Build and execute UPDATE + executeUpdate(connection, query, updates, tableName); + + // 4. Resolve return document based on options + Document returnDoc = null; + if (returnType == BEFORE_UPDATE) { + returnDoc = beforeDoc.orElse(null); + } else if (returnType == AFTER_UPDATE) { + returnDoc = selectFirstDocument(connection, query).orElse(null); + } + + connection.commit(); + return Optional.ofNullable(returnDoc); + + } catch (Exception e) { + connection.rollback(); + throw e; + } + } catch (SQLException e) { + LOGGER.error("SQLException during update operation", e); + throw new IOException(e); + } + } + + private void validateUpdates(Collection updates, String tableName) + throws IOException { + for (SubDocumentUpdate update : updates) { + UpdateOperator operator = update.getOperator(); + + // Check operator is supported + if (!OPERATOR_PARSERS.containsKey(operator)) { + throw new IOException("Unsupported update operator: " + operator); + } + + // Check column exists + String path = update.getSubDocument().getPath(); + String rootColumn = path.contains(".") ? path.split("\\.")[0] : path; + + Optional colMeta = + schemaRegistry.getColumnOrRefresh(tableName, rootColumn); + + if (colMeta.isEmpty()) { + throw new IOException("Column not found in schema: " + rootColumn); + } + + // For nested paths, root column must be JSONB + if (path.contains(".") && colMeta.get().getPostgresType() != PostgresDataType.JSONB) { + throw new IOException( + "Nested path updates require JSONB column, but column '" + + rootColumn + + "' is of type: " + + colMeta.get().getPostgresType()); + } + } + } + + private Optional selectFirstDocument(Connection connection, Query query) + throws SQLException, IOException { + PostgresQueryParser parser = createParser(query); + String selectQuery = parser.buildSelectQueryForUpdate(); + + try (PreparedStatement ps = + queryExecutor.buildPreparedStatement( + selectQuery, parser.getParamsBuilder().build(), connection)) { + return getFirstDocumentForFlat(ps.executeQuery()); + } + } + + private Optional getFirstDocumentForFlat(ResultSet resultSet) throws IOException { + CloseableIterator iterator = + new PostgresResultIteratorWithBasicTypes(resultSet, DocumentType.FLAT); + return getFirstDocument(iterator); + } + + private void executeUpdate( + Connection connection, Query query, Collection updates, String tableName) + throws SQLException { + + // Build WHERE clause + PostgresQueryParser filterParser = createParser(query); + String filterClause = filterParser.buildFilterClause(); + Params filterParams = filterParser.getParamsBuilder().build(); + + // Build SET clause fragments + List setFragments = new ArrayList<>(); + List params = new ArrayList<>(); + + for (SubDocumentUpdate update : updates) { + String path = update.getSubDocument().getPath(); + String rootColumn = path.contains(".") ? path.split("\\.")[0] : path; + String[] nestedPath = + path.contains(".") ? path.substring(path.indexOf(".") + 1).split("\\.") : new String[0]; + + PostgresColumnMetadata colMeta = + schemaRegistry.getColumnOrRefresh(tableName, rootColumn).orElseThrow(); + + FlatUpdateContext context = + FlatUpdateContext.builder() + .columnName(rootColumn) + .nestedPath(nestedPath) + .columnType(colMeta.getPostgresType()) + .value(update.getSubDocumentValue()) + .params(params) + .build(); + + FlatUpdateOperatorParser operatorParser = OPERATOR_PARSERS.get(update.getOperator()); + String fragment = operatorParser.parse(context); + setFragments.add(fragment); + } + + // Build final UPDATE SQL + String sql = + String.format( + "UPDATE %s SET %s %s", tableIdentifier, String.join(", ", setFragments), filterClause); + + LOGGER.debug("Executing update SQL: {}", sql); + + try (PreparedStatement ps = connection.prepareStatement(sql)) { + int idx = 1; + // Add SET clause params + for (Object param : params) { + ps.setObject(idx++, param); + } + // Add WHERE clause params + for (Object param : filterParams.getObjectParams().values()) { + ps.setObject(idx++, param); + } + int rowsUpdated = ps.executeUpdate(); + LOGGER.debug("Rows updated: {}", rowsUpdated); + } } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java new file mode 100644 index 00000000..5537c974 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java @@ -0,0 +1,43 @@ +package org.hypertrace.core.documentstore.postgres.update; + +import java.util.List; +import lombok.Builder; +import lombok.Value; +import org.hypertrace.core.documentstore.model.subdoc.SubDocumentValue; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; + +/** + * Context object containing all information needed to generate SQL for a single field update in + * flat collections. + */ +@Value +@Builder +public class FlatUpdateContext { + /** The column name in the database (e.g., "price", "props") */ + String columnName; + + /** + * The nested path within a JSONB column, empty array for top-level columns. For example, for + * "props.seller.name", columnName would be "props" and nestedPath would be ["seller", "name"]. + */ + String[] nestedPath; + + /** The PostgreSQL data type of the column */ + PostgresDataType columnType; + + /** The value to set/update */ + SubDocumentValue value; + + /** Accumulator for prepared statement parameters (mutable) */ + List params; + + /** Returns true if this is a top-level column update (no nested path) */ + public boolean isTopLevel() { + return nestedPath == null || nestedPath.length == 0; + } + + /** Returns true if the column is a JSONB type */ + public boolean isJsonbColumn() { + return columnType == PostgresDataType.JSONB; + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java new file mode 100644 index 00000000..3558ca2d --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatSetOperatorParser.java @@ -0,0 +1,88 @@ +package org.hypertrace.core.documentstore.postgres.update.parser; + +import org.hypertrace.core.documentstore.model.subdoc.MultiValuedNestedSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.MultiValuedPrimitiveSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.NestedSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.NullSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.PrimitiveSubDocumentValue; +import org.hypertrace.core.documentstore.model.subdoc.visitor.SubDocumentValueVisitor; +import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; + +/** + * Parser for the SET operator in flat collections. + * + *

Handles two cases: + * + *

    + *
  • Top-level columns: {@code SET "column" = ?} + *
  • Nested JSONB paths: {@code SET "column" = jsonb_set(COALESCE("column", '{}'), '{path}', + * to_jsonb(?))} + *
+ */ +public class FlatSetOperatorParser implements FlatUpdateOperatorParser { + + /** Visitor to extract raw values from SubDocumentValue for use in prepared statements. */ + private static final SubDocumentValueVisitor VALUE_EXTRACTOR = + new SubDocumentValueVisitor<>() { + @Override + public Object visit(PrimitiveSubDocumentValue value) { + return value.getValue(); + } + + @Override + public Object visit(MultiValuedPrimitiveSubDocumentValue value) { + return value.getValues(); + } + + @Override + public Object visit(NestedSubDocumentValue value) { + return value.getJsonValue(); + } + + @Override + public Object visit(MultiValuedNestedSubDocumentValue value) { + return value.getJsonValues(); + } + + @Override + public Object visit(NullSubDocumentValue value) { + return null; + } + }; + + @Override + public String parse(FlatUpdateContext context) { + if (context.isTopLevel()) { + return parseTopLevel(context); + } else { + return parseNestedJsonb(context); + } + } + + private String parseTopLevel(FlatUpdateContext context) { + context.getParams().add(context.getValue().accept(VALUE_EXTRACTOR)); + return String.format("\"%s\" = ?", context.getColumnName()); + } + + private String parseNestedJsonb(FlatUpdateContext context) { + String jsonPath = buildJsonPath(context.getNestedPath()); + Object value = context.getValue().accept(VALUE_EXTRACTOR); + + context.getParams().add(jsonPath); + context.getParams().add(value); + + // Use jsonb_set with COALESCE to handle null columns + // to_jsonb(?) converts the value to proper JSONB format + return String.format( + "\"%s\" = jsonb_set(COALESCE(\"%s\", '{}'), ?::text[], to_jsonb(?))", + context.getColumnName(), context.getColumnName()); + } + + /** + * Builds a PostgreSQL text array path from nested path components. For example, ["seller", + * "name"] becomes "{seller,name}" + */ + private String buildJsonPath(String[] nestedPath) { + return "{" + String.join(",", nestedPath) + "}"; + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java new file mode 100644 index 00000000..be4d3e24 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/parser/FlatUpdateOperatorParser.java @@ -0,0 +1,26 @@ +package org.hypertrace.core.documentstore.postgres.update.parser; + +import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext; + +/** + * Parser interface for converting SubDocumentUpdate operations to SQL fragments for flat + * collections. + * + *

Each implementation handles a specific {@link + * org.hypertrace.core.documentstore.model.subdoc.UpdateOperator} and generates the appropriate SQL + * SET clause fragment. + */ +public interface FlatUpdateOperatorParser { + + /** + * Generates SQL SET clause fragment for this operator. + * + *

For top-level columns, this typically produces: {@code "column" = ?} + * + *

For nested JSONB paths, this produces: {@code "column" = jsonb_set(...)} + * + * @param context The update context containing column info, value, and parameter accumulator + * @return SQL fragment to be used in SET clause + */ + String parse(FlatUpdateContext context); +}