Summary
Rework the existing PoC streaming logic so that evaluations are delivered to the correct warehouse connection per organisation, instead of a single hardcoded Snowflake account. The new loader runs as an AWS Lambda invoked per connection.
Depends on: #7276 (connection CRUD + credential storage)
Context
connector/snowflake_batch_loader.py currently loads from a single S3 prefix into one hardcoded Snowflake account. With configurable connections (#7276), the loader must run per connection with its own credentials and Snowflake destination. Connections are per-organisation (1:1), so the existing per-org S3 layout is reused — no change to how evaluations are written.
The new loader is a Lambda function triggered by a recurring task in the Flagsmith task processor — the task fans out one Lambda invocation per active connection on each tick.
Scope
1. Per-connection batching
- Reuses the existing per-org S3 structure
- Flush trigger: recurring task. Each tick, the Lambda loads whatever is in that org's S3 prefix at that moment.
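A minimal sketch of the "load whatever is in the prefix" step. The `evaluations/{org_id}/` layout is an assumption for illustration; the real layout is whatever the existing PoC writer already produces, and is unchanged by this ticket.

```python
# Sketch: drain the org's S3 prefix on each tick. The "evaluations/"
# base prefix is a placeholder for the existing per-org layout.

def org_prefix(org_id: str, base: str = "evaluations") -> str:
    """Return the S3 prefix holding this org's pending evaluation batches."""
    return f"{base}/{org_id}/"

def list_pending_keys(s3_client, bucket: str, org_id: str) -> list[str]:
    """List every object currently under the org's prefix.

    Whatever is present at this moment is the batch for this tick;
    objects written after listing are simply picked up next tick.
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    keys: list[str] = []
    for page in paginator.paginate(Bucket=bucket, Prefix=org_prefix(org_id)):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

`list_pending_keys` takes the boto3 client as a parameter so it can be exercised against a stub in tests.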
2. Lambda loader
Port snowflake_batch_loader.py to a Lambda handler:
- Event payload: { "connection_id": "..." }
- Credentials and Snowflake config are read from the connection row + secret store on invocation
- Concurrency: dedupe by connection ID (e.g. DB-level advisory lock or in_flight flag on the connection row) so overlapping task-processor ticks don't collide
- Packaged + deployed alongside the rest of the Flagsmith infra (Terraform / existing Lambda pattern)
3. Retry logic and dead-letter queue
- Exponential backoff on PUT / COPY failures, capped at a small number of retries per invocation
- On exhaustion, move the affected batch to a DLQ prefix (e.g. dead-letter/{org_id}/), preserving the original objects for later replay
- Subsequent invocations do not re-process DLQ'd batches automatically
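The retry and dead-letter flow above can be sketched as follows. The retry cap and base delay are illustrative values, `copy_batch` is a hypothetical callable wrapping the PUT/COPY step, and only the dead-letter/{org_id}/ prefix shape comes from this ticket.

```python
# Sketch: bounded exponential backoff, then hand off to the DLQ prefix.
import time

MAX_RETRIES = 3     # "a small number of retries per invocation" (illustrative)
BASE_DELAY_S = 1.0  # illustrative base delay

def backoff_delay(attempt: int) -> float:
    """Exponential backoff: 1s, 2s, 4s, ... for attempts 0, 1, 2, ..."""
    return BASE_DELAY_S * (2 ** attempt)

def dlq_key(org_id: str, original_key: str) -> str:
    """Map a failed batch object to its dead-letter location, keeping the
    original key so the batch can be inspected and replayed later."""
    return f"dead-letter/{org_id}/{original_key}"

def load_with_retries(copy_batch, org_id: str, key: str) -> str:
    for attempt in range(MAX_RETRIES):
        try:
            copy_batch(key)  # PUT + COPY into the org's Snowflake account
            return "delivered"
        except Exception:
            time.sleep(backoff_delay(attempt))
    # Retries exhausted: the caller copies the object to dlq_key(org_id, key)
    # and later invocations skip the dead-letter/ prefix entirely.
    return "dead-lettered"
```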
4. Metrics emission
After each run, the Lambda POSTs a metrics payload to a core API endpoint (endpoint itself to be added on core later — out of scope here):
- Events delivered (count)
- Completion timestamp
- DLQ'd batch count (if any)
Core persists them and serves them back via #7289.
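A possible shape for that payload, using only the three fields listed above. The field names and the /loader-metrics path are placeholders; the receiving core endpoint is out of scope here and will define the actual contract.

```python
# Sketch: build the post-run metrics payload and the POST request for it.
import json
from datetime import datetime, timezone
from urllib import request

def build_metrics(connection_id: str, delivered: int, dlq_count: int) -> dict:
    return {
        "connection_id": connection_id,
        "events_delivered": delivered,                         # count for this run
        "completed_at": datetime.now(timezone.utc).isoformat(),
        "dlq_batch_count": dlq_count,                          # 0 when nothing was dead-lettered
    }

def metrics_request(base_url: str, payload: dict) -> request.Request:
    """Build the POST; the handler sends it with urllib.request.urlopen."""
    return request.Request(
        f"{base_url}/loader-metrics",  # placeholder path
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```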
Out of scope (tracked separately)
- Ingest routing (event-service → S3)
- Task processor recurring task that invokes the Lambda per connection
- Core API endpoint that receives metrics from the Lambda
- Per-flag / per-environment filtering
- Schema audit
Acceptance criteria
- Two orgs with different Snowflake accounts receive their evaluation data in their respective warehouses, isolated end-to-end
- Failure loading into Org A (e.g. paused warehouse) does not affect Org B
- Batches that exhaust retries land in a DLQ prefix and can be inspected
- Metrics are POSTed to core after each run