diff --git a/.changeset/forty-houses-doubt.md b/.changeset/forty-houses-doubt.md new file mode 100644 index 0000000000..0f768b1dae --- /dev/null +++ b/.changeset/forty-houses-doubt.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": minor +"@trigger.dev/core": minor +--- + +Add GCRA rate limiting algorithm for task queue management diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 1cc0db0bf0..2986761b65 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -765,6 +765,12 @@ const EnvironmentSchema = z RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"), RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"), + /** + * Disable queue rate limiting (useful for development and testing). + * When set to "1", rate limit checks on queues will be bypassed. + */ + TRIGGER_DISABLE_QUEUE_RATE_LIMITS: z.string().default("0"), + RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1), diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index ab32682811..6fbec81128 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -323,6 +323,7 @@ export class RunEngineTriggerTaskService { sdkVersion: lockedToBackgroundWorker?.sdkVersion, cliVersion: lockedToBackgroundWorker?.cliVersion, concurrencyKey: body.options?.concurrencyKey, + rateLimitKey: body.options?.rateLimitKey, queue: queueName, lockedQueueId, workerQueue, diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 5f88d5f6a4..65538adfaf 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -80,6 +80,7 @@ function createRunEngine() { scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS, processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS, }, + disableRateLimits: env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1", }, runLock: { redis: { diff --git a/apps/webapp/app/v3/runQueue.server.ts b/apps/webapp/app/v3/runQueue.server.ts index e7aa13c5c5..b532d49340 100644 --- a/apps/webapp/app/v3/runQueue.server.ts +++ b/apps/webapp/app/v3/runQueue.server.ts @@ -2,8 +2,21 @@ import { type AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { marqs } from "./marqs/index.server"; import { engine } from "./runEngine.server"; +// Re-export pure utility function from durations.ts (testable without env deps) +export { parseDurationToMs } from "./utils/durations"; + //This allows us to update MARQS and the RunQueue +/** Rate limit configuration for a queue */ +export type QueueRateLimitConfig = { + /** Maximum number of requests allowed in the period */ + limit: number; + /** Time window in milliseconds */ + periodMs: number; + /** Optional burst allowance (defaults to limit) */ + burst?: number; +}; + /** Updates MARQS and the RunQueue limits */ export async function updateEnvConcurrencyLimits( environment: AuthenticatedEnvironment, @@ -42,3 +55,20 @@ export async function removeQueueConcurrencyLimits( engine.runQueue.removeQueueConcurrencyLimits(environment, queueName), ]); } + +/** Updates the rate limit configuration for a queue in Redis */ +export async function updateQueueRateLimitConfig( + environment: 
AuthenticatedEnvironment, + queueName: string, + config: QueueRateLimitConfig +) { + await engine.runQueue.setQueueRateLimitConfig(environment, queueName, config); +} + +/** Removes the rate limit configuration for a queue from Redis */ +export async function removeQueueRateLimitConfig( + environment: AuthenticatedEnvironment, + queueName: string +) { + await engine.runQueue.removeQueueRateLimitConfig(environment, queueName); +} diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 41fbf2afe2..7e49b9d656 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -9,14 +9,19 @@ import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database"; import cronstrue from "cronstrue"; import { Prisma, PrismaClientOrTransaction } from "~/db.server"; +import { env } from "~/env.server"; import { sanitizeQueueName } from "~/models/taskQueue.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { + parseDurationToMs, removeQueueConcurrencyLimits, + removeQueueRateLimitConfig, updateEnvConcurrencyLimits, updateQueueConcurrencyLimits, + updateQueueRateLimitConfig, + type QueueRateLimitConfig, } from "../runQueue.server"; import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion"; import { clampMaxDuration } from "../utils/maxDuration"; @@ -255,6 +260,7 @@ async function createWorkerTask( { name: task.queue?.name ?? `task/${task.id}`, concurrencyLimit: task.queue?.concurrencyLimit, + rateLimit: task.queue?.rateLimit, }, task.id, task.queue?.name ? "NAMED" : "VIRTUAL", @@ -364,9 +370,27 @@ async function createWorkerQueue( ? Math.max(Math.min(queue.concurrencyLimit, environment.maximumConcurrencyLimit), 0) : queue.concurrencyLimit; + let rateLimitConfig: QueueRateLimitConfig | null = null; + if (queue.rateLimit) { + try { + rateLimitConfig = { + limit: queue.rateLimit.limit, + periodMs: parseDurationToMs(queue.rateLimit.period), + burst: queue.rateLimit.burst, + }; + } catch (error) { + logger.error("createWorkerQueue: invalid rate limit period format", { + queueName, + rateLimit: queue.rateLimit, + error, + }); + } + } + const taskQueue = await upsertWorkerQueueRecord( queueName, baseConcurrencyLimit ?? 
null, + rateLimitConfig, orderableName, queueType, worker, @@ -397,8 +421,36 @@ async function createWorkerQueue( }); await removeQueueConcurrencyLimits(environment, taskQueue.name); } + + // Handle rate limit config sync to Redis + if (env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1") { + // Rate limiting disabled: remove any existing config from Redis + // This ensures clean state when toggling the flag + logger.debug("createWorkerQueue: rate limiting disabled by env flag, removing config", { + workerId: worker.id, + taskQueue: taskQueue.name, + orgId: environment.organizationId, + projectId: environment.projectId, + environmentId: environment.id, + }); + await removeQueueRateLimitConfig(environment, taskQueue.name); + } else if (rateLimitConfig) { + // Rate limiting enabled and config exists: sync to Redis + logger.debug("createWorkerQueue: updating rate limit config", { + workerId: worker.id, + taskQueue: taskQueue.name, + orgId: environment.organizationId, + projectId: environment.projectId, + environmentId: environment.id, + rateLimitConfig, + }); + await updateQueueRateLimitConfig(environment, taskQueue.name, rateLimitConfig); + } else { + // Rate limiting enabled but no config: remove any stale config + await removeQueueRateLimitConfig(environment, taskQueue.name); + } } else { - logger.debug("createWorkerQueue: queue is paused, not updating concurrency limit", { + logger.debug("createWorkerQueue: queue is paused, not updating limits", { workerId: worker.id, taskQueue, orgId: environment.organizationId, @@ -413,6 +465,7 @@ async function createWorkerQueue( async function upsertWorkerQueueRecord( queueName: string, concurrencyLimit: number | null, + rateLimitConfig: QueueRateLimitConfig | null, orderableName: string, queueType: TaskQueueType, worker: BackgroundWorker, @@ -431,6 +484,15 @@ async function upsertWorkerQueueRecord( }, }); + // Serialize rate limit config for storage (or null to clear) + const rateLimitData = rateLimitConfig + ? { + limit: rateLimitConfig.limit, + periodMs: rateLimitConfig.periodMs, + burst: rateLimitConfig.burst, + } + : Prisma.JsonNull; + if (!taskQueue) { taskQueue = await prisma.taskQueue.create({ data: { @@ -439,6 +501,7 @@ async function upsertWorkerQueueRecord( name: queueName, orderableName, concurrencyLimit, + rateLimit: rateLimitData, runtimeEnvironmentId: worker.runtimeEnvironmentId, projectId: worker.projectId, type: queueType, @@ -463,6 +526,8 @@ async function upsertWorkerQueueRecord( // If overridden, keep current limit and update base; otherwise update limit normally concurrencyLimit: hasOverride ? undefined : concurrencyLimit, concurrencyLimitBase: hasOverride ? 
concurrencyLimit : undefined, + // Always update rate limit config (not overrideable for now) + rateLimit: rateLimitData, }, }); } @@ -474,6 +539,7 @@ async function upsertWorkerQueueRecord( return await upsertWorkerQueueRecord( queueName, concurrencyLimit, + rateLimitConfig, orderableName, queueType, worker, diff --git a/apps/webapp/app/v3/services/enqueueRun.server.ts b/apps/webapp/app/v3/services/enqueueRun.server.ts index cb091d70d9..9b3114c88a 100644 --- a/apps/webapp/app/v3/services/enqueueRun.server.ts +++ b/apps/webapp/app/v3/services/enqueueRun.server.ts @@ -7,6 +7,7 @@ export type EnqueueRunOptions = { env: AuthenticatedEnvironment; run: TaskRun; dependentRun?: { queue: string; id: string }; + rateLimitKey?: string; }; export type EnqueueRunResult = @@ -22,6 +23,7 @@ export async function enqueueRun({ env, run, dependentRun, + rateLimitKey, }: EnqueueRunOptions): Promise { // If this is a triggerAndWait or batchTriggerAndWait, // we need to add the parent run to the reserve concurrency set @@ -39,6 +41,8 @@ export async function enqueueRun({ projectId: env.projectId, environmentId: env.id, environmentType: env.type, + // Include rateLimitKey in message payload for dequeue-time checks + rateLimitKey, }, run.concurrencyKey ?? undefined, run.queueTimestamp ?? undefined, diff --git a/apps/webapp/app/v3/utils/durations.ts b/apps/webapp/app/v3/utils/durations.ts new file mode 100644 index 0000000000..ff7a1745d0 --- /dev/null +++ b/apps/webapp/app/v3/utils/durations.ts @@ -0,0 +1,32 @@ +/** + * Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds. + * @throws Error if the duration string is invalid + */ +export function parseDurationToMs(duration: string): number { + const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/); + + if (!match) { + throw new Error( + `Invalid duration string: "${duration}". 
Expected format: number + unit (ms, s, m, h, d)` + ); + } + + const [, value, unit] = match; + const numValue = parseFloat(value); + + switch (unit) { + case "ms": + return Math.round(numValue); + case "s": + return Math.round(numValue * 1000); + case "m": + return Math.round(numValue * 60 * 1000); + case "h": + return Math.round(numValue * 60 * 60 * 1000); + case "d": + return Math.round(numValue * 24 * 60 * 60 * 1000); + default: + throw new Error(`Unknown duration unit: ${unit}`); + } +} + diff --git a/apps/webapp/test/parseDurationToMs.test.ts b/apps/webapp/test/parseDurationToMs.test.ts new file mode 100644 index 0000000000..6dbe1476b4 --- /dev/null +++ b/apps/webapp/test/parseDurationToMs.test.ts @@ -0,0 +1,47 @@ +import { describe, it, expect } from "vitest"; +import { parseDurationToMs } from "~/v3/utils/durations"; + +describe("parseDurationToMs", () => { + it("parses milliseconds", () => { + expect(parseDurationToMs("100ms")).toBe(100); + expect(parseDurationToMs("1500ms")).toBe(1500); + expect(parseDurationToMs("0ms")).toBe(0); + }); + + it("parses seconds", () => { + expect(parseDurationToMs("1s")).toBe(1000); + expect(parseDurationToMs("30s")).toBe(30000); + expect(parseDurationToMs("1.5s")).toBe(1500); + expect(parseDurationToMs("0.5s")).toBe(500); + }); + + it("parses minutes", () => { + expect(parseDurationToMs("1m")).toBe(60000); + expect(parseDurationToMs("5m")).toBe(300000); + expect(parseDurationToMs("0.5m")).toBe(30000); + }); + + it("parses hours", () => { + expect(parseDurationToMs("1h")).toBe(3600000); + expect(parseDurationToMs("24h")).toBe(86400000); + expect(parseDurationToMs("0.5h")).toBe(1800000); + }); + + it("parses days", () => { + expect(parseDurationToMs("1d")).toBe(86400000); + expect(parseDurationToMs("7d")).toBe(604800000); + }); + + it("throws on invalid format", () => { + expect(() => parseDurationToMs("invalid")).toThrow(); + expect(() => parseDurationToMs("1x")).toThrow(); + expect(() => parseDurationToMs("")).toThrow(); + expect(() => parseDurationToMs("ms")).toThrow(); + expect(() => parseDurationToMs("10")).toThrow(); + }); + + it("throws on negative values (invalid regex)", () => { + expect(() => parseDurationToMs("-1s")).toThrow(); + }); +}); + diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 28c8332966..4d88ae7285 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -660,6 +660,7 @@ model TaskRun { priorityMs Int @default(0) concurrencyKey String? + rateLimitKey String? delayUntil DateTime? queuedAt DateTime? 
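For reference, a minimal sketch of the period conversion that `createWorkerQueue` performs before syncing a queue's rate limit to Redis. The declared values are illustrative; the import path mirrors the test file above.

```ts
import { parseDurationToMs } from "~/v3/utils/durations";

// A queue-level rate limit as declared in task code (see QueueRateLimitConfigSchema below):
// 100 dequeues per minute with a burst allowance of 150.
const declared = { limit: 100, period: "1m", burst: 150 };

// createWorkerQueue converts the human-readable period into the millisecond-based
// config that is stored on the TaskQueue row and pushed to Redis on deploy.
const rateLimitConfig = {
  limit: declared.limit,
  periodMs: parseDurationToMs(declared.period), // 60000
  burst: declared.burst,
};
```

If the period string is invalid, `parseDurationToMs` throws; `createWorkerQueue` logs the error and skips the rate limit config instead of failing the deploy.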
diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 1b53d6378d..11c6c353f0 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -181,6 +181,7 @@ export class RunEngine { masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs, processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs, dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds, + disableRateLimits: options.queue?.disableRateLimits, meter: options.meter, }); @@ -405,6 +406,7 @@ export class RunEngine { sdkVersion, cliVersion, concurrencyKey, + rateLimitKey, workerQueue, queue, lockedQueueId, @@ -554,6 +556,7 @@ export class RunEngine { sdkVersion, cliVersion, concurrencyKey, + rateLimitKey, queue, lockedQueueId, workerQueue, @@ -668,6 +671,7 @@ export class RunEngine { if (taskRun.delayUntil) { // Schedule the run to be enqueued at the delayUntil time + // Note: rateLimitKey is not passed for delayed runs - it will need to be stored on the run if needed await this.delayedRunSystem.scheduleDelayedRunEnqueuing({ runId: taskRun.id, delayUntil: taskRun.delayUntil, @@ -705,6 +709,7 @@ export class RunEngine { runnerId, tx: prisma, skipRunLock: true, + rateLimitKey, }); } diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 395e44727c..e5eccc7674 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -34,6 +34,7 @@ export class EnqueueSystem { workerId, runnerId, skipRunLock, + rateLimitKey, }: { run: TaskRun; env: MinimalAuthenticatedEnvironment; @@ -53,6 +54,7 @@ export class EnqueueSystem { workerId?: string; runnerId?: string; skipRunLock?: boolean; + rateLimitKey?: string; }) { const prisma = tx ?? this.$.prisma; @@ -81,6 +83,12 @@ export class EnqueueSystem { const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs; + // IMPORTANT: Use provided rateLimitKey or fall back to the one stored on the run. + // This ensures re-enqueued runs (checkpoint, delay, waitpoint, pendingVersion) + // maintain their original rate limit bucket. Future callers should rely on this + // fallback rather than passing rateLimitKey explicitly for re-enqueue scenarios. + const effectiveRateLimitKey = rateLimitKey ?? run.rateLimitKey ?? undefined; + await this.$.runQueue.enqueueMessage({ env, workerQueue, @@ -93,6 +101,7 @@ export class EnqueueSystem { environmentType: env.type, queue: run.queue, concurrencyKey: run.concurrencyKey ?? 
undefined, + rateLimitKey: effectiveRateLimitKey, timestamp, attempt: 0, }, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 3b2ae8c9a1..b4334eb24c 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -63,6 +63,7 @@ export type RunEngineOptions = { scanJitterInMs?: number; processMarkedJitterInMs?: number; }; + disableRateLimits?: boolean; }; runLock: { redis: RedisOptions; @@ -133,6 +134,7 @@ export type TriggerParams = { sdkVersion?: string; cliVersion?: string; concurrencyKey?: string; + rateLimitKey?: string; workerQueue?: string; queue: string; lockedQueueId?: string; diff --git a/internal-packages/run-engine/src/index.ts b/internal-packages/run-engine/src/index.ts index 3f96045c13..be82af0c55 100644 --- a/internal-packages/run-engine/src/index.ts +++ b/internal-packages/run-engine/src/index.ts @@ -20,3 +20,13 @@ export type { ProcessBatchItemCallback, BatchCompletionCallback, } from "./batch-queue/types.js"; + +// Rate limiter exports +export { GCRARateLimiter, configToGCRAParams, parseDurationToMs } from "./rate-limiter/index.js"; +export type { + GCRARateLimiterOptions, + GCRAParams, + QueueRateLimitConfig, + StoredQueueRateLimitConfig, + RateLimitResult, +} from "./rate-limiter/index.js"; diff --git a/internal-packages/run-engine/src/rate-limiter/gcra.ts b/internal-packages/run-engine/src/rate-limiter/gcra.ts new file mode 100644 index 0000000000..f506b03842 --- /dev/null +++ b/internal-packages/run-engine/src/rate-limiter/gcra.ts @@ -0,0 +1,250 @@ +import type { Redis } from "@internal/redis"; + +/** + * Configuration for queue rate limiting (input format). + */ +export interface QueueRateLimitConfig { + /** Maximum number of requests allowed within the period */ + limit: number; + /** Time period in milliseconds */ + periodMs: number; + /** Optional burst capacity (defaults to 1) */ + burst?: number; +} + +/** + * Stored configuration for queue rate limiting (includes pre-calculated GCRA params). + * This is what gets stored in Redis and read by the Lua dequeue script. + */ +export interface StoredQueueRateLimitConfig extends QueueRateLimitConfig, GCRAParams {} + +/** + * GCRA parameters calculated from QueueRateLimitConfig. + * These are stored in Redis for use by the Lua dequeue script. + */ +export interface GCRAParams { + /** The minimum interval between requests in milliseconds */ + emissionInterval: number; + /** The burst tolerance in milliseconds */ + burstTolerance: number; + /** Key expiration in milliseconds */ + keyExpiration: number; +} + +/** + * Options for configuring the RateLimiter. + */ +export interface GCRARateLimiterOptions { + /** An instance of Redis. */ + redis: Redis; + /** + * A string prefix to namespace keys in Redis. + * Defaults to "ratelimit:". + */ + keyPrefix?: string; + /** + * The minimum interval between requests (the emission interval) in milliseconds. + * For example, 1000 ms for one request per second. + */ + emissionInterval: number; + /** + * The burst tolerance in milliseconds. This represents how much "credit" can be + * accumulated to allow short bursts beyond the average rate. + * For example, if you want to allow 3 requests in a burst with an emission interval of 1000 ms, + * you might set this to 3000. + */ + burstTolerance: number; + /** + * Expiration for the Redis key in milliseconds. + * Defaults to the larger of 60 seconds or (emissionInterval + burstTolerance). 
+ */ + keyExpiration?: number; +} + +/** + * The result of a rate limit check. + */ +export interface RateLimitResult { + /** Whether the request is allowed. */ + allowed: boolean; + /** + * If not allowed, this is the number of milliseconds the caller should wait + * before retrying. + */ + retryAfter?: number; +} + +/** + * Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds. + * @throws Error if the duration string is invalid + */ +export function parseDurationToMs(duration: string): number { + const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/); + + if (!match) { + throw new Error( + `Invalid duration string: "${duration}". Expected format: number + unit (ms, s, m, h, d)` + ); + } + + const [, value, unit] = match; + const numValue = parseFloat(value); + + switch (unit) { + case "ms": + return Math.round(numValue); + case "s": + return Math.round(numValue * 1000); + case "m": + return Math.round(numValue * 60 * 1000); + case "h": + return Math.round(numValue * 60 * 60 * 1000); + case "d": + return Math.round(numValue * 24 * 60 * 60 * 1000); + default: + throw new Error(`Unknown duration unit: ${unit}`); + } +} + +/** + * Convert a QueueRateLimitConfig to GCRA parameters. + * + * @example + * // 10 requests per minute with burst of 3 + * configToGCRAParams({ limit: 10, periodMs: 60000, burst: 3 }) + * // => { emissionInterval: 6000, burstTolerance: 12000, keyExpiration: 60000 } + */ +export function configToGCRAParams(config: QueueRateLimitConfig): GCRAParams { + const emissionInterval = Math.ceil(config.periodMs / config.limit); + const burst = config.burst ?? 1; + // burst-1 because GCRA allows 1 request immediately, then burst-1 more within tolerance + const burstTolerance = (burst - 1) * emissionInterval; + const keyExpiration = Math.max(60_000, emissionInterval + burstTolerance); + return { emissionInterval, burstTolerance, keyExpiration }; +} + +/** + * A rate limiter using Redis and the Generic Cell Rate Algorithm (GCRA). + * + * The GCRA is implemented using a Lua script that runs atomically in Redis. + * + * When a request comes in, the algorithm: + * - Retrieves the current "Theoretical Arrival Time" (TAT) from Redis (or initializes it if missing). + * - If the current time is greater than or equal to the TAT, the request is allowed and the TAT is updated to now + emissionInterval. + * - Otherwise, if the current time plus the burst tolerance is at least the TAT, the request is allowed and the TAT is incremented. + * - If neither condition is met, the request is rejected and a Retry-After value is returned. + */ +export class GCRARateLimiter { + private redis: Redis; + private keyPrefix: string; + private emissionInterval: number; + private burstTolerance: number; + private keyExpiration: number; + + constructor(options: GCRARateLimiterOptions) { + this.redis = options.redis; + this.keyPrefix = options.keyPrefix || "gcra:ratelimit:"; + this.emissionInterval = options.emissionInterval; + this.burstTolerance = options.burstTolerance; + // Default expiration: at least 60 seconds or the sum of emissionInterval and burstTolerance + this.keyExpiration = + options.keyExpiration || Math.max(60_000, this.emissionInterval + this.burstTolerance); + + // Define a custom Redis command 'gcra' that implements the GCRA algorithm. + // Using defineCommand ensures the Lua script is loaded once and run atomically. + this.redis.defineCommand("gcra", { + numberOfKeys: 1, + lua: ` +--[[ + GCRA Lua script + KEYS[1] - The rate limit key (e.g. 
"ratelimit:") + ARGV[1] - Current time in ms (number) + ARGV[2] - Emission interval in ms (number) + ARGV[3] - Burst tolerance in ms (number) + ARGV[4] - Key expiration in ms (number) + + Returns: { allowedFlag, value } + allowedFlag: 1 if allowed, 0 if rate-limited. + value: 0 when allowed; if not allowed, the number of ms to wait. +]]-- + +local key = KEYS[1] +local now = tonumber(ARGV[1]) +local emission_interval = tonumber(ARGV[2]) +local burst_tolerance = tonumber(ARGV[3]) +local expire = tonumber(ARGV[4]) + +-- Get the stored Theoretical Arrival Time (TAT) or default to 0. +local tat = tonumber(redis.call("GET", key) or 0) +if tat == 0 then + tat = now +end + +local allowed, new_tat, retry_after + +if now >= tat then + -- No delay: request is on schedule. + new_tat = now + emission_interval + allowed = true +elseif (now + burst_tolerance) >= tat then + -- Within burst capacity: allow request. + new_tat = tat + emission_interval + allowed = true +else + -- Request exceeds the allowed burst; calculate wait time. + allowed = false + retry_after = tat - (now + burst_tolerance) +end + +if allowed then + redis.call("SET", key, new_tat, "PX", expire) + return {1, 0} +else + return {0, retry_after} +end +`, + }); + } + + /** + * Checks whether a request associated with the given identifier is allowed. + * + * @param identifier A unique string identifying the subject of rate limiting (e.g. user ID, IP address, or domain). + * @returns A promise that resolves to a RateLimitResult. + * + * @example + * const result = await rateLimiter.check('user:12345'); + * if (!result.allowed) { + * // Tell the client to retry after result.retryAfter milliseconds. + * } + */ + async check(identifier: string): Promise { + const key = `${this.keyPrefix}${identifier}`; + const now = Date.now(); + + try { + // Call the custom 'gcra' command. + // The script returns an array: [allowedFlag, value] + // - allowedFlag: 1 if allowed; 0 if rejected. + // - value: 0 when allowed; if rejected, the number of ms to wait before retrying. + const result: [number, number] = await (this.redis as any).gcra( + key, + now, + this.emissionInterval, + this.burstTolerance, + this.keyExpiration + ); + const allowed = result[0] === 1; + if (allowed) { + return { allowed: true }; + } else { + return { allowed: false, retryAfter: result[1] }; + } + } catch (error) { + // In a production system you might log the error and either + // allow the request (fail open) or deny it (fail closed). + // Here we choose to propagate the error. 
+ throw error; + } + } +} diff --git a/internal-packages/run-engine/src/rate-limiter/index.ts b/internal-packages/run-engine/src/rate-limiter/index.ts new file mode 100644 index 0000000000..4d0a7e020a --- /dev/null +++ b/internal-packages/run-engine/src/rate-limiter/index.ts @@ -0,0 +1,10 @@ +export { + GCRARateLimiter, + configToGCRAParams, + parseDurationToMs, + type GCRARateLimiterOptions, + type GCRAParams, + type QueueRateLimitConfig, + type StoredQueueRateLimitConfig, + type RateLimitResult, +} from "./gcra.js"; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 5127ec3c75..aabe840964 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -35,6 +35,11 @@ import { attributesFromAuthenticatedEnv, MinimalAuthenticatedEnvironment, } from "../shared/index.js"; +import { + configToGCRAParams, + type QueueRateLimitConfig, + type StoredQueueRateLimitConfig, +} from "../rate-limiter/index.js"; import { InputPayload, OutputPayload, @@ -92,6 +97,7 @@ export type RunQueueOptions = { processMarkedJitterInMs?: number; callback: ConcurrencySweeperCallback; }; + disableRateLimits?: boolean; }; export interface ConcurrencySweeperCallback { @@ -323,6 +329,53 @@ export class RunQueue { return this.redis.del(this.keys.queueConcurrencyLimitKey(env, queue)); } + /** + * Set rate limit configuration for a queue in Redis. + * Config is stored as JSON with 7-day TTL (refreshed on deploy). + * Pre-calculates GCRA params so the Lua script doesn't need to parse duration strings. + */ + public async setQueueRateLimitConfig( + env: MinimalAuthenticatedEnvironment, + queue: string, + config: QueueRateLimitConfig + ) { + const key = this.keys.queueRateLimitConfigKey(env, queue); + const gcraParams = configToGCRAParams(config); + + // Store config with pre-calculated GCRA params for efficient Lua processing + const storedConfig = { + ...config, + ...gcraParams, + }; + + // Store with 7-day TTL, refreshed on each deploy + return this.redis.set(key, JSON.stringify(storedConfig), "EX", 86400 * 7); + } + + /** + * Remove rate limit configuration for a queue from Redis. + */ + public async removeQueueRateLimitConfig(env: MinimalAuthenticatedEnvironment, queue: string) { + return this.redis.del(this.keys.queueRateLimitConfigKey(env, queue)); + } + + /** + * Get rate limit configuration for a queue. + */ + public async getQueueRateLimitConfig( + env: MinimalAuthenticatedEnvironment, + queue: string + ): Promise { + const result = await this.redis.get(this.keys.queueRateLimitConfigKey(env, queue)); + if (!result) return undefined; + + try { + return JSON.parse(result) as StoredQueueRateLimitConfig; + } catch { + return undefined; + } + } + public async getQueueConcurrencyLimit(env: MinimalAuthenticatedEnvironment, queue: string) { const result = await this.redis.get(this.keys.queueConcurrencyLimitKey(env, queue)); @@ -1563,7 +1616,8 @@ export class RunQueue { String(this.options.defaultEnvConcurrency), String(this.options.defaultEnvConcurrencyBurstFactor ?? 1), this.options.redis.keyPrefix ?? "", - String(maxCount) + String(maxCount), + this.options.disableRateLimits ? 
"1" : "0" ); if (!result) { @@ -2337,6 +2391,34 @@ local defaultEnvConcurrencyLimit = ARGV[3] local defaultEnvConcurrencyBurstFactor = ARGV[4] local keyPrefix = ARGV[5] local maxCount = tonumber(ARGV[6] or '1') +local rateLimitDisabled = ARGV[7] or '0' + +-- GCRA Rate Limit Check Function +-- Returns: allowed (boolean), retryAfter (number or nil) +local function gcra_check(bucket_key, now, emission_interval, burst_tolerance, expire) + local tat = tonumber(redis.call("GET", bucket_key) or 0) + if tat == 0 then + tat = now + end + + if now >= tat then + -- No delay, request is on schedule + redis.call("SET", bucket_key, now + emission_interval, "PX", expire) + return true, 0 + elseif (now + burst_tolerance) >= tat then + -- Within burst capacity + redis.call("SET", bucket_key, tat + emission_interval, "PX", expire) + return true, 0 + else + -- Exceeded rate limit + return false, tat - (now + burst_tolerance) + end +end + +-- Helper to get base queue key (strip :ck:* suffix for rate limit keys) +local function get_base_queue(queue) + return string.gsub(queue, ":ck:[^:]+$", "") +end -- Check current env concurrency against the limit local envCurrentConcurrency = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') @@ -2367,6 +2449,18 @@ if actualMaxCount <= 0 then return nil end +-- Pre-fetch rate limit config if rate limiting is enabled +local rateLimitConfig = nil +if rateLimitDisabled ~= '1' then + local baseQueue = get_base_queue(queueName) + -- Use keyPrefix when building rate limit keys (ioredis adds prefix to stored keys) + local rateLimitConfigKey = keyPrefix .. baseQueue .. ':rl:config' + local configJson = redis.call('GET', rateLimitConfigKey) + if configJson then + rateLimitConfig = cjson.decode(configJson) + end +end + -- Attempt to dequeue messages up to actualMaxCount local messages = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'WITHSCORES', 'LIMIT', 0, actualMaxCount) @@ -2387,18 +2481,47 @@ for i = 1, #messages, 2 do local messagePayload = redis.call('GET', messageKey) if messagePayload then - -- Update concurrency - redis.call('ZREM', queueKey, messageId) - redis.call('ZREM', envQueueKey, messageId) - redis.call('SADD', queueCurrentConcurrencyKey, messageId) - redis.call('SADD', envCurrentConcurrencyKey, messageId) + -- Check rate limit BEFORE updating concurrency + local shouldDequeue = true - -- Add to results - table.insert(results, messageId) - table.insert(results, messageScore) - table.insert(results, messagePayload) + if rateLimitConfig then + local messageData = cjson.decode(messagePayload) + local rateLimitKey = messageData.rateLimitKey or 'global' + local baseQueue = get_base_queue(queueName) + -- Use keyPrefix for bucket key consistency + local bucketKey = keyPrefix .. baseQueue .. ':rl:' .. 
rateLimitKey + + local allowed, retryAfter = gcra_check( + bucketKey, + currentTime, + rateLimitConfig.emissionInterval, + rateLimitConfig.burstTolerance, + rateLimitConfig.keyExpiration + ) + + if not allowed then + -- Rate limited: re-score message with delay (retryAfter + jitter) + local jitter = math.random(0, 100) + local newScore = currentTime + retryAfter + jitter + redis.call('ZADD', queueKey, newScore, messageId) + shouldDequeue = false + end + end - dequeuedCount = dequeuedCount + 1 + if shouldDequeue then + -- Update concurrency + redis.call('ZREM', queueKey, messageId) + redis.call('ZREM', envQueueKey, messageId) + redis.call('SADD', queueCurrentConcurrencyKey, messageId) + redis.call('SADD', envCurrentConcurrencyKey, messageId) + + -- Add to results + table.insert(results, messageId) + table.insert(results, messageScore) + table.insert(results, messagePayload) + + dequeuedCount = dequeuedCount + 1 + end end end @@ -2766,6 +2889,7 @@ declare module "@internal/redis" { defaultEnvConcurrencyBurstFactor: string, keyPrefix: string, maxCount: string, + rateLimitDisabled: string, callback?: Callback ): Result; diff --git a/internal-packages/run-engine/src/run-queue/keyProducer.ts b/internal-packages/run-engine/src/run-queue/keyProducer.ts index cff3b78af7..afd8bcdbc1 100644 --- a/internal-packages/run-engine/src/run-queue/keyProducer.ts +++ b/internal-packages/run-engine/src/run-queue/keyProducer.ts @@ -17,6 +17,8 @@ const constants = { DEAD_LETTER_QUEUE_PART: "deadLetter", MASTER_QUEUE_PART: "masterQueue", WORKER_QUEUE_PART: "workerQueue", + RATE_LIMIT_PART: "rl", + RATE_LIMIT_CONFIG_PART: "rl:config", } as const; export class RunQueueFullKeyProducer implements RunQueueKeyProducer { @@ -301,6 +303,60 @@ export class RunQueueFullKeyProducer implements RunQueueKeyProducer { return `*:${constants.ENV_PART}:*:queue:*:${constants.CURRENT_CONCURRENCY_PART}`; } + /** + * Key for storing rate limit configuration for a queue. + * Pattern: {org:X}:proj:Y:env:Z:queue:Q:rl:config + */ + queueRateLimitConfigKey(env: RunQueueKeyProducerEnvironment, queue: string): string { + return [this.queueKeyBase(env, queue), constants.RATE_LIMIT_CONFIG_PART].join(":"); + } + + /** + * Key for the GCRA rate limit bucket for a queue. + * If rateLimitKey is provided, creates a separate bucket per key (per-tenant). + * Pattern: {org:X}:proj:Y:env:Z:queue:Q:rl[:key] + */ + queueRateLimitBucketKey( + env: RunQueueKeyProducerEnvironment, + queue: string, + rateLimitKey?: string + ): string { + const base = [this.queueKeyBase(env, queue), constants.RATE_LIMIT_PART].join(":"); + return rateLimitKey ? `${base}:${rateLimitKey}` : base; + } + + /** + * Get rate limit config key from a queue key. + * Strips concurrency key suffix if present. + */ + queueRateLimitConfigKeyFromQueue(queue: string): string { + // Remove concurrency key suffix to get base queue + const baseQueue = queue.replace(/:ck:.+$/, ""); + return `${baseQueue}:${constants.RATE_LIMIT_CONFIG_PART}`; + } + + /** + * Get rate limit bucket key from a queue key. + */ + queueRateLimitBucketKeyFromQueue(queue: string, rateLimitKey?: string): string { + // Remove concurrency key suffix to get base queue + const baseQueue = queue.replace(/:ck:.+$/, ""); + const base = `${baseQueue}:${constants.RATE_LIMIT_PART}`; + return rateLimitKey ? `${base}:${rateLimitKey}` : base; + } + + /** + * Helper to get the base queue key (without concurrency key). 
+ */ + private queueKeyBase(env: RunQueueKeyProducerEnvironment, queue: string): string { + return [ + this.orgKeySection(env.organization.id), + this.projKeySection(env.project.id), + this.envKeySection(env.id), + this.queueSection(queue), + ].join(":"); + } + descriptorFromQueue(queue: string): QueueDescriptor { const parts = queue.split(":"); return { diff --git a/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts b/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts index bf4ed87f29..fb937568b3 100644 --- a/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts @@ -126,4 +126,69 @@ describe("RunQueue.enqueueMessage", () => { await queue.quit(); } }); + + redisTest("should enqueue message with rateLimitKey", async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const messageWithRateLimit: InputPayload = { + runId: "r-ratelimit-test", + taskIdentifier: "task/my-task", + orgId: "o1234", + projectId: "p1234", + environmentId: "e4321", + environmentType: "DEVELOPMENT", + queue: "task/my-task", + timestamp: Date.now(), + attempt: 0, + rateLimitKey: "tenant-123", + }; + + // Initial queue length + const initialLength = await queue.lengthOfQueue( + authenticatedEnvDev, + messageWithRateLimit.queue + ); + expect(initialLength).toBe(0); + + // Enqueue message with rateLimitKey + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageWithRateLimit, + workerQueue: authenticatedEnvDev.id, + }); + + // Verify queue length increased + const newLength = await queue.lengthOfQueue(authenticatedEnvDev, messageWithRateLimit.queue); + expect(newLength).toBe(1); + + await setTimeout(1000); + + // Dequeue and verify rateLimitKey is preserved + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_ratelimit", + authenticatedEnvDev.id + ); + assertNonNullable(dequeued); + expect(dequeued.messageId).toEqual(messageWithRateLimit.runId); + expect(dequeued.message.rateLimitKey).toEqual("tenant-123"); + } finally { + await queue.quit(); + } + }); }); diff --git a/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts b/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts index 8b980749ea..90cda1509d 100644 --- a/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts @@ -359,4 +359,76 @@ describe("KeyProducer", () => { concurrencyKey: "c1234", }); }); + + // Rate Limit Key Tests + it("queueRateLimitConfigKey", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const key = keyProducer.queueRateLimitConfigKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + "task/task-name" + ); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:config"); + }); + + it("queueRateLimitBucketKey (without rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const key = keyProducer.queueRateLimitBucketKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, 
+ organization: { id: "o1234" }, + }, + "task/task-name" + ); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl"); + }); + + it("queueRateLimitBucketKey (with rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const key = keyProducer.queueRateLimitBucketKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + "task/task-name", + "tenant-123" + ); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:tenant-123"); + }); + + it("queueRateLimitConfigKeyFromQueue (strips concurrency key)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:ck:c1234"; + const key = keyProducer.queueRateLimitConfigKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:config"); + }); + + it("queueRateLimitConfigKeyFromQueue (no concurrency key)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name"; + const key = keyProducer.queueRateLimitConfigKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:config"); + }); + + it("queueRateLimitBucketKeyFromQueue (with rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:ck:c1234"; + const key = keyProducer.queueRateLimitBucketKeyFromQueue(queueKey, "tenant-456"); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl:tenant-456"); + }); + + it("queueRateLimitBucketKeyFromQueue (without rateLimitKey)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name"; + const key = keyProducer.queueRateLimitBucketKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:rl"); + }); }); diff --git a/internal-packages/run-engine/src/run-queue/tests/rateLimit.test.ts b/internal-packages/run-engine/src/run-queue/tests/rateLimit.test.ts new file mode 100644 index 0000000000..09d7635761 --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/tests/rateLimit.test.ts @@ -0,0 +1,403 @@ +import { assertNonNullable, redisTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { Logger } from "@trigger.dev/core/logger"; +import { describe, expect, vi } from "vitest"; +import { setTimeout } from "node:timers/promises"; +import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js"; +import { RunQueue } from "../index.js"; +import { RunQueueFullKeyProducer } from "../keyProducer.js"; +import { InputPayload } from "../types.js"; +import { Decimal } from "@trigger.dev/database"; + +const testOptions = { + name: "rq", + tracer: trace.getTracer("rq"), + workers: 1, + defaultEnvConcurrency: 25, + logger: new Logger("RunQueue", "warn"), + retryOptions: { + maxAttempts: 5, + factor: 1.1, + minTimeoutInMs: 100, + maxTimeoutInMs: 1_000, + randomize: true, + }, + keys: new RunQueueFullKeyProducer(), +}; + +const authenticatedEnvDev = { + id: "e1234", + type: "DEVELOPMENT" as const, + maximumConcurrencyLimit: 10, + concurrencyLimitBurstFactor: new Decimal(2.0), + project: { id: "p1234" }, + organization: { id: "o1234" }, +}; + +function createMessage( + runId: string, + queue: string = "task/my-task", + rateLimitKey?: string 
+): InputPayload { + return { + runId, + taskIdentifier: "task/my-task", + orgId: "o1234", + projectId: "p1234", + environmentId: "e1234", + environmentType: "DEVELOPMENT", + queue, + timestamp: Date.now(), + attempt: 0, + rateLimitKey, + }; +} + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunQueue rate limiting", () => { + redisTest( + "basic rate limiting - respects limit when config exists", + async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-basic:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-basic:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // Set rate limit: 2 per 10 minutes (emissionInterval = 300000ms = 5 min) + // Using a long period ensures rate limit doesn't recover during test execution + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 2, + periodMs: 600000, // 10 minutes + burst: 2, + }); + + // Enqueue 5 messages + for (let i = 0; i < 5; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await setTimeout(500); + + // Note: We don't verify initial queue length here because the background worker + // may have already started processing. The important test is rate limiting behavior. + + // Dequeue multiple times - with burst=2, only 2 should pass per burst window + const dequeued: (unknown | undefined)[] = []; + for (let i = 0; i < 5; i++) { + const msg = await queue.dequeueMessageFromWorkerQueue( + `test_rl_${i}`, + authenticatedEnvDev.id + ); + dequeued.push(msg); + } + + // Count how many were actually dequeued vs rate-limited + const dequeuedCount = dequeued.filter((d) => d !== undefined).length; + const rateLimitedCount = dequeued.filter((d) => d === undefined).length; + + // With burst=2 and 5 messages, at most 2 should be dequeued immediately + // (the exact count may vary due to background worker timing, but rate limiting should be active) + expect(dequeuedCount).toBeLessThanOrEqual(2); + expect(rateLimitedCount).toBeGreaterThanOrEqual(3); + + // Concurrency should not exceed burst limit + const concurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(concurrency).toBeLessThanOrEqual(2); + + // Rate-limited messages should still be in queue (rescheduled for later) + const remainingLength = await queue.lengthOfQueue(authenticatedEnvDev, "task/my-task"); + expect(remainingLength).toBeGreaterThan(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest("rate limiting disabled - all messages dequeued", async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + disableRateLimits: true, // Rate limiting disabled + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-disabled:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-disabled:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // Set strict rate limit: 1 per minute + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 1, + periodMs: 60000, + burst: 1, + }); + + // Enqueue 3 messages + for (let 
i = 0; i < 3; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-disabled-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await setTimeout(500); + + // All should be dequeued since rate limiting is disabled + const dequeued1 = await queue.dequeueMessageFromWorkerQueue( + "test_disabled_1", + authenticatedEnvDev.id + ); + const dequeued2 = await queue.dequeueMessageFromWorkerQueue( + "test_disabled_2", + authenticatedEnvDev.id + ); + const dequeued3 = await queue.dequeueMessageFromWorkerQueue( + "test_disabled_3", + authenticatedEnvDev.id + ); + + expect(dequeued1).not.toBeUndefined(); + expect(dequeued2).not.toBeUndefined(); + expect(dequeued3).not.toBeUndefined(); + + // All should be processed (concurrency = 3) + const concurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(concurrency).toBe(3); + } finally { + await queue.quit(); + } + }); + + redisTest( + "per-key rate limiting - separate buckets per rateLimitKey", + async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-perkey:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-perkey:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // Set rate limit: 1 per minute with burst of 1 + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 1, + periodMs: 60000, + burst: 1, + }); + + // Enqueue 2 messages for tenant-A + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-a1", "task/my-task", "tenant-A"), + workerQueue: authenticatedEnvDev.id, + }); + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-a2", "task/my-task", "tenant-A"), + workerQueue: authenticatedEnvDev.id, + }); + + // Enqueue 2 messages for tenant-B + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-b1", "task/my-task", "tenant-B"), + workerQueue: authenticatedEnvDev.id, + }); + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage("run-b2", "task/my-task", "tenant-B"), + workerQueue: authenticatedEnvDev.id, + }); + + await setTimeout(500); + + // Dequeue all available + const dequeued1 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_1", + authenticatedEnvDev.id + ); + const dequeued2 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_2", + authenticatedEnvDev.id + ); + const dequeued3 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_3", + authenticatedEnvDev.id + ); + const dequeued4 = await queue.dequeueMessageFromWorkerQueue( + "test_perkey_4", + authenticatedEnvDev.id + ); + + // Should get 2 messages (1 from each tenant, since each has independent bucket) + const successfulDequeues = [dequeued1, dequeued2, dequeued3, dequeued4].filter( + (d) => d !== undefined + ); + expect(successfulDequeues.length).toBe(2); + + // Verify we got one from each tenant + const rateLimitKeys = successfulDequeues.map((d) => d!.message.rateLimitKey); + expect(rateLimitKeys).toContain("tenant-A"); + expect(rateLimitKeys).toContain("tenant-B"); + } finally { + await queue.quit(); + } + } + ); + + redisTest("no rate limit config - all messages dequeued normally", async ({ redisContainer }) => { + const queue = new RunQueue({ + 
...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-noconfig:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-noconfig:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // No rate limit config set + + // Enqueue 5 messages + for (let i = 0; i < 5; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-noconfig-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await setTimeout(500); + + // All should be dequeued + const dequeued = []; + for (let i = 0; i < 5; i++) { + const d = await queue.dequeueMessageFromWorkerQueue( + `test_noconfig_${i}`, + authenticatedEnvDev.id + ); + if (d) dequeued.push(d); + } + + expect(dequeued.length).toBe(5); + } finally { + await queue.quit(); + } + }); + + redisTest("rate-limited messages do not increment concurrency", async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:rl-test-concurrency:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:rl-test-concurrency:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + // Set strict rate limit: 1 per minute, burst 1 + await queue.setQueueRateLimitConfig(authenticatedEnvDev, "task/my-task", { + limit: 1, + periodMs: 60000, + burst: 1, + }); + + // Initial concurrency should be 0 + const initialConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(initialConcurrency).toBe(0); + + // Enqueue 3 messages + for (let i = 0; i < 3; i++) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: createMessage(`run-conc-${i}`), + workerQueue: authenticatedEnvDev.id, + }); + } + + await setTimeout(500); + + // Try to dequeue multiple times + await queue.dequeueMessageFromWorkerQueue("test_conc_1", authenticatedEnvDev.id); + await queue.dequeueMessageFromWorkerQueue("test_conc_2", authenticatedEnvDev.id); + await queue.dequeueMessageFromWorkerQueue("test_conc_3", authenticatedEnvDev.id); + + // Only 1 should have passed rate limit, so concurrency should be 1 + const finalConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + "task/my-task" + ); + expect(finalConcurrency).toBe(1); + + // Queue should still have 2 messages (rescheduled for later) + const queueLength = await queue.lengthOfQueue(authenticatedEnvDev, "task/my-task"); + expect(queueLength).toBe(2); + } finally { + await queue.quit(); + } + }); +}); diff --git a/internal-packages/run-engine/src/run-queue/types.ts b/internal-packages/run-engine/src/run-queue/types.ts index ee1ce41b79..bde5266eea 100644 --- a/internal-packages/run-engine/src/run-queue/types.ts +++ b/internal-packages/run-engine/src/run-queue/types.ts @@ -11,6 +11,7 @@ export const InputPayload = z.object({ environmentType: z.nativeEnum(RuntimeEnvironmentType), queue: z.string(), concurrencyKey: z.string().optional(), + rateLimitKey: z.string().optional(), timestamp: z.number(), attempt: z.number(), }); @@ -120,6 +121,16 @@ export interface RunQueueKeyProducer { // Concurrency sweeper methods markedForAckKey(): string; currentConcurrencySetKeyScanPattern(): string; + + // Rate limiting keys + 
queueRateLimitConfigKey(env: RunQueueKeyProducerEnvironment, queue: string): string; + queueRateLimitBucketKey( + env: RunQueueKeyProducerEnvironment, + queue: string, + rateLimitKey?: string + ): string; + queueRateLimitConfigKeyFromQueue(queue: string): string; + queueRateLimitBucketKeyFromQueue(queue: string, rateLimitKey?: string): string; } export type EnvQueues = { diff --git a/packages/core/src/v3/schemas/__tests__/rateLimit.test.ts b/packages/core/src/v3/schemas/__tests__/rateLimit.test.ts new file mode 100644 index 0000000000..5889116e31 --- /dev/null +++ b/packages/core/src/v3/schemas/__tests__/rateLimit.test.ts @@ -0,0 +1,115 @@ +import { describe, it, expect } from "vitest"; +import { QueueRateLimitConfigSchema, DurationStringSchema } from "../schemas.js"; + +describe("DurationStringSchema", () => { + it("validates milliseconds", () => { + expect(DurationStringSchema.safeParse("100ms").success).toBe(true); + expect(DurationStringSchema.safeParse("1500ms").success).toBe(true); + }); + + it("validates seconds", () => { + expect(DurationStringSchema.safeParse("1s").success).toBe(true); + expect(DurationStringSchema.safeParse("30s").success).toBe(true); + expect(DurationStringSchema.safeParse("1.5s").success).toBe(true); + }); + + it("validates minutes", () => { + expect(DurationStringSchema.safeParse("1m").success).toBe(true); + expect(DurationStringSchema.safeParse("60m").success).toBe(true); + }); + + it("validates hours", () => { + expect(DurationStringSchema.safeParse("1h").success).toBe(true); + expect(DurationStringSchema.safeParse("24h").success).toBe(true); + }); + + it("validates days", () => { + expect(DurationStringSchema.safeParse("1d").success).toBe(true); + expect(DurationStringSchema.safeParse("7d").success).toBe(true); + }); + + it("rejects invalid formats", () => { + expect(DurationStringSchema.safeParse("invalid").success).toBe(false); + expect(DurationStringSchema.safeParse("1x").success).toBe(false); + expect(DurationStringSchema.safeParse("").success).toBe(false); + expect(DurationStringSchema.safeParse("ms").success).toBe(false); + expect(DurationStringSchema.safeParse("10").success).toBe(false); + expect(DurationStringSchema.safeParse("-1s").success).toBe(false); + }); +}); + +describe("QueueRateLimitConfigSchema", () => { + it("parses valid config with all fields", () => { + const result = QueueRateLimitConfigSchema.safeParse({ + limit: 100, + period: "1m", + burst: 20, + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.limit).toBe(100); + expect(result.data.period).toBe("1m"); + expect(result.data.burst).toBe(20); + } + }); + + it("parses config without optional burst", () => { + const result = QueueRateLimitConfigSchema.safeParse({ + limit: 10, + period: "1s", + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.limit).toBe(10); + expect(result.data.period).toBe("1s"); + expect(result.data.burst).toBeUndefined(); + } + }); + + it("accepts various period formats", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "100ms" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "30s" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "5m" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1h" }).success).toBe(true); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1d" }).success).toBe(true); + }); + + it("rejects invalid period 
formats", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1x" }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "invalid" }).success).toBe( + false + ); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "" }).success).toBe(false); + }); + + it("requires positive limit", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 0, period: "1s" }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ limit: -1, period: "1s" }).success).toBe(false); + }); + + it("requires limit to be an integer", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10.5, period: "1s" }).success).toBe(false); + }); + + it("requires burst to be a positive integer when provided", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: 0 }).success).toBe( + false + ); + expect( + QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: -1 }).success + ).toBe(false); + expect( + QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: 5.5 }).success + ).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10, period: "1s", burst: 5 }).success).toBe( + true + ); + }); + + it("rejects missing required fields", () => { + expect(QueueRateLimitConfigSchema.safeParse({ limit: 10 }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({ period: "1s" }).success).toBe(false); + expect(QueueRateLimitConfigSchema.safeParse({}).success).toBe(false); + }); +}); + diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 5e5fff18ea..551e5f7c61 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -188,6 +188,11 @@ export const TriggerTaskRequestBody = z.object({ }) .optional(), concurrencyKey: z.string().optional(), + /** + * Rate limit key for per-tenant/per-user rate limiting. + * Creates a separate rate limit bucket for every unique value. + */ + rateLimitKey: z.string().optional(), delay: z.string().or(z.coerce.date()).optional(), idempotencyKey: z.string().optional(), idempotencyKeyTTL: z.string().optional(), @@ -237,6 +242,11 @@ export const BatchTriggerTaskItem = z.object({ options: z .object({ concurrencyKey: z.string().optional(), + /** + * Rate limit key for per-tenant/per-user rate limiting. + * Creates a separate rate limit bucket for every unique value. 
diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts
index 233068c0b7..5b6ac61663 100644
--- a/packages/core/src/v3/schemas/schemas.ts
+++ b/packages/core/src/v3/schemas/schemas.ts
@@ -102,6 +102,19 @@ export const RateLimitOptions = z.discriminatedUnion("type", [

 export type RateLimitOptions = z.infer<typeof RateLimitOptions>;

+// Duration string format: number + unit (ms, s, m, h, d)
+export const DurationStringSchema = z.string().regex(/^\d+(\.\d+)?(ms|s|m|h|d)$/);
+export type DurationString = z.infer<typeof DurationStringSchema>;
+
+// Queue rate limit configuration schema (uses DurationStringSchema for period validation)
+export const QueueRateLimitConfigSchema = z.object({
+  limit: z.number().int().positive(),
+  period: DurationStringSchema,
+  burst: z.number().int().positive().optional(),
+});
+
+export type QueueRateLimitConfig = z.infer<typeof QueueRateLimitConfigSchema>;
+
 export const RetryOptions = z.object({
   /** The number of attempts before giving up */
   maxAttempts: z.number().int().optional(),
@@ -133,6 +146,20 @@ export const RetryOptions = z.object({

 export type RetryOptions = z.infer<typeof RetryOptions>;

+/** Rate limit configuration using GCRA (Generic Cell Rate Algorithm) */
+export const RateLimitConfig = z.object({
+  /** Maximum number of requests allowed in the period */
+  limit: z.number().int().min(1).max(100000),
+  /** Time window - must be a valid duration string (e.g., "1s", "100ms", "5m", "1h") */
+  period: z.string().regex(/^\d+(\.\d+)?(ms|s|m|h|d)$/, {
+    message: 'Period must be a valid duration string (e.g., "1s", "100ms", "5m", "1h")',
+  }),
+  /** Optional burst allowance - allows temporarily exceeding the rate limit (defaults to limit) */
+  burst: z.number().int().min(1).optional(),
+});
+
+export type RateLimitConfig = z.infer<typeof RateLimitConfig>;
+
 export const QueueManifest = z.object({
   /** You can define a shared queue and then pass the name in to your task.
    *
@@ -170,6 +197,21 @@ export const QueueManifest = z.object({
    *
    * If this property is omitted, the task can potentially use up the full concurrency of an environment */
   concurrencyLimit: z.number().int().min(0).max(100000).optional().nullable(),
+  /** Optional rate limit configuration for controlling request frequency.
+   *
+   * Unlike concurrencyLimit (which controls how many tasks run at once),
+   * rateLimit controls how frequently tasks can be dequeued.
+   *
+   * @example
+   * ```ts
+   * // Limit to 10 requests per second
+   * rateLimit: { limit: 10, period: "1s" }
+   *
+   * // Limit to 100 requests per minute with burst allowance
+   * rateLimit: { limit: 100, period: "1m", burst: 150 }
+   * ```
+   */
+  rateLimit: RateLimitConfig.optional(),
 });

 export type QueueManifest = z.infer<typeof QueueManifest>;
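To make the GCRA fields concrete: `limit` requests per `period` corresponds to a steady emission interval of `period / limit`, and `burst` (defaulting to `limit`) bounds how far a queue may run ahead of that schedule. The exact tolerance convention used by the Redis-side implementation is not shown in this excerpt, so the sketch below is for intuition only, not the shipped code.

```ts
// Illustrative only — relates the RateLimitConfig fields to the usual GCRA parameters.
function gcraParameters(limit: number, periodMs: number, burst?: number) {
  const emissionIntervalMs = periodMs / limit; // average spacing between allowed dequeues
  const burstToleranceMs = emissionIntervalMs * (burst ?? limit); // allowed head start
  return { emissionIntervalMs, burstToleranceMs };
}

// { limit: 100, period: "1m" }             -> gcraParameters(100, 60_000)      // 600ms interval
// { limit: 100, period: "1m", burst: 150 } -> gcraParameters(100, 60_000, 150) // 600ms interval, 90s tolerance
```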
diff --git a/packages/core/src/v3/types/queues.ts b/packages/core/src/v3/types/queues.ts
index 9e87f136e2..26b69593cf 100644
--- a/packages/core/src/v3/types/queues.ts
+++ b/packages/core/src/v3/types/queues.ts
@@ -35,4 +35,38 @@ export type QueueOptions = {
    *
    * If this property is omitted, the task can potentially use up the full concurrency of an environment */
   concurrencyLimit?: number;
+  /** Rate limit configuration for controlling request frequency.
+   *
+   * Unlike concurrencyLimit (which controls how many tasks run at once),
+   * rateLimit controls how frequently tasks can be dequeued.
+   *
+   * @example
+   * ```ts
+   * const rateLimitedQueue = queue({
+   *   name: "api-calls",
+   *   rateLimit: {
+   *     limit: 10,
+   *     period: "1s",
+   *   },
+   * });
+   *
+   * // Per-tenant rate limiting - pass rateLimitKey at trigger time
+   * await myTask.trigger(payload, {
+   *   rateLimitKey: `tenant-${payload.tenantId}`,
+   * });
+   *
+   * // Also works with tasks.trigger()
+   * await tasks.trigger("my-task", payload, {
+   *   rateLimitKey: `tenant-${tenantId}`,
+   * });
+   * ```
+   */
+  rateLimit?: {
+    /** Maximum number of requests allowed in the period */
+    limit: number;
+    /** Time window as a duration string (e.g., "1s", "100ms", "5m", "1h") */
+    period: string;
+    /** Optional burst allowance (defaults to limit) */
+    burst?: number;
+  };
 };
diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts
index f463b20f49..ae708fca5e 100644
--- a/packages/core/src/v3/types/tasks.ts
+++ b/packages/core/src/v3/types/tasks.ts
@@ -220,10 +220,65 @@ type CommonTaskOptions<
       },
     });
   * ```
+  *
+  * @example
+  * rate limiting to 10 requests per second
+  *
+  * ```ts
+  * export const rateLimitedTask = task({
+      id: "rate-limited",
+      queue: {
+        rateLimit: {
+          limit: 10,
+          period: "1s",
+        },
+      },
+      run: async ({ payload, ctx }) => {
+        //...
+      },
+    });
+  * ```
+  *
+  * @example
+  * per-tenant rate limiting - pass rateLimitKey at trigger time
+  *
+  * ```ts
+  * // Define task with rate limit
+  * export const perTenantTask = task({
+      id: "per-tenant",
+      queue: {
+        rateLimit: {
+          limit: 100,
+          period: "1m",
+        },
+      },
+      run: async ({ payload, ctx }) => {
+        //...
+      },
+    });
+  *
+  * // Trigger with rateLimitKey option
+  * await perTenantTask.trigger(payload, {
+  *   rateLimitKey: `tenant-${payload.tenantId}`,
+  * });
+  * ```
   */
  queue?: {
    name?: string;
    concurrencyLimit?: number;
+    /** Rate limit configuration for controlling request frequency.
+     *
+     * Unlike concurrencyLimit (which controls how many tasks run at once),
+     * rateLimit controls how frequently tasks can be dequeued.
+     */
+    rateLimit?: {
+      /** Maximum number of requests allowed in the period */
+      limit: number;
+      /** Time window as a duration string (e.g., "1s", "100ms", "5m", "1h") */
+      period: string;
+      /** Optional burst allowance (defaults to limit) */
+      burst?: number;
+    };
  };
  /** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on.
   *
@@ -783,6 +838,20 @@ export type TriggerOptions = {
   */
  concurrencyKey?: string;

+  /**
+   * The `rateLimitKey` creates a separate rate limit bucket for every unique value of the key.
+   * This allows per-tenant or per-user rate limiting.
+   *
+   * @example
+   * ```ts
+   * await myTask.trigger(payload, { rateLimitKey: `tenant-${tenantId}` });
+   *
+   * // Also works with tasks.trigger()
+   * await tasks.trigger("my-task", payload, { rateLimitKey: `tenant-${tenantId}` });
+   * ```
+   */
+  rateLimitKey?: string;
+
  /**
   * The delay before the task is executed. This can be a string like "1h" or a Date object.
   *
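Since the SDK changes below also thread `rateLimitKey` through the batch trigger paths, per-item keys should work as well. A usage sketch, assuming the `perTenantTask` from the JSDoc example above (the import path is hypothetical):

```ts
import { perTenantTask } from "./trigger/perTenant"; // hypothetical path

// Each item carries its own rateLimitKey, so every tenant gets its own rate limit bucket.
await perTenantTask.batchTrigger([
  { payload: { tenantId: "acme" }, options: { rateLimitKey: "tenant-acme" } },
  { payload: { tenantId: "globex" }, options: { rateLimitKey: "tenant-globex" } },
]);
```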
diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts
index 87025f0816..fe4010c433 100644
--- a/packages/trigger-sdk/src/v3/shared.ts
+++ b/packages/trigger-sdk/src/v3/shared.ts
@@ -129,7 +129,11 @@ export type Context = TaskRunContext;
 export { BatchTriggerError };

 export function queue(options: QueueOptions): Queue {
-  resourceCatalog.registerQueueMetadata(options);
+  resourceCatalog.registerQueueMetadata({
+    name: options.name,
+    concurrencyLimit: options.concurrencyLimit,
+    rateLimit: options.rateLimit,
+  });

   // @ts-expect-error
   options[Symbol.for("trigger.dev/queue")] = true;
@@ -248,6 +252,7 @@ export function createTask<
     resourceCatalog.registerQueueMetadata({
       name: queue.name,
       concurrencyLimit: queue.concurrencyLimit,
+      rateLimit: queue.rateLimit,
     });
   }
@@ -380,6 +385,7 @@ export function createSchemaTask<
     resourceCatalog.registerQueueMetadata({
       name: queue.name,
       concurrencyLimit: queue.concurrencyLimit,
+      rateLimit: queue.rateLimit,
     });
   }
@@ -1747,6 +1753,7 @@ async function* transformBatchItemsStream(
       options: {
         queue: item.options?.queue ? { name: item.options.queue } : undefined,
         concurrencyKey: item.options?.concurrencyKey,
+        rateLimitKey: item.options?.rateLimitKey,
         test: taskContext.ctx?.run.isTest,
         payloadType: payloadPacket.dataType,
         delay: item.options?.delay,
@@ -1800,6 +1807,7 @@ async function* transformBatchItemsStreamForWait(
         lockToVersion: taskContext.worker?.version,
         queue: item.options?.queue ? { name: item.options.queue } : undefined,
         concurrencyKey: item.options?.concurrencyKey,
+        rateLimitKey: item.options?.rateLimitKey,
         test: taskContext.ctx?.run.isTest,
         payloadType: payloadPacket.dataType,
         delay: item.options?.delay,
@@ -1850,6 +1858,7 @@ async function* transformBatchByTaskItemsStream(
           ? { name: queue }
           : undefined,
         concurrencyKey: item.options?.concurrencyKey,
+        rateLimitKey: item.options?.rateLimitKey,
         test: taskContext.ctx?.run.isTest,
         payloadType: payloadPacket.dataType,
         delay: item.options?.delay,
@@ -2008,6 +2019,7 @@ async function* transformSingleTaskBatchItemsStreamForWait(
           ? { name: queue }
           : undefined,
         concurrencyKey: item.options?.concurrencyKey,
+        rateLimitKey: item.options?.rateLimitKey,
         test: taskContext.ctx?.run.isTest,
         payloadType: payloadPacket.dataType,
         delay: item.options?.delay,
@@ -2049,6 +2061,7 @@ async function trigger_internal(
     options: {
       queue: options?.queue ? { name: options.queue } : undefined,
       concurrencyKey: options?.concurrencyKey,
+      rateLimitKey: options?.rateLimitKey,
      test: taskContext.ctx?.run.isTest,
       payloadType: payloadPacket.dataType,
       idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
@@ -2128,6 +2141,7 @@ async function batchTrigger_internal(
             ? { name: queue }
             : undefined,
           concurrencyKey: item.options?.concurrencyKey,
+          rateLimitKey: item.options?.rateLimitKey,
           test: taskContext.ctx?.run.isTest,
           payloadType: payloadPacket.dataType,
           delay: item.options?.delay,
@@ -2281,6 +2295,7 @@ async function triggerAndWait_internal
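The Redis-side GCRA check itself is not part of this excerpt. For reviewers unfamiliar with the algorithm, a generic in-memory sketch of the TAT (theoretical arrival time) test that a dequeue-side check performs — not the code in this PR — looks like this:

```ts
// Generic GCRA sketch for intuition only; the actual implementation lives in Redis
// and is not shown in this excerpt.
type GcraState = { tat: number }; // theoretical arrival time, ms since epoch

function gcraAllow(
  state: GcraState | undefined,
  nowMs: number,
  emissionIntervalMs: number, // periodMs / limit
  burstToleranceMs: number // emissionIntervalMs * burst
): { allowed: boolean; state: GcraState; retryAfterMs: number } {
  const tat = Math.max(state?.tat ?? nowMs, nowMs);
  if (tat - nowMs > burstToleranceMs) {
    // The queue is too far ahead of schedule: deny and leave the stored state untouched.
    return { allowed: false, state: { tat }, retryAfterMs: tat - burstToleranceMs - nowMs };
  }
  // Allow and push the schedule forward by one emission interval.
  return { allowed: true, state: { tat: tat + emissionIntervalMs }, retryAfterMs: 0 };
}

// Example: limit 10, period "1s", burst 10 -> emissionIntervalMs = 100, burstToleranceMs = 1000.
```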