
Build per-connection warehouse loader for analytics #7292

@gagantrivedi

Description
Summary

Rework the existing PoC streaming logic so that evaluations are delivered to the correct warehouse connection per organisation, instead of a single hardcoded Snowflake account. The new loader runs as an AWS Lambda invoked per connection.

Depends on: #7276 (connection CRUD + credential storage)

Context

connector/snowflake_batch_loader.py currently loads from a single S3 prefix into one hardcoded Snowflake account. With configurable connections (#7276), the loader must run per connection with its own credentials and Snowflake destination. Connections are per-organisation (1:1), so the existing per-org S3 layout is reused — no change to how evaluations are written.

The new loader is a Lambda function triggered by a recurring task in the Flagsmith task processor — the task fans out one Lambda invocation per active connection on each tick.

Scope

1. Per-connection batching

  • Reuses the existing per-org S3 structure
  • Flush trigger: recurring task. Each tick, the Lambda loads whatever is in that org's S3 prefix at that moment.
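The per-tick selection can be sketched as a pure function; the prefix layout (`evaluations/{org_id}/`) and function name are assumptions for illustration — the real loader would page through S3 with ListObjectsV2 rather than an in-memory list.

```python
def keys_for_tick(all_keys: list[str], org_id: str) -> list[str]:
    """Select the objects currently sitting under the org's prefix.

    Illustrative only: assumes the existing per-org layout is
    'evaluations/{org_id}/'; a real run would use an S3 paginator.
    """
    prefix = f"evaluations/{org_id}/"  # assumed layout, not confirmed by the issue
    return [key for key in all_keys if key.startswith(prefix)]
```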

2. Lambda loader

Port snowflake_batch_loader.py to a Lambda handler:

  • Event payload: { "connection_id": "..." }
  • Credentials and Snowflake config are read from the connection row + secret store on invocation
  • Concurrency: dedupe by connection ID (e.g. DB-level advisory lock or in_flight flag on the connection row) so overlapping task-processor ticks don't collide
  • Packaged + deployed alongside the rest of the Flagsmith infra (Terraform / existing Lambda pattern)
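A minimal sketch of the handler shape described above, assuming the `{"connection_id": "..."}` payload and the DB-level advisory-lock option for dedupe. All names here (`advisory_lock_key`, `handler`) are hypothetical, and the credential/COPY steps are elided:

```python
import hashlib
import json

def advisory_lock_key(connection_id: str) -> int:
    """Derive a stable signed 64-bit key from the connection ID, suitable
    for Postgres pg_try_advisory_lock, so overlapping ticks for the same
    connection contend on the same lock."""
    digest = hashlib.sha256(connection_id.encode()).digest()
    return int.from_bytes(digest[:8], "big", signed=True)

def handler(event: dict, context=None) -> dict:
    """Hypothetical Lambda entry point; not the actual implementation.

    1. Take the per-connection advisory lock; if it is held, another
       invocation for this connection is still running — skip this tick.
    2. Read credentials and Snowflake config from the connection row
       and secret store.
    3. COPY whatever is currently under the org's S3 prefix.
    (Steps 1-3 are elided; this sketch only shows payload handling.)
    """
    connection_id = event["connection_id"]
    return {"statusCode": 200, "body": json.dumps({"connection_id": connection_id})}
```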

3. Retry logic and dead-letter queue

  • Exponential backoff on PUT / COPY failures, capped at a small number of retries per invocation
  • On exhaustion, move the affected batch to a DLQ prefix (e.g. dead-letter/{org_id}/) preserving the original objects for later replay
  • Subsequent invocations do not re-process DLQ'd batches automatically
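The retry and dead-letter behaviour could look like the following sketch; the retry cap, helper names, and DLQ key shape are assumptions (the issue only fixes the `dead-letter/{org_id}/` prefix):

```python
import time

MAX_RETRIES = 3  # illustrative "small number of retries per invocation"

def with_backoff(op, max_retries=MAX_RETRIES, base_delay=1.0, sleep=time.sleep):
    """Run op(); on failure, retry with exponential backoff (1s, 2s, 4s, ...).
    Re-raises the last error once retries are exhausted, at which point the
    caller would move the batch to the DLQ prefix."""
    for attempt in range(max_retries):
        try:
            return op()
        except Exception:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * 2 ** attempt)

def dlq_key(org_id: str, original_key: str) -> str:
    """Target key under the dead-letter prefix. Nesting the original key
    preserves the source objects verbatim for later replay."""
    return f"dead-letter/{org_id}/{original_key}"
```

Injecting `sleep` keeps the backoff testable; because DLQ'd objects live under a separate prefix, subsequent invocations listing the live prefix never see them, which gives the "no automatic re-processing" property for free.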

4. Metrics emission

After each run, the Lambda POSTs a metrics payload to a core API endpoint (endpoint itself to be added on core later — out of scope here):

  • Events delivered (count)
  • Completion timestamp
  • DLQ'd batch count (if any)

Core persists them and serves them back via #7289.
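The payload the Lambda would POST might be shaped as below; the field names are assumptions, since the receiving core endpoint is out of scope here:

```python
from datetime import datetime, timezone

def build_metrics_payload(events_delivered: int, dlq_batch_count: int = 0) -> dict:
    """Per-run metrics payload. Field names are illustrative guesses at
    what the (not-yet-built) core endpoint will accept."""
    return {
        "events_delivered": events_delivered,
        "completed_at": datetime.now(timezone.utc).isoformat(),
        "dlq_batch_count": dlq_batch_count,
    }
```

The actual POST (e.g. via `urllib.request` or `requests`, with auth against core) is omitted, as is the endpoint URL.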

Out of scope (tracked separately)

  • Ingest routing (event-service → S3)
  • Task processor recurring task that invokes the Lambda per connection
  • Core API endpoint that receives metrics from the Lambda
  • Per-flag / per-environment filtering
  • Schema audit

Acceptance criteria

  • Two orgs with different Snowflake accounts receive their evaluation data in their respective warehouses, isolated end-to-end
  • Failure loading into Org A (e.g. paused warehouse) does not affect Org B
  • Batches that exhaust retries land in a DLQ prefix and can be inspected
  • Metrics are POSTed to core after each run
