224 changes: 108 additions & 116 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -1,4 +1,5 @@
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
import type { ClickHouse, TaskRunInsertArray, PayloadInsertArray } from "@internal/clickhouse";
import { TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse";
import { type RedisOptions } from "@internal/redis";
import {
LogicalReplicationClient,
@@ -81,7 +82,7 @@ type TaskRunInsert = {
export type RunsReplicationServiceEvents = {
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
batchFlushed: [
{ flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
{ flushId: string; taskRunInserts: TaskRunInsertArray[]; payloadInserts: PayloadInsertArray[] },
];
};

@@ -171,12 +172,9 @@ export class RunsReplicationService {
description: "Insert retry attempts",
});

this._eventsProcessedCounter = this._meter.createCounter(
"runs_replication.events_processed",
{
description: "Replication events processed (inserts, updates, deletes)",
}
);
this._eventsProcessedCounter = this._meter.createCounter("runs_replication.events_processed", {
description: "Replication events processed (inserts, updates, deletes)",
});

this._flushDurationHistogram = this._meter.createHistogram(
"runs_replication.flush_duration_ms",
@@ -578,32 +576,46 @@

const taskRunInserts = preparedInserts
.map(({ taskRunInsert }) => taskRunInsert)
.filter(Boolean)
.filter((x): x is TaskRunInsertArray => Boolean(x))
// batch inserts in clickhouse are more performant if the items
// are pre-sorted by the primary key
.sort((a, b) => {
if (a.organization_id !== b.organization_id) {
return a.organization_id < b.organization_id ? -1 : 1;
const aOrgId = a[TASK_RUN_INDEX.organization_id] as string;
const bOrgId = b[TASK_RUN_INDEX.organization_id] as string;
if (aOrgId !== bOrgId) {
return aOrgId < bOrgId ? -1 : 1;
}
if (a.project_id !== b.project_id) {
return a.project_id < b.project_id ? -1 : 1;
const aProjId = a[TASK_RUN_INDEX.project_id] as string;
const bProjId = b[TASK_RUN_INDEX.project_id] as string;
if (aProjId !== bProjId) {
return aProjId < bProjId ? -1 : 1;
}
if (a.environment_id !== b.environment_id) {
return a.environment_id < b.environment_id ? -1 : 1;
const aEnvId = a[TASK_RUN_INDEX.environment_id] as string;
const bEnvId = b[TASK_RUN_INDEX.environment_id] as string;
if (aEnvId !== bEnvId) {
return aEnvId < bEnvId ? -1 : 1;
}
if (a.created_at !== b.created_at) {
return a.created_at - b.created_at;
const aCreatedAt = a[TASK_RUN_INDEX.created_at] as number;
const bCreatedAt = b[TASK_RUN_INDEX.created_at] as number;
if (aCreatedAt !== bCreatedAt) {
return aCreatedAt - bCreatedAt;
}
return a.run_id < b.run_id ? -1 : 1;
const aRunId = a[TASK_RUN_INDEX.run_id] as string;
const bRunId = b[TASK_RUN_INDEX.run_id] as string;
if (aRunId === bRunId) return 0;
return aRunId < bRunId ? -1 : 1;
});

const payloadInserts = preparedInserts
.map(({ payloadInsert }) => payloadInsert)
.filter(Boolean)
.filter((x): x is PayloadInsertArray => Boolean(x))
// batch inserts in clickhouse are more performant if the items
// are pre-sorted by the primary key
.sort((a, b) => {
return a.run_id < b.run_id ? -1 : 1;
const aRunId = a[PAYLOAD_INDEX.run_id] as string;
const bRunId = b[PAYLOAD_INDEX.run_id] as string;
if (aRunId === bRunId) return 0;
return aRunId < bRunId ? -1 : 1;
});

span.setAttribute("task_run_inserts", taskRunInserts.length);
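As a standalone sketch, the compact-array comparator above looks like this. The real TASK_RUN_INDEX comes from @internal/clickhouse; the positions here are assumptions that mirror the column order visible in #prepareTaskRunInsert further down this diff:

// Sketch of the primary-key comparator, assuming illustrative index positions.
const TASK_RUN_INDEX = {
  environment_id: 0,
  organization_id: 1,
  project_id: 2,
  run_id: 3,
  updated_at: 4,
  created_at: 5,
} as const;

type TaskRunInsertArray = Array<string | number | boolean | object | null>;

function compareTaskRuns(a: TaskRunInsertArray, b: TaskRunInsertArray): number {
  // Walk the ClickHouse primary key in order; the first differing column decides.
  for (const key of ["organization_id", "project_id", "environment_id"] as const) {
    const av = a[TASK_RUN_INDEX[key]] as string;
    const bv = b[TASK_RUN_INDEX[key]] as string;
    if (av !== bv) return av < bv ? -1 : 1;
  }
  const aCreated = a[TASK_RUN_INDEX.created_at] as number;
  const bCreated = b[TASK_RUN_INDEX.created_at] as number;
  if (aCreated !== bCreated) return aCreated - bCreated;
  const aRun = a[TASK_RUN_INDEX.run_id] as string;
  const bRun = b[TASK_RUN_INDEX.run_id] as string;
  return aRun === bRun ? 0 : aRun < bRun ? -1 : 1;
}

// usage: taskRunInserts.sort(compareTaskRuns)

Pre-sorting by (organization_id, project_id, environment_id, created_at, run_id) matches the table's sort key, so ClickHouse does less re-sorting work per inserted block.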
@@ -633,7 +645,6 @@
this.logger.error("Error inserting task run inserts", {
error: taskRunError,
flushId,
runIds: taskRunInserts.map((r) => r.run_id),
});
recordSpanError(span, taskRunError);
}
@@ -642,7 +653,6 @@
this.logger.error("Error inserting payload inserts", {
error: payloadError,
flushId,
runIds: payloadInserts.map((r) => r.run_id),
});
recordSpanError(span, payloadError);
}
@@ -760,26 +770,24 @@ export class RunsReplicationService {
#getClickhouseInsertSettings() {
if (this._insertStrategy === "insert") {
return {};
} else if (this._insertStrategy === "insert_async") {
return {
async_insert: 1 as const,
async_insert_max_data_size: "1000000",
async_insert_busy_timeout_ms: 1000,
wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
};
}

return {
async_insert: 1 as const,
async_insert_max_data_size: "1000000",
async_insert_busy_timeout_ms: 1000,
wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
};
}
Comment on lines 770 to 781

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

ClickHouse async_insert_max_data_size setting type format string or number

💡 Result:

async_insert_max_data_size is a numeric setting (bytes). Set it as an integer (e.g. 10485760). It’s the max number of bytes of buffered data before flush and is passed like async_insert_max_data_size=10485760. [1][2]

Sources

  • ClickHouse settings (async_insert_max_data_size = maximum size in bytes). [1]
  • Kafka connector docs showing usage and example (async_insert_max_data_size=10485760). [2]

🏁 Scripts executed (repository: triggerdotdev/trigger.dev; output sizes noted inline):

# Check how the ClickHouse client is being used and if there are type definitions
cd apps/webapp && rg -A 5 -B 5 "async_insert_max_data_size" --type ts --type tsx  # output: 96

# Look for ClickHouse client package and its version
rg "clickhouse|@clickhouse" package.json --type json -A 2  # output: 51

# Search for async_insert_max_data_size usage with correct syntax
cd apps/webapp && rg "async_insert_max_data_size" -A 3 -B 3  # output: 1387

# Find package.json files and check for ClickHouse client
fd package.json | head -5 | xargs rg "clickhouse|@clickhouse" -l  # output: 93

# Search for any ClickHouse-related imports in runsReplicationService.server.ts
head -100 apps/webapp/app/services/runsReplicationService.server.ts | rg -i "clickhouse|import"  # output: 755

# Find @internal/clickhouse type definitions
fd -t f "clickhouse" -e "ts" -e "d.ts" | grep -E "@internal|internal" | head -20  # output: 119

# Search for @internal/clickhouse exports
rg "@internal/clickhouse" --type ts -A 2 | head -50  # output: 4152

# Look for ClickHouse type definitions in packages directory
find . -path "*/internal/clickhouse*" -type f 2>/dev/null | head -20  # output: 51

# Search for how settings object is used with ClickHouse client
rg "ClickHouse.*insert.*async_insert" --type ts -B 5 -A 10 | head -100  # output: 51

# Check the ClickHouse client implementation
cat -n internal-packages/clickhouse/src/client/client.ts | head -150  # output: 5686

# Look for insert method and how settings are used
rg "async_insert" internal-packages/clickhouse/src -B 5 -A 5 --type ts  # output: 33912

# Find the type definition for insert settings
fd -t f -e ts -e d.ts internal-packages/clickhouse/src | xargs rg "async_insert|InsertSettings" -B 2 -A 2  # output: 19004

# Find the @clickhouse/client package info
fd package.json -x cat {} \; 2>/dev/null | grep -A 10 -B 10 "@clickhouse/client" | head -30  # output: 851

# Check if node_modules has type definitions
find . -path "*/node_modules/@clickhouse/client*" -name "*.d.ts" 2>/dev/null | head -5  # output: 51

# Look for BaseQueryParams type in the codebase
rg "BaseQueryParams|InsertParams" -B 2 -A 5 --type ts | head -50  # output: 3880


🌐 Web query:

@clickhouse/client npm package async_insert_max_data_size parameter type string or number

💡 Result:

The setting async_insert_max_data_size is a ClickHouse server UInt64 setting; in the @clickhouse/client TypeScript typings it appears in ClickHouseSettings (as async_insert_max_data_size?: UInt64) — effectively a number in the client. [1][2]

Sources:

  • settings.d.ts (shows async_insert_max_data_size?: UInt64). [1]
  • client-common index/export of ClickHouseSettings. [2]

Change async_insert_max_data_size from string to number.

The ClickHouse setting async_insert_max_data_size must be a numeric value (UInt64 bytes), not a string. TypeScript types from @clickhouse/client expect a number, and ClickHouse documentation specifies a numeric format. Update to: async_insert_max_data_size: 1000000 (remove quotes).

🤖 Prompt for AI Agents
In @apps/webapp/app/services/runsReplicationService.server.ts around lines 770-781, in #getClickhouseInsertSettings() update the async_insert_max_data_size value from a string to a numeric literal: change async_insert_max_data_size: "1000000" to async_insert_max_data_size: 1000000 so the ClickHouse setting and @clickhouse/client TypeScript types use a number; leave the rest of the returned object and wait_for_async_insert logic unchanged.
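For reference, a minimal sketch of the helper with the suggested fix applied, written as a free function so it is self-contained (per the review, async_insert_max_data_size is numeric bytes):

// Sketch of the corrected settings helper; mirrors the diff, fix applied.
type InsertStrategy = "insert" | "insert_async";

function getClickhouseInsertSettings(strategy: InsertStrategy, waitForAsyncInsert: boolean) {
  if (strategy === "insert") {
    return {};
  }
  return {
    async_insert: 1 as const,
    async_insert_max_data_size: 1_000_000, // numeric UInt64 bytes, was "1000000"
    async_insert_busy_timeout_ms: 1000,
    wait_for_async_insert: waitForAsyncInsert ? (1 as const) : (0 as const),
  };
}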


async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) {
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
taskRunInserts,
{
const [insertError, insertResult] =
await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
}
);
});

if (insertError) {
this.logger.error("Error inserting task run inserts attempt", {
@@ -795,16 +803,14 @@ export class RunsReplicationService {
});
}

async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
async #insertPayloadInserts(payloadInserts: PayloadInsertArray[], attempt: number) {
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
payloadInserts,
{
const [insertError, insertResult] =
await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
}
);
});

if (insertError) {
this.logger.error("Error inserting payload inserts attempt", {
@@ -822,25 +828,15 @@ export class RunsReplicationService {

async #prepareRunInserts(
batchedRun: TaskRunInsert
): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> {
): Promise<{ taskRunInsert?: TaskRunInsertArray; payloadInsert?: PayloadInsertArray }> {
this.logger.debug("Preparing run", {
batchedRun,
});

const { run, _version, event } = batchedRun;

if (!run.environmentType) {
return {
taskRunInsert: undefined,
payloadInsert: undefined,
};
}

if (!run.organizationId) {
return {
taskRunInsert: undefined,
payloadInsert: undefined,
};
if (!run.environmentType || !run.organizationId) {
return {};
}

if (event === "update" || event === "delete" || this._disablePayloadInsert) {
@@ -852,21 +848,15 @@
_version
);

return {
taskRunInsert,
payloadInsert: undefined,
};
return { taskRunInsert };
}

const [taskRunInsert, payloadInsert] = await Promise.all([
this.#prepareTaskRunInsert(run, run.organizationId, run.environmentType, event, _version),
this.#preparePayloadInsert(run, _version),
]);

return {
taskRunInsert,
payloadInsert,
};
return { taskRunInsert, payloadInsert };
}

async #prepareTaskRunInsert(
@@ -875,66 +865,68 @@
environmentType: string,
event: "insert" | "update" | "delete",
_version: bigint
): Promise<TaskRunV2> {
): Promise<TaskRunInsertArray> {
const output = await this.#prepareJson(run.output, run.outputType);

return {
environment_id: run.runtimeEnvironmentId,
organization_id: organizationId,
project_id: run.projectId,
run_id: run.id,
updated_at: run.updatedAt.getTime(),
created_at: run.createdAt.getTime(),
status: run.status,
environment_type: environmentType,
friendly_id: run.friendlyId,
engine: run.engine,
task_identifier: run.taskIdentifier,
queue: run.queue,
span_id: run.spanId,
trace_id: run.traceId,
error: { data: run.error },
attempt: run.attemptNumber ?? 1,
schedule_id: run.scheduleId ?? "",
batch_id: run.batchId ?? "",
completed_at: run.completedAt?.getTime(),
started_at: run.startedAt?.getTime(),
executed_at: run.executedAt?.getTime(),
delay_until: run.delayUntil?.getTime(),
queued_at: run.queuedAt?.getTime(),
expired_at: run.expiredAt?.getTime(),
usage_duration_ms: run.usageDurationMs,
cost_in_cents: run.costInCents,
base_cost_in_cents: run.baseCostInCents,
tags: run.runTags ?? [],
task_version: run.taskVersion ?? "",
sdk_version: run.sdkVersion ?? "",
cli_version: run.cliVersion ?? "",
machine_preset: run.machinePreset ?? "",
root_run_id: run.rootTaskRunId ?? "",
parent_run_id: run.parentTaskRunId ?? "",
depth: run.depth,
is_test: run.isTest,
idempotency_key: run.idempotencyKey ?? "",
expiration_ttl: run.ttl ?? "",
output,
concurrency_key: run.concurrencyKey ?? "",
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
worker_queue: run.masterQueue,
max_duration_in_seconds: run.maxDurationInSeconds ?? undefined,
_version: _version.toString(),
_is_deleted: event === "delete" ? 1 : 0,
};
// Return array matching TASK_RUN_COLUMNS order
return [
run.runtimeEnvironmentId, // environment_id
organizationId, // organization_id
run.projectId, // project_id
run.id, // run_id
run.updatedAt.getTime(), // updated_at
run.createdAt.getTime(), // created_at
run.status, // status
environmentType, // environment_type
run.friendlyId, // friendly_id
run.attemptNumber ?? 1, // attempt
run.engine, // engine
run.taskIdentifier, // task_identifier
run.queue, // queue
run.scheduleId ?? "", // schedule_id
run.batchId ?? "", // batch_id
run.completedAt?.getTime() ?? null, // completed_at
run.startedAt?.getTime() ?? null, // started_at
run.executedAt?.getTime() ?? null, // executed_at
run.delayUntil?.getTime() ?? null, // delay_until
run.queuedAt?.getTime() ?? null, // queued_at
run.expiredAt?.getTime() ?? null, // expired_at
run.usageDurationMs ?? 0, // usage_duration_ms
run.costInCents ?? 0, // cost_in_cents
run.baseCostInCents ?? 0, // base_cost_in_cents
output, // output
{ data: run.error }, // error
run.runTags ?? [], // tags
run.taskVersion ?? "", // task_version
run.sdkVersion ?? "", // sdk_version
run.cliVersion ?? "", // cli_version
run.machinePreset ?? "", // machine_preset
run.rootTaskRunId ?? "", // root_run_id
run.parentTaskRunId ?? "", // parent_run_id
run.depth ?? 0, // depth
run.spanId, // span_id
run.traceId, // trace_id
run.idempotencyKey ?? "", // idempotency_key
run.ttl ?? "", // expiration_ttl
run.isTest ?? false, // is_test
_version.toString(), // _version
event === "delete" ? 1 : 0, // _is_deleted
run.concurrencyKey ?? "", // concurrency_key
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
run.masterQueue ?? "", // worker_queue
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
];
}

async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<RawTaskRunPayloadV1> {
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<PayloadInsertArray> {
const payload = await this.#prepareJson(run.payload, run.payloadType);

return {
run_id: run.id,
created_at: run.createdAt.getTime(),
payload,
};
// Return array matching PAYLOAD_COLUMNS order
return [
run.id, // run_id
run.createdAt.getTime(), // created_at
payload, // payload
];
}
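Positional arrays keep the insert payload compact and line up with ClickHouse's JSONCompactEachRow input format, which is presumably what insertPayloadsCompactArrays uses under the hood. A minimal sketch under that assumption — the client setup, table name, and settings below are illustrative, not taken from this diff:

// Sketch: how a compact-array row maps onto a ClickHouse insert.
import { createClient } from "@clickhouse/client";

type PayloadInsertArray = [string, number, unknown]; // [run_id, created_at, payload]

const client = createClient({ url: "http://localhost:8123" }); // illustrative connection

async function insertPayloads(rows: PayloadInsertArray[]) {
  await client.insert({
    table: "trigger_dev.raw_task_runs_payload_v1", // hypothetical table name
    values: rows,
    format: "JSONCompactEachRow", // one positional array per row, in column order
    clickhouse_settings: { async_insert: 1, wait_for_async_insert: 0 },
  });
}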

async #prepareJson(