Skip to content

Add recurring task to fan out per-connection warehouse loader invocations #7294

@gagantrivedi

Description

@gagantrivedi

Summary

Register a recurring task in the Flagsmith task processor that, on each tick, iterates over active warehouse connections, logs a metrics row with started_at, and invokes the Lambda loader (#7292) with that run's ID. The Lambda updates the same row on completion via #7293.

Depends on: #7292 (Lambda loader), #7276 (connection model), #7293 (metrics endpoint)

Scope

Recurring task

Register in the warehouse-connections app (same pattern as organisations/tasks.py):

@register_recurring_task(run_every=timedelta(hours=1))
def invoke_warehouse_loaders() -> None:
    ...

Task body

For each active connection (status IN ('connected', 'error'), skipping pending_customer_setup and rotating):

  1. Insert a metrics row: { connection_id, started_at: now, completed_at: null, events_delivered: null, batches_dlq: null, error: null } — returns a run_id
  2. Invoke the Lambda asynchronously with { "connection_id": "<id>", "run_id": "<run_id>" }
  3. Lambda reports completion via Add metrics ingest endpoint for warehouse loader #7293 keyed by run_id, which updates the row

A single failed Lambda invocation must not prevent fan-out to the remaining connections.

Why pre-create the row

Interval

Default run_every=timedelta(hours=1) — matches the PRD's "within 1 hour" latency target.

Acceptance criteria

  • Task registered and visible in the task processor's recurring-task list
  • On each tick, a metrics row is created and one Lambda invocation per active connection is fired with the run_id
  • Connections in non-active statuses are skipped
  • Failure invoking Lambda for one connection does not affect others
  • Orphaned runs (started with no completion) remain visible in the metrics table

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions