packages/db-ivm/tests/operators/groupBy.test.ts (347 additions, 0 deletions)
@@ -13,6 +13,32 @@ import {
} from '../../src/operators/groupBy.js'
import { output } from '../../src/operators/index.js'

/**
* Helper to track all messages (inserts/deletes) emitted by the groupBy operator.
* This is useful for debugging issues where the operator might emit incorrect
* sequences of operations.
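 *
 * Typical usage (a sketch mirroring the tests below; `groupByStage` stands in
 * for the groupBy(...) call under test):
 * @example
 * const tracker = createMessageTracker()
 * input.pipe(groupByStage, output((msg) => tracker.track(msg)))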
*/
function createMessageTracker() {
const allMessages: Array<{
key: string
value: Record<string, unknown>
multiplicity: number
}> = []

return {
track: (message: MultiSet<any>) => {
for (const [item, multiplicity] of message.getInner()) {
const [key, value] = item
allMessages.push({ key, value, multiplicity })
}
},
getMessages: () => allMessages,
clear: () => {
allMessages.length = 0
},
}
}

describe(`Operators`, () => {
describe(`GroupBy operation`, () => {
test(`with no aggregate`, () => {
@@ -961,6 +987,327 @@ describe(`Operators`, () => {
expect(result).toEqual(expectedUpdateResult)
})

test(`incremental updates should emit paired delete+insert for aggregate changes`, () => {
// This test verifies that when an aggregate value changes due to incremental updates,
// the groupBy operator correctly emits BOTH a delete for the old value AND an insert
// for the new value. This is critical for downstream consumers that track state.
//
// Bug scenario: When multiple items with the same groupBy key are added incrementally,
// the pipeline might emit only an insert without the corresponding delete, causing
// "already exists" errors in downstream collections.

const graph = new D2()
const input = graph.newInput<{
id: string
category: string
amount: number
}>()
const tracker = createMessageTracker()

input.pipe(
groupBy((data) => ({ category: data.category }), {
total: sum((data) => data.amount),
count: count(),
}),
output((message) => {
tracker.track(message)
}),
)

graph.finalize()

// Initial data: one item for category A
input.sendData(
new MultiSet([[{ id: `1`, category: `A`, amount: 10 }, 1]]),
)
graph.run()

// Verify initial state
const initialMessages = tracker.getMessages()
expect(initialMessages).toHaveLength(1)
expect(initialMessages[0]?.multiplicity).toBe(1) // Insert
expect(initialMessages[0]?.value).toMatchObject({
category: `A`,
total: 10,
count: 1,
})

tracker.clear()

// Incremental update: add another item with same category
// This should emit BOTH a delete for the old aggregate AND an insert for the new one
input.sendData(
new MultiSet([[{ id: `2`, category: `A`, amount: 20 }, 1]]),
)
graph.run()

const updateMessages = tracker.getMessages()

// Should have exactly 2 messages: one delete (-1) and one insert (+1)
expect(updateMessages).toHaveLength(2)

// Find the delete and insert messages
const deleteMsg = updateMessages.find((m) => m.multiplicity === -1)
const insertMsg = updateMessages.find((m) => m.multiplicity === 1)

// Verify we have both a delete and an insert
expect(deleteMsg).toBeDefined()
expect(insertMsg).toBeDefined()

// The delete should be for the old aggregate value
expect(deleteMsg?.value).toMatchObject({
category: `A`,
total: 10,
count: 1,
})

// The insert should be for the new aggregate value
expect(insertMsg?.value).toMatchObject({
category: `A`,
total: 30,
count: 2,
})
})

test(`rapid incremental updates should always emit paired delete+insert`, () => {
// This test simulates rapid sequential updates that might trigger edge cases
// in the reduce operator's state tracking.
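//
// For each iteration i below, the expected pair is (sketch):
//   delete { language: "en", count: i - 1 } and insert { language: "en", count: i }.
// A buggy run would emit only the insert, leaving the stale count live downstream.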

const graph = new D2()
const input = graph.newInput<{
id: string
language: string
}>()
const tracker = createMessageTracker()

input.pipe(
groupBy((data) => ({ language: data.language }), {
count: count(),
}),
output((message) => {
tracker.track(message)
}),
)

graph.finalize()

// Initial item
input.sendData(new MultiSet([[{ id: `1`, language: `en` }, 1]]))
graph.run()

expect(tracker.getMessages()).toHaveLength(1)
expect(tracker.getMessages()[0]?.multiplicity).toBe(1)
expect(tracker.getMessages()[0]?.value).toMatchObject({
language: `en`,
count: 1,
})

// Perform multiple rapid incremental updates
for (let i = 2; i <= 5; i++) {
tracker.clear()

input.sendData(new MultiSet([[{ id: `${i}`, language: `en` }, 1]]))
graph.run()

const messages = tracker.getMessages()

// Each update should produce exactly 2 messages: delete old, insert new
expect(messages).toHaveLength(2)

const deleteMsg = messages.find((m) => m.multiplicity === -1)
const insertMsg = messages.find((m) => m.multiplicity === 1)

expect(deleteMsg).toBeDefined()
expect(insertMsg).toBeDefined()

// Old count should be i-1, new count should be i
expect(deleteMsg?.value).toMatchObject({ language: `en`, count: i - 1 })
expect(insertMsg?.value).toMatchObject({ language: `en`, count: i })
}
})

test(`multiple groups with interleaved updates should emit correct delete+insert pairs`, () => {
// This test verifies that when multiple groups are updated in the same batch,
// each group gets the correct delete+insert pair.
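//
// Expected for the second batch (sketch): four messages in total,
//   en: delete(count 1) + insert(count 2), ru: delete(count 1) + insert(count 2),
// and no messages at all for the untouched fr group.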

const graph = new D2()
const input = graph.newInput<{
id: string
language: string
}>()
const tracker = createMessageTracker()

input.pipe(
groupBy((data) => ({ language: data.language }), {
count: count(),
}),
output((message) => {
tracker.track(message)
}),
)

graph.finalize()

// Initial data: one item for each language
input.sendData(
new MultiSet([
[{ id: `1`, language: `en` }, 1],
[{ id: `2`, language: `ru` }, 1],
[{ id: `3`, language: `fr` }, 1],
]),
)
graph.run()

// Should have 3 groups with count 1 each
expect(tracker.getMessages()).toHaveLength(3)
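// Group keys arrive as the JSON form of the groupBy key, e.g. `{"language":"en"}`.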
const enInsert = tracker
.getMessages()
.find((m) => m.key === `{"language":"en"}`)
const ruInsert = tracker
.getMessages()
.find((m) => m.key === `{"language":"ru"}`)
const frInsert = tracker
.getMessages()
.find((m) => m.key === `{"language":"fr"}`)
expect(enInsert?.multiplicity).toBe(1)
expect(ruInsert?.multiplicity).toBe(1)
expect(frInsert?.multiplicity).toBe(1)
expect(enInsert?.value.count).toBe(1)
expect(ruInsert?.value.count).toBe(1)
expect(frInsert?.value.count).toBe(1)

tracker.clear()

// Add items to two groups in the same batch
input.sendData(
new MultiSet([
[{ id: `4`, language: `en` }, 1],
[{ id: `5`, language: `ru` }, 1],
]),
)
graph.run()

const updateMessages = tracker.getMessages()

// Should have 4 messages: delete+insert for en, delete+insert for ru
expect(updateMessages).toHaveLength(4)

// Check en group
const enDelete = updateMessages.find(
(m) => m.key === `{"language":"en"}` && m.multiplicity === -1,
)
const enUpdate = updateMessages.find(
(m) => m.key === `{"language":"en"}` && m.multiplicity === 1,
)
expect(enDelete).toBeDefined()
expect(enUpdate).toBeDefined()
expect(enDelete?.value.count).toBe(1)
expect(enUpdate?.value.count).toBe(2)

// Check ru group
const ruDelete = updateMessages.find(
(m) => m.key === `{"language":"ru"}` && m.multiplicity === -1,
)
const ruUpdate = updateMessages.find(
(m) => m.key === `{"language":"ru"}` && m.multiplicity === 1,
)
expect(ruDelete).toBeDefined()
expect(ruUpdate).toBeDefined()
expect(ruDelete?.value.count).toBe(1)
expect(ruUpdate?.value.count).toBe(2)

// Check that fr group was NOT affected (no messages for it)
const frMessages = updateMessages.filter(
(m) => m.key === `{"language":"fr"}`,
)
expect(frMessages).toHaveLength(0)
})

test(`verify message accumulation - deletes and inserts should pair correctly`, () => {
// This test verifies that when processing incremental updates,
// the D2 pipeline emits properly paired delete and insert messages
// that can be accumulated by key in downstream processing.
//
// This is the exact scenario where the bug was reported:
// "the D2 pipeline might emit an insert for an updated aggregate
// without a corresponding delete"
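//
// The accumulation below mirrors, in sketch form, how a downstream collection
// nets out changes per key: a correct run yields inserts = 1, deletes = 1 for
// "ru"; the buggy run yields inserts = 1, deletes = 0 and then fails on re-insert.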

const graph = new D2()
const input = graph.newInput<{
id: string
language: string
}>()

// Track all raw messages and their multiplicities
const allMessages: Array<{
key: string
value: Record<string, unknown>
multiplicity: number
}> = []

input.pipe(
groupBy((data) => ({ language: data.language }), {
count: count(),
}),
output((message) => {
for (const [item, multiplicity] of message.getInner()) {
const [key, value] = item
allMessages.push({ key, value, multiplicity })
}
}),
)

graph.finalize()

// Step 1: Initial insert
input.sendData(new MultiSet([[{ id: `event1`, language: `ru` }, 1]]))
graph.run()

// Should have exactly 1 message: insert with count 1
expect(allMessages).toHaveLength(1)
expect(allMessages[0]?.multiplicity).toBe(1)
expect(allMessages[0]?.value.count).toBe(1)

// Clear for next step
allMessages.length = 0

// Step 2: Second insert to same group
input.sendData(new MultiSet([[{ id: `event2`, language: `ru` }, 1]]))
graph.run()

// Simulate how the db package accumulates changes by key
const changesByKey = new Map<
string,
{ inserts: number; deletes: number; value: any }
>()

for (const msg of allMessages) {
const existing = changesByKey.get(msg.key) || {
inserts: 0,
deletes: 0,
value: null,
}
if (msg.multiplicity > 0) {
existing.inserts += msg.multiplicity
existing.value = msg.value
} else if (msg.multiplicity < 0) {
existing.deletes += Math.abs(msg.multiplicity)
}
changesByKey.set(msg.key, existing)
}

// For the "ru" key, we should have 1 delete and 1 insert
const ruChanges = changesByKey.get(`{"language":"ru"}`)
expect(ruChanges).toBeDefined()

// CRITICAL: Both deletes and inserts should be present
// If only inserts are present (deletes === 0), this would cause
// the "already exists" error in the live query collection
expect(ruChanges?.deletes).toBe(1)
expect(ruChanges?.inserts).toBe(1)
expect(ruChanges?.value.count).toBe(2)
})

test(`group removal and re-addition with multiple aggregates`, () => {
const graph = new D2()
const input = graph.newInput<{