diff --git a/packages/db-ivm/tests/operators/groupBy.test.ts b/packages/db-ivm/tests/operators/groupBy.test.ts
index fbe50fb33..4957bd112 100644
--- a/packages/db-ivm/tests/operators/groupBy.test.ts
+++ b/packages/db-ivm/tests/operators/groupBy.test.ts
@@ -13,6 +13,32 @@ import {
 } from '../../src/operators/groupBy.js'
 import { output } from '../../src/operators/index.js'
 
+/**
+ * Helper to track all messages (inserts/deletes) emitted by the groupBy operator.
+ * This is useful for debugging issues where the operator might emit incorrect
+ * sequences of operations.
+ */
+function createMessageTracker() {
+  const allMessages: Array<{
+    key: string
+    value: Record<string, unknown>
+    multiplicity: number
+  }> = []
+
+  return {
+    track: (message: MultiSet<[string, Record<string, unknown>]>) => {
+      for (const [item, multiplicity] of message.getInner()) {
+        const [key, value] = item
+        allMessages.push({ key, value, multiplicity })
+      }
+    },
+    getMessages: () => allMessages,
+    clear: () => {
+      allMessages.length = 0
+    },
+  }
+}
+
 describe(`Operators`, () => {
   describe(`GroupBy operation`, () => {
     test(`with no aggregate`, () => {
@@ -961,6 +987,327 @@ describe(`Operators`, () => {
       expect(result).toEqual(expectedUpdateResult)
     })
 
+    test(`incremental updates should emit paired delete+insert for aggregate changes`, () => {
+      // This test verifies that when an aggregate value changes due to incremental updates,
+      // the groupBy operator correctly emits BOTH a delete for the old value AND an insert
+      // for the new value. This is critical for downstream consumers that track state.
+      //
+      // Bug scenario: When multiple items with the same groupBy key are added incrementally,
+      // the pipeline might emit only an insert without the corresponding delete, causing
+      // "already exists" errors in downstream collections.
+
+      const graph = new D2()
+      const input = graph.newInput<{
+        id: string
+        category: string
+        amount: number
+      }>()
+      const tracker = createMessageTracker()
+
+      input.pipe(
+        groupBy((data) => ({ category: data.category }), {
+          total: sum((data) => data.amount),
+          count: count(),
+        }),
+        output((message) => {
+          tracker.track(message)
+        }),
+      )
+
+      graph.finalize()
+
+      // Initial data: one item for category A
+      input.sendData(
+        new MultiSet([[{ id: `1`, category: `A`, amount: 10 }, 1]]),
+      )
+      graph.run()
+
+      // Verify initial state
+      const initialMessages = tracker.getMessages()
+      expect(initialMessages).toHaveLength(1)
+      expect(initialMessages[0]?.multiplicity).toBe(1) // Insert
+      expect(initialMessages[0]?.value).toMatchObject({
+        category: `A`,
+        total: 10,
+        count: 1,
+      })
+
+      tracker.clear()
+
+      // Incremental update: add another item with same category
+      // This should emit BOTH a delete for the old aggregate AND an insert for the new one
+      input.sendData(
+        new MultiSet([[{ id: `2`, category: `A`, amount: 20 }, 1]]),
+      )
+      graph.run()
+
+      const updateMessages = tracker.getMessages()
+
+      // Should have exactly 2 messages: one delete (-1) and one insert (+1)
+      expect(updateMessages).toHaveLength(2)
+
+      // Find the delete and insert messages
+      const deleteMsg = updateMessages.find((m) => m.multiplicity === -1)
+      const insertMsg = updateMessages.find((m) => m.multiplicity === 1)
+
+      // Verify we have both a delete and an insert
+      expect(deleteMsg).toBeDefined()
+      expect(insertMsg).toBeDefined()
+
+      // The delete should be for the old aggregate value
+      expect(deleteMsg?.value).toMatchObject({
+        category: `A`,
+        total: 10,
+        count: 1,
+      })
+
+      // The insert should be for the new aggregate value
+      expect(insertMsg?.value).toMatchObject({
+        category: `A`,
+        total: 30,
+        count: 2,
+      })
+    })
+
+    test(`rapid incremental updates should always emit paired delete+insert`, () => {
+      // This test simulates rapid sequential updates that might trigger edge cases
+      // in the reduce operator's state tracking.
+
+      const graph = new D2()
+      const input = graph.newInput<{
+        id: string
+        language: string
+      }>()
+      const tracker = createMessageTracker()
+
+      input.pipe(
+        groupBy((data) => ({ language: data.language }), {
+          count: count(),
+        }),
+        output((message) => {
+          tracker.track(message)
+        }),
+      )
+
+      graph.finalize()
+
+      // Initial item
+      input.sendData(new MultiSet([[{ id: `1`, language: `en` }, 1]]))
+      graph.run()
+
+      expect(tracker.getMessages()).toHaveLength(1)
+      expect(tracker.getMessages()[0]?.multiplicity).toBe(1)
+      expect(tracker.getMessages()[0]?.value).toMatchObject({
+        language: `en`,
+        count: 1,
+      })
+
+      // Perform multiple rapid incremental updates
+      for (let i = 2; i <= 5; i++) {
+        tracker.clear()
+
+        input.sendData(new MultiSet([[{ id: `${i}`, language: `en` }, 1]]))
+        graph.run()
+
+        const messages = tracker.getMessages()
+
+        // Each update should produce exactly 2 messages: delete old, insert new
+        expect(messages).toHaveLength(2)
+
+        const deleteMsg = messages.find((m) => m.multiplicity === -1)
+        const insertMsg = messages.find((m) => m.multiplicity === 1)
+
+        expect(deleteMsg).toBeDefined()
+        expect(insertMsg).toBeDefined()
+
+        // Old count should be i-1, new count should be i
+        expect(deleteMsg?.value).toMatchObject({ language: `en`, count: i - 1 })
+        expect(insertMsg?.value).toMatchObject({ language: `en`, count: i })
+      }
+    })
+
+    test(`multiple groups with interleaved updates should emit correct delete+insert pairs`, () => {
+      // This test verifies that when multiple groups are updated in the same batch,
+      // each group gets the correct delete+insert pair.
+
+      const graph = new D2()
+      const input = graph.newInput<{
+        id: string
+        language: string
+      }>()
+      const tracker = createMessageTracker()
+
+      input.pipe(
+        groupBy((data) => ({ language: data.language }), {
+          count: count(),
+        }),
+        output((message) => {
+          tracker.track(message)
+        }),
+      )
+
+      graph.finalize()
+
+      // Initial data: one item for each language
+      input.sendData(
+        new MultiSet([
+          [{ id: `1`, language: `en` }, 1],
+          [{ id: `2`, language: `ru` }, 1],
+          [{ id: `3`, language: `fr` }, 1],
+        ]),
+      )
+      graph.run()
+
+      // Should have 3 groups with count 1 each
+      expect(tracker.getMessages()).toHaveLength(3)
+      const enInsert = tracker
+        .getMessages()
+        .find((m) => m.key === `{"language":"en"}`)
+      const ruInsert = tracker
+        .getMessages()
+        .find((m) => m.key === `{"language":"ru"}`)
+      const frInsert = tracker
+        .getMessages()
+        .find((m) => m.key === `{"language":"fr"}`)
+      expect(enInsert?.multiplicity).toBe(1)
+      expect(ruInsert?.multiplicity).toBe(1)
+      expect(frInsert?.multiplicity).toBe(1)
+      expect(enInsert?.value.count).toBe(1)
+      expect(ruInsert?.value.count).toBe(1)
+      expect(frInsert?.value.count).toBe(1)
+
+      tracker.clear()
+
+      // Add items to two groups in the same batch
+      input.sendData(
+        new MultiSet([
+          [{ id: `4`, language: `en` }, 1],
+          [{ id: `5`, language: `ru` }, 1],
+        ]),
+      )
+      graph.run()
+
+      const updateMessages = tracker.getMessages()
+
+      // Should have 4 messages: delete+insert for en, delete+insert for ru
+      expect(updateMessages).toHaveLength(4)
+
+      // Check en group
+      const enDelete = updateMessages.find(
+        (m) => m.key === `{"language":"en"}` && m.multiplicity === -1,
+      )
+      const enUpdate = updateMessages.find(
+        (m) => m.key === `{"language":"en"}` && m.multiplicity === 1,
+      )
+      expect(enDelete).toBeDefined()
+      expect(enUpdate).toBeDefined()
+      expect(enDelete?.value.count).toBe(1)
+      expect(enUpdate?.value.count).toBe(2)
+
+      // Check ru group
+      const ruDelete = updateMessages.find(
+        (m) => m.key === `{"language":"ru"}` && m.multiplicity === -1,
+      )
+      const ruUpdate = updateMessages.find(
+        (m) => m.key === `{"language":"ru"}` && m.multiplicity === 1,
+      )
+      expect(ruDelete).toBeDefined()
+      expect(ruUpdate).toBeDefined()
+      expect(ruDelete?.value.count).toBe(1)
+      expect(ruUpdate?.value.count).toBe(2)
+
+      // Check that fr group was NOT affected (no messages for it)
+      const frMessages = updateMessages.filter(
+        (m) => m.key === `{"language":"fr"}`,
+      )
+      expect(frMessages).toHaveLength(0)
+    })
+
+    test(`verify message accumulation - deletes and inserts should pair correctly`, () => {
+      // This test verifies that when processing incremental updates,
+      // the D2 pipeline emits properly paired delete and insert messages
+      // that can be accumulated by key in downstream processing.
+      //
+      // This is the exact scenario where the bug was reported:
+      // "the D2 pipeline might emit an insert for an updated aggregate
+      // without a corresponding delete"
+
+      const graph = new D2()
+      const input = graph.newInput<{
+        id: string
+        language: string
+      }>()
+
+      // Track all raw messages and their multiplicities
+      const allMessages: Array<{
+        key: string
+        value: Record<string, unknown>
+        multiplicity: number
+      }> = []
+
+      input.pipe(
+        groupBy((data) => ({ language: data.language }), {
+          count: count(),
+        }),
+        output((message) => {
+          for (const [item, multiplicity] of message.getInner()) {
+            const [key, value] = item
+            allMessages.push({ key, value, multiplicity })
+          }
+        }),
+      )
+
+      graph.finalize()
+
+      // Step 1: Initial insert
+      input.sendData(new MultiSet([[{ id: `event1`, language: `ru` }, 1]]))
+      graph.run()
+
+      // Should have exactly 1 message: insert with count 1
+      expect(allMessages).toHaveLength(1)
+      expect(allMessages[0]?.multiplicity).toBe(1)
+      expect(allMessages[0]?.value.count).toBe(1)
+
+      // Clear for next step
+      allMessages.length = 0
+
+      // Step 2: Second insert to same group
+      input.sendData(new MultiSet([[{ id: `event2`, language: `ru` }, 1]]))
+      graph.run()
+
+      // Simulate how the db package accumulates changes by key
+      const changesByKey = new Map<
+        string,
+        { inserts: number; deletes: number; value: any }
+      >()
+
+      for (const msg of allMessages) {
+        const existing = changesByKey.get(msg.key) || {
+          inserts: 0,
+          deletes: 0,
+          value: null,
+        }
+        if (msg.multiplicity > 0) {
+          existing.inserts += msg.multiplicity
+          existing.value = msg.value
+        } else if (msg.multiplicity < 0) {
+          existing.deletes += Math.abs(msg.multiplicity)
+        }
+        changesByKey.set(msg.key, existing)
+      }
+
+      // For the "ru" key, we should have 1 delete and 1 insert
+      const ruChanges = changesByKey.get(`{"language":"ru"}`)
+      expect(ruChanges).toBeDefined()
+
+      // CRITICAL: Both deletes and inserts should be present
+      // If only inserts are present (deletes === 0), this would cause
+      // the "already exists" error in the live query collection
+      expect(ruChanges?.deletes).toBe(1)
+      expect(ruChanges?.inserts).toBe(1)
+      expect(ruChanges?.value.count).toBe(2)
+    })
+
     test(`group removal and re-addition with multiple aggregates`, () => {
       const graph = new D2()
       const input = graph.newInput<{
diff --git a/packages/db/tests/query/group-by-incremental-test.test.ts b/packages/db/tests/query/group-by-incremental-test.test.ts
new file mode 100644
index 000000000..e423e0499
--- /dev/null
+++ b/packages/db/tests/query/group-by-incremental-test.test.ts
@@ -0,0 +1,622 @@
+/**
+ * Tests for groupBy incremental updates to investigate the bug where
+ * the D2 pipeline might emit an insert without a corresponding delete.
+ */
+import { describe, expect, test } from 'vitest'
+import { createLiveQueryCollection } from '../../src/query/index.js'
+import { createCollection } from '../../src/collection/index.js'
+import { mockSyncCollectionOptionsNoInitialState } from '../utils.js'
+import { count, sum } from '../../src/query/builder/functions.js'
+import { DuplicateKeySyncError } from '../../src/errors.js'
+
+type Event = {
+  id: string
+  language: string
+  amount?: number
+}
+
+/**
+ * Helper to create a collection that's ready for testing.
+ */
+async function createReadyCollection<T extends object>(opts: {
+  id: string
+  getKey: (item: T) => string | number
+}) {
+  const collection = createCollection(
+    mockSyncCollectionOptionsNoInitialState(opts),
+  )
+
+  const preloadPromise = collection.preload()
+  collection.utils.begin()
+  collection.utils.commit()
+  collection.utils.markReady()
+  await preloadPromise
+
+  return collection
+}
+
+describe(`GroupBy Incremental Updates Investigation`, () => {
+  describe(`D2 output tracing`, () => {
+    test(`trace accumulated changes for groupBy incremental update`, async () => {
+      // This test verifies that D2 emits paired delete+insert for aggregate updates
+      // by checking the accumulated changes passed to applyChanges
+
+      const eventsCollection = await createReadyCollection<Event>({
+        id: `events-trace`,
+        getKey: (event) => event.id,
+      })
+
+      const languageCounts = createLiveQueryCollection({
+        startSync: true,
+        query: (q) =>
+          q
+            .from({ events: eventsCollection })
+            .groupBy(({ events }) => events.language)
+            .select(({ events }) => ({
+              language: events.language,
+              count: count(events.id),
+            })),
+      })
+
+      // Insert first event
+      eventsCollection.utils.begin()
+      eventsCollection.utils.write({
+        type: `insert`,
+        value: { id: `event1`, language: `ru` },
+      })
+      eventsCollection.utils.commit()
+
+      expect(languageCounts.size).toBe(1)
+      expect(languageCounts.get(`ru`)?.count).toBe(1)
+
+      // Insert second event - D2 should emit delete for {count:1} and insert for {count:2}
+      eventsCollection.utils.begin()
+      eventsCollection.utils.write({
+        type: `insert`,
+        value: { id: `event2`, language: `ru` },
+      })
+      eventsCollection.utils.commit()
+
+      // Verify the result
+      expect(languageCounts.size).toBe(1)
+      expect(languageCounts.get(`ru`)?.count).toBe(2)
+
+      // This test passing means D2 is correctly emitting paired delete+insert
+      // which gets accumulated into a single update in applyChanges
+    })
+  })
+
+  describe(`Direct bug reproduction`, () => {
+    test(`simulating D2 emitting only insert (without delete) for live query should throw DuplicateKeySyncError`, async () => {
+      // This test directly simulates the bug scenario:
+      // D2 emits an insert for a key that already exists, without a preceding delete
+      // For live queries without custom getKey (like groupBy), this triggers the bug
+      //
+      // We need to use LIVE_QUERY_INTERNAL to mark this as a live query
+
+      const { LIVE_QUERY_INTERNAL } = await import(
+        `../../src/query/live/internal.js`
+      )
+
+      type GroupResult = {
+        language: string
+        count: number
+      }
+
+      let writeInsertForExistingKey: (() => void) | undefined
+
+      const collection = createCollection<GroupResult>({
+        id: `direct-bug-repro`,
+        getKey: (item) => item.language,
+        sync: {
+          sync: ({ begin, write, commit, markReady }) => {
+            // First: insert initial value
+            begin()
+            write({
+              type: `insert`,
+              value: { language: `ru`, count: 1 },
+            })
+            commit()
+            markReady()
+
+            // Capture the write function to use later
+            writeInsertForExistingKey = () => {
+              begin()
+              // This insert is for an existing key with a DIFFERENT value
+              // Without a preceding delete, this should throw DuplicateKeySyncError
+              write({
+                type: `insert`,
+                value: { language: `ru`, count: 2 },
+              })
+              commit()
+            }
+          },
+        },
+        startSync: true,
+        // Mark this as a live query with custom getKey (which should throw error)
+        utils: {
+          [LIVE_QUERY_INTERNAL]: {
+            hasCustomGetKey: true, // Has custom getKey, so should throw
+            hasJoins: false,
+            getBuilder: () => null,
+          },
+        } as any,
+      })
+
+      await collection.preload()
+
+      // Initial state
+      expect(collection.size).toBe(1)
+      expect(collection.get(`ru`)?.count).toBe(1)
+
+      // Now try to insert for the existing key without a delete
+      // This should throw because we're inserting a duplicate key with different value
+      // and this has custom getKey set to true
+      expect(() => writeInsertForExistingKey!()).toThrow(DuplicateKeySyncError)
+    })
+
+    test(`inserting same value for existing key should convert to update (not throw)`, async () => {
+      // When the new value is deepEquals to the existing value,
+      // the insert should be converted to an update (not throw)
+
+      type GroupResult = {
+        language: string
+        count: number
+      }
+
+      let writeInsertForExistingKey: (() => void) | undefined
+
+      const collection = createCollection<GroupResult>({
+        id: `same-value-repro`,
+        getKey: (item) => item.language,
+        sync: {
+          sync: ({ begin, write, commit, markReady }) => {
+            begin()
+            write({
+              type: `insert`,
+              value: { language: `ru`, count: 1 },
+            })
+            commit()
+            markReady()
+
+            writeInsertForExistingKey = () => {
+              begin()
+              // Same value - should be converted to update
+              write({
+                type: `insert`,
+                value: { language: `ru`, count: 1 },
+              })
+              commit()
+            }
+          },
+        },
+        startSync: true,
+      })
+
+      await collection.preload()
+
+      expect(collection.size).toBe(1)
+      expect(collection.get(`ru`)?.count).toBe(1)
+
+      // This should NOT throw because the value is the same
+      expect(() => writeInsertForExistingKey!()).not.toThrow()
+    })
+  })
+
+  test(`basic incremental update with same groupBy key`, async () => {
+    const eventsCollection = await createReadyCollection<Event>({
+      id: `events-basic-inc`,
+      getKey: (event) => event.id,
+    })
+
+    const languageCounts = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            count: count(events.id),
+          })),
+    })
+
+    // Initially empty
+    expect(languageCounts.size).toBe(0)
+
+    // Insert first event with language="ru"
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event1`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    // After first insert, should have count 1
+    expect(languageCounts.size).toBe(1)
+    expect(languageCounts.get(`ru`)?.count).toBe(1)
+
+    // Insert second event with same language="ru" but different id
+    // This is where the bug was reported - should NOT throw "already exists"
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event2`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    // After second insert, should have count 2
+    expect(languageCounts.size).toBe(1)
+    expect(languageCounts.get(`ru`)?.count).toBe(2)
+  })
+
+  test(`multiple incremental updates to same group`, async () => {
+    const eventsCollection = await createReadyCollection<Event>({
+      id: `events-multi-inc`,
+      getKey: (event) => event.id,
+    })
+
+    const languageCounts = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            count: count(events.id),
+          })),
+    })
+
+    // Add 5 events incrementally
+    for (let i = 1; i <= 5; i++) {
+      eventsCollection.utils.begin()
+      eventsCollection.utils.write({
+        type: `insert`,
+        value: { id: `event${i}`, language: `en` },
+      })
+      eventsCollection.utils.commit()
+
+      expect(languageCounts.size).toBe(1)
+      expect(languageCounts.get(`en`)?.count).toBe(i)
+    }
+  })
+
+  test(`incremental updates with sum aggregate`, async () => {
+    const eventsCollection = await createReadyCollection<Event>({
+      id: `events-sum-inc`,
+      getKey: (event) => event.id,
+    })
+
+    const languageTotals = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            total: sum(events.amount),
+            count: count(events.id),
+          })),
+    })
+
+    // First event
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event1`, language: `ru`, amount: 10 },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageTotals.get(`ru`)?.total).toBe(10)
+    expect(languageTotals.get(`ru`)?.count).toBe(1)
+
+    // Second event
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event2`, language: `ru`, amount: 20 },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageTotals.get(`ru`)?.total).toBe(30)
+    expect(languageTotals.get(`ru`)?.count).toBe(2)
+
+    // Third event
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event3`, language: `ru`, amount: 15 },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageTotals.get(`ru`)?.total).toBe(45)
+    expect(languageTotals.get(`ru`)?.count).toBe(3)
+  })
+
+  test(`multiple groups with incremental updates`, async () => {
+    const eventsCollection = await createReadyCollection<Event>({
+      id: `events-multi-group-inc`,
+      getKey: (event) => event.id,
+    })
+
+    const languageCounts = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            count: count(events.id),
+          })),
+    })
+
+    // Add events to different groups incrementally
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event1`, language: `en` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageCounts.size).toBe(1)
+    expect(languageCounts.get(`en`)?.count).toBe(1)
+
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event2`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageCounts.size).toBe(2)
+    expect(languageCounts.get(`en`)?.count).toBe(1)
+    expect(languageCounts.get(`ru`)?.count).toBe(1)
+
+    // Now add more to each group
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event3`, language: `en` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageCounts.get(`en`)?.count).toBe(2)
+    expect(languageCounts.get(`ru`)?.count).toBe(1)
+
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event4`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageCounts.get(`en`)?.count).toBe(2)
+    expect(languageCounts.get(`ru`)?.count).toBe(2)
+  })
+
+  test(`batch then incremental updates`, async () => {
+    const eventsCollection = await createReadyCollection<Event>({
+      id: `events-batch-then-inc`,
+      getKey: (event) => event.id,
+    })
+
+    const languageCounts = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            count: count(events.id),
+          })),
+    })
+
+    // Batch insert
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event1`, language: `ru` },
+    })
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event2`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageCounts.size).toBe(1)
+    expect(languageCounts.get(`ru`)?.count).toBe(2)
+
+    // Then incremental
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event3`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageCounts.get(`ru`)?.count).toBe(3)
+  })
+
+  test(`groupBy with subquery (matching bug report pattern)`, async () => {
+    // This test mimics the exact pattern from the bug report:
+    // A groupBy result is used as a source for another query with orderBy/limit
+    type WikiEvent = {
+      id: string
+      language: string
+    }
+
+    const eventsCollection = await createReadyCollection<WikiEvent>({
+      id: `events-subquery`,
+      getKey: (event) => event.id,
+    })
+
+    // Create the groupBy query that counts events by language
+    // This is used as a subquery
+    const languageCounts = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            count: count(events.id),
+          })),
+    })
+
+    // Create the outer query that orders by count and limits
+    const topLanguages = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ stats: languageCounts })
+          .orderBy(({ stats }) => stats.count, `desc`)
+          .limit(5),
+    })
+
+    // Initially empty
+    expect(topLanguages.size).toBe(0)
+
+    // Insert first event with language="ru"
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event1`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    // Should have one language with count 1
+    expect(topLanguages.size).toBe(1)
+    const firstResult = [...topLanguages.values()][0]
+    expect(firstResult?.language).toBe(`ru`)
+    expect(firstResult?.count).toBe(1)
+
+    // Insert second event with same language="ru" but different id
+    // This is where the bug would occur
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event2`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    // Should still have one language, but with count 2
+    expect(topLanguages.size).toBe(1)
+    const secondResult = [...topLanguages.values()][0]
+    expect(secondResult?.language).toBe(`ru`)
+    expect(secondResult?.count).toBe(2)
+
+    // Add more events to different languages
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event3`, language: `en` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(topLanguages.size).toBe(2)
+
+    // Add another Russian event
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event4`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    // Russian should now have count 3
+    const results = [...topLanguages.values()]
+    const ruResult = results.find((r) => r.language === `ru`)
+    const enResult = results.find((r) => r.language === `en`)
+    expect(ruResult?.count).toBe(3)
+    expect(enResult?.count).toBe(1)
+  })
+
+  test(`groupBy with rapid sequential inserts`, async () => {
+    // Test rapid sequential inserts that might trigger race conditions
+    const eventsCollection = await createReadyCollection<Event>({
+      id: `events-rapid`,
+      getKey: (event) => event.id,
+    })
+
+    const languageCounts = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            count: count(events.id),
+          })),
+    })
+
+    // Rapidly insert multiple events with the same language
+    for (let i = 0; i < 10; i++) {
+      eventsCollection.utils.begin()
+      eventsCollection.utils.write({
+        type: `insert`,
+        value: { id: `event-${i}`, language: `ru` },
+      })
+      eventsCollection.utils.commit()
+    }
+
+    // Should have accumulated all counts
+    expect(languageCounts.size).toBe(1)
+    expect(languageCounts.get(`ru`)?.count).toBe(10)
+  })
+
+  test(`groupBy with multiple events in single batch`, async () => {
+    // Test inserting multiple events with same groupBy key in a single batch
+    const eventsCollection = await createReadyCollection<Event>({
+      id: `events-batch-same`,
+      getKey: (event) => event.id,
+    })
+
+    const languageCounts = createLiveQueryCollection({
+      startSync: true,
+      query: (q) =>
+        q
+          .from({ events: eventsCollection })
+          .groupBy(({ events }) => events.language)
+          .select(({ events }) => ({
+            language: events.language,
+            count: count(events.id),
+          })),
+    })
+
+    // Insert multiple events in a single batch
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event1`, language: `ru` },
+    })
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event2`, language: `ru` },
+    })
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event3`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    // Should have one group with count 3
+    expect(languageCounts.size).toBe(1)
+    expect(languageCounts.get(`ru`)?.count).toBe(3)
+
+    // Then add more incrementally
+    eventsCollection.utils.begin()
+    eventsCollection.utils.write({
+      type: `insert`,
+      value: { id: `event4`, language: `ru` },
+    })
+    eventsCollection.utils.commit()
+
+    expect(languageCounts.get(`ru`)?.count).toBe(4)
+  })
+})
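
Note for reviewers: the invariant all of these tests pin down is that when an aggregate for an existing group changes, the operator must emit the old row with multiplicity -1 and the new row with multiplicity +1 in the same batch, so a downstream consumer can fold the batch by key into a single net change. Below is a minimal standalone sketch of that folding step, mirroring the accumulation loop in the `verify message accumulation` test; the `D2Message` and `NetChange` shapes and the `netChanges` helper are illustrative names, not the actual db package API.

```ts
// Illustrative message shape: key + row + multiplicity, mirroring what the
// tests read out of `message.getInner()`.
type D2Message = {
  key: string
  value: Record<string, unknown>
  multiplicity: number
}

type NetChange =
  | { type: `insert`; value: Record<string, unknown> }
  | { type: `update`; value: Record<string, unknown> }
  | { type: `delete` }

// Fold one batch of messages into at most one net change per key.
// A paired delete+insert for the same key nets out to an `update`; an
// unpaired insert for a key the consumer already holds is exactly the
// "already exists" failure mode these tests guard against.
function netChanges(batch: Array<D2Message>): Map<string, NetChange> {
  const acc = new Map<
    string,
    { inserts: number; deletes: number; value?: Record<string, unknown> }
  >()
  for (const msg of batch) {
    const entry = acc.get(msg.key) ?? { inserts: 0, deletes: 0 }
    if (msg.multiplicity > 0) {
      entry.inserts += msg.multiplicity
      entry.value = msg.value // last write wins within the batch
    } else {
      entry.deletes += Math.abs(msg.multiplicity)
    }
    acc.set(msg.key, entry)
  }
  const result = new Map<string, NetChange>()
  for (const [key, { inserts, deletes, value }] of acc) {
    if (inserts > 0 && deletes > 0) result.set(key, { type: `update`, value: value! })
    else if (inserts > 0) result.set(key, { type: `insert`, value: value! })
    else if (deletes > 0) result.set(key, { type: `delete` })
  }
  return result
}
```

For the second `ru` insert above, the batch `[[oldRow, -1], [newRow, +1]]` folds to a single `update` carrying `count: 2`; if the `-1` message were missing, the same batch would fold to an `insert` for a key that already exists.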
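Continuing the sketch, a strict apply step shows why the unpaired insert is fatal downstream: a consumer that rejects duplicate keys throws, which is the behavior the `Direct bug reproduction` tests assert via `DuplicateKeySyncError`. Again, this is an illustration of the failure mode under the `NetChange` types from the previous sketch, not the real collection's applyChanges implementation.

```ts
// Illustrative strict apply over the net changes computed by `netChanges`.
function applyNetChanges(
  state: Map<string, Record<string, unknown>>,
  changes: Map<string, NetChange>,
): void {
  for (const [key, change] of changes) {
    switch (change.type) {
      case `insert`:
        // Per the tests, the real collection throws DuplicateKeySyncError
        // here unless the incoming value deep-equals the existing one, in
        // which case it is converted to an update (`same value` test).
        if (state.has(key)) {
          throw new Error(`Key "${key}" already exists`)
        }
        state.set(key, change.value)
        break
      case `update`:
        state.set(key, change.value)
        break
      case `delete`:
        state.delete(key)
        break
    }
  }
}
```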