Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/forty-houses-doubt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": minor
"@trigger.dev/core": minor
---

Add GCRA rate limiting algorithm for task queue management
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ const EnvironmentSchema = z
RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),

/**
* Disable queue rate limiting (useful for development and testing).
* When set to "1", rate limit checks on queues will be bypassed.
*/
TRIGGER_DISABLE_QUEUE_RATE_LIMITS: z.string().default("0"),

RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"),
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ export class RunEngineTriggerTaskService {
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
cliVersion: lockedToBackgroundWorker?.cliVersion,
concurrencyKey: body.options?.concurrencyKey,
rateLimitKey: body.options?.rateLimitKey,
queue: queueName,
lockedQueueId,
workerQueue,
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ function createRunEngine() {
scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS,
processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS,
},
disableRateLimits: env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1",
},
runLock: {
redis: {
Expand Down
30 changes: 30 additions & 0 deletions apps/webapp/app/v3/runQueue.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,21 @@ import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { marqs } from "./marqs/index.server";
import { engine } from "./runEngine.server";

// Re-export pure utility function from durations.ts (testable without env deps)
export { parseDurationToMs } from "./utils/durations";

//This allows us to update MARQS and the RunQueue

/**
 * Rate limit configuration for a queue.
 *
 * This is the shape accepted by `updateQueueRateLimitConfig` in this module.
 * The time window is expressed in milliseconds; a human-readable duration
 * string such as "1s" or "5m" can be converted with `parseDurationToMs`.
 */
export type QueueRateLimitConfig = {
/** Maximum number of requests allowed in the period */
limit: number;
/** Length of the rate limit time window, in milliseconds */
periodMs: number;
/** Optional burst allowance (defaults to limit when omitted) */
burst?: number;
};

/** Updates MARQS and the RunQueue limits */
export async function updateEnvConcurrencyLimits(
environment: AuthenticatedEnvironment,
Expand Down Expand Up @@ -42,3 +55,20 @@ export async function removeQueueConcurrencyLimits(
engine.runQueue.removeQueueConcurrencyLimits(environment, queueName),
]);
}

/**
 * Updates the rate limit configuration for a queue in Redis.
 *
 * Delegates to the run engine's RunQueue and resolves once that call settles.
 *
 * @param environment - Authenticated environment the queue belongs to
 * @param queueName - Name of the queue whose rate limit is being set
 * @param config - Rate limit settings to apply to the queue
 */
export async function updateQueueRateLimitConfig(
  environment: AuthenticatedEnvironment,
  queueName: string,
  config: QueueRateLimitConfig
): Promise<void> {
  const { runQueue } = engine;

  await runQueue.setQueueRateLimitConfig(environment, queueName, config);
}

/**
 * Removes the rate limit configuration for a queue from Redis.
 *
 * Delegates to the run engine's RunQueue and resolves once that call settles.
 *
 * @param environment - Authenticated environment the queue belongs to
 * @param queueName - Name of the queue whose rate limit config is removed
 */
export async function removeQueueRateLimitConfig(
  environment: AuthenticatedEnvironment,
  queueName: string
): Promise<void> {
  const { runQueue } = engine;

  await runQueue.removeQueueRateLimitConfig(environment, queueName);
}
68 changes: 67 additions & 1 deletion apps/webapp/app/v3/services/createBackgroundWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@ import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database";
import cronstrue from "cronstrue";
import { Prisma, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { sanitizeQueueName } from "~/models/taskQueue.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import {
parseDurationToMs,
removeQueueConcurrencyLimits,
removeQueueRateLimitConfig,
updateEnvConcurrencyLimits,
updateQueueConcurrencyLimits,
updateQueueRateLimitConfig,
type QueueRateLimitConfig,
} from "../runQueue.server";
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
import { clampMaxDuration } from "../utils/maxDuration";
Expand Down Expand Up @@ -255,6 +260,7 @@ async function createWorkerTask(
{
name: task.queue?.name ?? `task/${task.id}`,
concurrencyLimit: task.queue?.concurrencyLimit,
rateLimit: task.queue?.rateLimit,
},
task.id,
task.queue?.name ? "NAMED" : "VIRTUAL",
Expand Down Expand Up @@ -364,9 +370,27 @@ async function createWorkerQueue(
? Math.max(Math.min(queue.concurrencyLimit, environment.maximumConcurrencyLimit), 0)
: queue.concurrencyLimit;

let rateLimitConfig: QueueRateLimitConfig | null = null;
if (queue.rateLimit) {
try {
rateLimitConfig = {
limit: queue.rateLimit.limit,
periodMs: parseDurationToMs(queue.rateLimit.period),
burst: queue.rateLimit.burst,
};
} catch (error) {
logger.error("createWorkerQueue: invalid rate limit period format", {
queueName,
rateLimit: queue.rateLimit,
error,
});
}
}

const taskQueue = await upsertWorkerQueueRecord(
queueName,
baseConcurrencyLimit ?? null,
rateLimitConfig,
orderableName,
queueType,
worker,
Expand Down Expand Up @@ -397,8 +421,36 @@ async function createWorkerQueue(
});
await removeQueueConcurrencyLimits(environment, taskQueue.name);
}

// Handle rate limit config sync to Redis
if (env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1") {
// Rate limiting disabled: remove any existing config from Redis
// This ensures clean state when toggling the flag
logger.debug("createWorkerQueue: rate limiting disabled by env flag, removing config", {
workerId: worker.id,
taskQueue: taskQueue.name,
orgId: environment.organizationId,
projectId: environment.projectId,
environmentId: environment.id,
});
await removeQueueRateLimitConfig(environment, taskQueue.name);
} else if (rateLimitConfig) {
// Rate limiting enabled and config exists: sync to Redis
logger.debug("createWorkerQueue: updating rate limit config", {
workerId: worker.id,
taskQueue: taskQueue.name,
orgId: environment.organizationId,
projectId: environment.projectId,
environmentId: environment.id,
rateLimitConfig,
});
await updateQueueRateLimitConfig(environment, taskQueue.name, rateLimitConfig);
} else {
// Rate limiting enabled but no config: remove any stale config
await removeQueueRateLimitConfig(environment, taskQueue.name);
}
} else {
logger.debug("createWorkerQueue: queue is paused, not updating concurrency limit", {
logger.debug("createWorkerQueue: queue is paused, not updating limits", {
workerId: worker.id,
taskQueue,
orgId: environment.organizationId,
Expand All @@ -413,6 +465,7 @@ async function createWorkerQueue(
async function upsertWorkerQueueRecord(
queueName: string,
concurrencyLimit: number | null,
rateLimitConfig: QueueRateLimitConfig | null,
orderableName: string,
queueType: TaskQueueType,
worker: BackgroundWorker,
Expand All @@ -431,6 +484,15 @@ async function upsertWorkerQueueRecord(
},
});

// Serialize rate limit config for storage (or null to clear)
const rateLimitData = rateLimitConfig
? {
limit: rateLimitConfig.limit,
periodMs: rateLimitConfig.periodMs,
burst: rateLimitConfig.burst,
}
: Prisma.JsonNull;

if (!taskQueue) {
taskQueue = await prisma.taskQueue.create({
data: {
Expand All @@ -439,6 +501,7 @@ async function upsertWorkerQueueRecord(
name: queueName,
orderableName,
concurrencyLimit,
rateLimit: rateLimitData,
runtimeEnvironmentId: worker.runtimeEnvironmentId,
projectId: worker.projectId,
type: queueType,
Expand All @@ -463,6 +526,8 @@ async function upsertWorkerQueueRecord(
// If overridden, keep current limit and update base; otherwise update limit normally
concurrencyLimit: hasOverride ? undefined : concurrencyLimit,
concurrencyLimitBase: hasOverride ? concurrencyLimit : undefined,
// Always update rate limit config (not overrideable for now)
rateLimit: rateLimitData,
},
});
}
Expand All @@ -474,6 +539,7 @@ async function upsertWorkerQueueRecord(
return await upsertWorkerQueueRecord(
queueName,
concurrencyLimit,
rateLimitConfig,
orderableName,
queueType,
worker,
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/services/enqueueRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type EnqueueRunOptions = {
env: AuthenticatedEnvironment;
run: TaskRun;
dependentRun?: { queue: string; id: string };
rateLimitKey?: string;
};

export type EnqueueRunResult =
Expand All @@ -22,6 +23,7 @@ export async function enqueueRun({
env,
run,
dependentRun,
rateLimitKey,
}: EnqueueRunOptions): Promise<EnqueueRunResult> {
// If this is a triggerAndWait or batchTriggerAndWait,
// we need to add the parent run to the reserve concurrency set
Expand All @@ -39,6 +41,8 @@ export async function enqueueRun({
projectId: env.projectId,
environmentId: env.id,
environmentType: env.type,
// Include rateLimitKey in message payload for dequeue-time checks
rateLimitKey,
},
run.concurrencyKey ?? undefined,
run.queueTimestamp ?? undefined,
Expand Down
32 changes: 32 additions & 0 deletions apps/webapp/app/v3/utils/durations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
 * Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds.
 *
 * The input must be a non-negative decimal number immediately followed by one
 * of the units `ms`, `s`, `m`, `h` or `d`. No sign, whitespace or exponent is
 * accepted. Fractional values are allowed and the result is rounded to the
 * nearest whole millisecond.
 *
 * @param duration - The duration string to parse
 * @returns The duration in whole milliseconds
 * @throws Error if the duration string is invalid, or if the resulting number
 *   of milliseconds cannot be represented as a safe integer
 */
export function parseDurationToMs(duration: string): number {
  const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/);

  if (!match) {
    throw new Error(
      `Invalid duration string: "${duration}". Expected format: number + unit (ms, s, m, h, d)`
    );
  }

  const [, value, unit] = match;
  const numValue = parseFloat(value);

  let milliseconds: number;
  switch (unit) {
    case "ms":
      milliseconds = Math.round(numValue);
      break;
    case "s":
      milliseconds = Math.round(numValue * 1000);
      break;
    case "m":
      milliseconds = Math.round(numValue * 60 * 1000);
      break;
    case "h":
      milliseconds = Math.round(numValue * 60 * 60 * 1000);
      break;
    case "d":
      milliseconds = Math.round(numValue * 24 * 60 * 60 * 1000);
      break;
    default:
      // Unreachable while the regex above only admits the units handled here;
      // kept as a guard in case the two ever drift apart.
      throw new Error(`Unknown duration unit: ${unit}`);
  }

  // A very long digit string parses to Infinity (or to a value beyond 2^53
  // where integer precision is lost). Such a value would serialize to `null`
  // in JSON or be silently imprecise, so reject it instead of returning it.
  if (!Number.isSafeInteger(milliseconds)) {
    throw new Error(
      `Invalid duration string: "${duration}". Value is too large to represent in milliseconds`
    );
  }

  return milliseconds;
}

47 changes: 47 additions & 0 deletions apps/webapp/test/parseDurationToMs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { describe, it, expect } from "vitest";
import { parseDurationToMs } from "~/v3/utils/durations";

// Unit tests for parseDurationToMs: one case per supported unit plus the
// rejection paths for malformed and negative input.
describe("parseDurationToMs", () => {
  /** Checks that each [input, expected] pair parses to the expected millisecond count. */
  const expectParsed = (cases: ReadonlyArray<readonly [string, number]>) => {
    for (const [input, expected] of cases) {
      expect(parseDurationToMs(input)).toBe(expected);
    }
  };

  /** Checks that parsing each input throws. */
  const expectRejected = (inputs: readonly string[]) => {
    for (const input of inputs) {
      expect(() => parseDurationToMs(input)).toThrow();
    }
  };

  it("parses milliseconds", () => {
    expectParsed([
      ["100ms", 100],
      ["1500ms", 1500],
      ["0ms", 0],
    ]);
  });

  it("parses seconds", () => {
    expectParsed([
      ["1s", 1000],
      ["30s", 30000],
      ["1.5s", 1500],
      ["0.5s", 500],
    ]);
  });

  it("parses minutes", () => {
    expectParsed([
      ["1m", 60000],
      ["5m", 300000],
      ["0.5m", 30000],
    ]);
  });

  it("parses hours", () => {
    expectParsed([
      ["1h", 3600000],
      ["24h", 86400000],
      ["0.5h", 1800000],
    ]);
  });

  it("parses days", () => {
    expectParsed([
      ["1d", 86400000],
      ["7d", 604800000],
    ]);
  });

  it("throws on invalid format", () => {
    expectRejected(["invalid", "1x", "", "ms", "10"]);
  });

  it("throws on negative values (invalid regex)", () => {
    // A leading "-" is not matched by the parser's pattern, so it is rejected.
    expectRejected(["-1s"]);
  });
});

1 change: 1 addition & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ model TaskRun {
priorityMs Int @default(0)

concurrencyKey String?
rateLimitKey String?

delayUntil DateTime?
queuedAt DateTime?
Expand Down
5 changes: 5 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export class RunEngine {
masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs,
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
disableRateLimits: options.queue?.disableRateLimits,
meter: options.meter,
});

Expand Down Expand Up @@ -405,6 +406,7 @@ export class RunEngine {
sdkVersion,
cliVersion,
concurrencyKey,
rateLimitKey,
workerQueue,
queue,
lockedQueueId,
Expand Down Expand Up @@ -554,6 +556,7 @@ export class RunEngine {
sdkVersion,
cliVersion,
concurrencyKey,
rateLimitKey,
queue,
lockedQueueId,
workerQueue,
Expand Down Expand Up @@ -668,6 +671,7 @@ export class RunEngine {

if (taskRun.delayUntil) {
// Schedule the run to be enqueued at the delayUntil time
// Note: rateLimitKey is not passed for delayed runs - it will need to be stored on the run if needed
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
runId: taskRun.id,
delayUntil: taskRun.delayUntil,
Expand Down Expand Up @@ -705,6 +709,7 @@ export class RunEngine {
runnerId,
tx: prisma,
skipRunLock: true,
rateLimitKey,
});
}

Expand Down
Loading