fix(file-based): throttle state-message emission to prevent platform OOM on large file streams#1032
Conversation
…OOM on large file streams
FileBasedConcurrentCursor emits a state message after every processed
file. Each state contains the full file-history dict, so on a sync of N
files the cursor emits N states whose sizes grow with N (O(N^2) total
bytes). The platform/orchestrator buffers each state until the
destination ACKs the matching record batch; destinations flush on
time/size, so state messages pile up in orchestrator memory and OOM the
replication pod on large file streams. The downstream symptom is the
source pod being torn down without emitting terminal stream status,
which surfaces at the destination as TransientErrorException("Input was
fully read, but some streams did not receive a terminal stream status
message").
Mirror the throttle pattern from ConcurrentPerPartitionCursor (fix for
oncall #7856) on the file-based concurrent cursor: emit at most one
state per 600 seconds during the sync, and force-emit a final state via
ensure_at_least_one_state_emitted so the platform always receives the
closing state regardless of timing.
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/cdk/file-based-cursor-throttle-state#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/cdk/file-based-cursor-throttle-statePR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
📝 WalkthroughWalkthroughAdds time-based throttling to FileBasedConcurrentCursor state-message emissions with a final-emission bypass, plus a change in FileBasedSource.streams() to instantiate concurrent cursors with the full constructor when sync_mode is None. ChangesState Emission Throttling
Cursor Construction for sync_mode=None
🎯 3 (Moderate) | ⏱️ ~20 minutes Would you like a small integration test that uses time.sleep to validate real-world throttling behavior, wdyt? 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsStopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1)
184-198: 💤 Low valueWould
time.monotonic()be a better fit for measuring elapsed intervals here, wdyt?
time.time()reflects wall-clock time and can jump backward/forward on NTP adjustments or manual clock changes, which could either prematurely unthrottle or starve emissions for a long stretch.time.monotonic()is purpose-built for measuring elapsed durations and is immune to wall-clock shifts. Totally optional — I see the PR description mentions matching the pattern used inConcurrentPerPartitionCursor, so consistency may outweigh this. Happy to defer.♻️ Optional tweak
- def _throttle_state_message(self) -> Optional[float]: - current_time = time.time() + def _throttle_state_message(self) -> Optional[float]: + current_time = time.monotonic() if ( current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS ): return None return current_time🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py` around lines 184 - 198, Replace wall-clock timing with monotonic timing: in the _throttle_state_message method use time.monotonic() instead of time.time() to compute elapsed intervals against DEFAULT_STATE_EMISSION_INTERVAL_SECONDS and set/compare _last_emission_time using monotonic values; ensure _emit_state_message continues to call _throttle_state_message and assign _last_emission_time the returned monotonic timestamp so emissions are robust to system clock changes and consistent with the pattern used in ConcurrentPerPartitionCursor.unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py (1)
592-626: 💤 Low valueNice coverage of the throttle boundaries — one tiny thought on the patch target, wdyt?
The test patches the global
time.timesymbol, which works because the cursor callstime.time()(attribute lookup on the module) rather than a local rebinding. If anyone ever refactors the cursor tofrom time import time, this patch would silently stop intercepting the call and the tests could keep "passing" while no longer exercising the throttle. Patching the symbol where it's used (e.g.mocker.patch("airbyte_cdk.sources.file_based.stream.concurrent.cursor.file_based_concurrent_cursor.time.time")) would make the test resilient to that refactor. Totally optional polish.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py` around lines 592 - 626, The test_state_throttling currently patches the global time.time which can break silently if emit_state_message or its module changes to use "from time import time"; update the mock to patch the symbol where it's used instead (patch "airbyte_cdk.sources.file_based.stream.concurrent.cursor.file_based_concurrent_cursor.time.time") so emit_state_message continues to be intercepted; keep assertions around cursor._last_emission_time, cursor.emit_state_message, mock_connector_manager.update_state_for_stream and mock_repo.emit_message unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In
`@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py`:
- Around line 184-198: Replace wall-clock timing with monotonic timing: in the
_throttle_state_message method use time.monotonic() instead of time.time() to
compute elapsed intervals against DEFAULT_STATE_EMISSION_INTERVAL_SECONDS and
set/compare _last_emission_time using monotonic values; ensure
_emit_state_message continues to call _throttle_state_message and assign
_last_emission_time the returned monotonic timestamp so emissions are robust to
system clock changes and consistent with the pattern used in
ConcurrentPerPartitionCursor.
In
`@unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py`:
- Around line 592-626: The test_state_throttling currently patches the global
time.time which can break silently if emit_state_message or its module changes
to use "from time import time"; update the mock to patch the symbol where it's
used instead (patch
"airbyte_cdk.sources.file_based.stream.concurrent.cursor.file_based_concurrent_cursor.time.time")
so emit_state_message continues to be intercepted; keep assertions around
cursor._last_emission_time, cursor.emit_state_message,
mock_connector_manager.update_state_for_stream and mock_repo.emit_message
unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: e494079b-dd97-4462-b4a3-bca1eb0531e1
📒 Files selected for processing (2)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.pyunit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py
There was a problem hiding this comment.
Pull request overview
This PR mitigates orchestrator OOMs on large file-based concurrent streams by reducing the volume of emitted state messages while still guaranteeing a terminal/final state is always sent at stream completion.
Changes:
- Added time-based throttling to
FileBasedConcurrentCursor.emit_state_message()(max 1 emission per 600 seconds). - Ensured
ensure_at_least_one_state_emitted()force-emits a final state by bypassing the throttle. - Added unit tests covering throttling behavior and the final-state bypass.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py |
Introduces throttled state emission and a throttle bypass for the final state message. |
unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py |
Adds tests validating the throttle window behavior and that end-of-stream state emission bypasses throttling. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
/autofix
|
There was a problem hiding this comment.
🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (2)
184-188: ⚡ Quick winUse
time.monotonic()for elapsed-time throttling?
time.time()reflects wall-clock time, which can jump (NTP corrections, manual clock changes, DST on some platforms) and isn't guaranteed to be non-decreasing. For measuring an elapsed interval like this throttle,time.monotonic()is the idiomatic choice and immune to system clock adjustments — a backward jump here could suppress state emissions for much longer than 600s, while a forward jump could defeat the throttle entirely. wdyt about switching to monotonic for both the read and the stored timestamp?♻️ Proposed change
def _throttle_state_message(self) -> Optional[float]: - current_time = time.time() + current_time = time.monotonic() if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS: return None return current_time(Initial value
0.0at construction still works sincemonotonic()returns a positive float that is always far greater than 0 in practice — first emission will pass the throttle as intended.)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py` around lines 184 - 188, Replace wall-clock timing with a monotonic clock in the throttling logic: in _throttle_state_message use time.monotonic() instead of time.time(), and ensure any code that initializes or updates the stored timestamp self._last_emission_time (e.g., constructor or wherever you set it after emitting state) also uses time.monotonic() so reads and writes are consistent with DEFAULT_STATE_EMISSION_INTERVAL_SECONDS; initial 0.0 can remain unchanged since monotonic() values will be larger on first check.
337-340: 💤 Low valueShould the final force-emit also update
_last_emission_time?
ensure_at_least_one_state_emittedbypasses the throttle but doesn't refresh_last_emission_time. In practice this is the terminal call so it likely doesn't matter, but if the cursor instance is ever reused or ifadd_filehappens to fire right after (e.g. in tests or future refactors), the nextemit_state_messagecould immediately pass the throttle since the tracker wasn't bumped. Want to update the timestamp here too for symmetry, or is keeping it purely "final-emission-only" semantics intentional? wdyt?🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py` around lines 337 - 340, The final-force emit in ensure_at_least_one_state_emitted currently calls _emit_state_message(throttle=False) but does not update the emission tracker _last_emission_time, which can allow a subsequent emit_state_message or add_file call to bypass throttling unexpectedly; modify ensure_at_least_one_state_emitted to also refresh _last_emission_time (e.g., set self._last_emission_time = current timestamp using the same time source used elsewhere in the class) immediately after or before calling _emit_state_message so the final emission is reflected in the throttle tracker and keeps behavior consistent with normal emits.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In
`@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py`:
- Around line 184-188: Replace wall-clock timing with a monotonic clock in the
throttling logic: in _throttle_state_message use time.monotonic() instead of
time.time(), and ensure any code that initializes or updates the stored
timestamp self._last_emission_time (e.g., constructor or wherever you set it
after emitting state) also uses time.monotonic() so reads and writes are
consistent with DEFAULT_STATE_EMISSION_INTERVAL_SECONDS; initial 0.0 can remain
unchanged since monotonic() values will be larger on first check.
- Around line 337-340: The final-force emit in ensure_at_least_one_state_emitted
currently calls _emit_state_message(throttle=False) but does not update the
emission tracker _last_emission_time, which can allow a subsequent
emit_state_message or add_file call to bypass throttling unexpectedly; modify
ensure_at_least_one_state_emitted to also refresh _last_emission_time (e.g., set
self._last_emission_time = current timestamp using the same time source used
elsewhere in the class) immediately after or before calling _emit_state_message
so the final emission is reflected in the throttle tracker and keeps behavior
consistent with normal emits.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 43f3a308-8b11-4c0e-91b2-a44488e62e0d
📒 Files selected for processing (1)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py
|
/prerelease
|
|
|
||
| def _throttle_state_message(self) -> Optional[float]: | ||
| current_time = time.time() | ||
| if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS: |
There was a problem hiding this comment.
This should use time.monotonic() instead of time.time() for elapsed-time throttling. time.time() can jump forward or backward due to wall-clock adjustments, which can either bypass the throttle early or suppress state emission far longer than 600 seconds. Since _last_emission_time only measures elapsed duration within this process, a monotonic clock is the safer fit.
…ck branch FileBasedSource.streams() iterates every stream declared in the connector config (not just the catalog). For streams without a catalog entry sync_mode is None and the fallback branch calls `self.cursor_cls(stream_config)`. That single-arg form works for DefaultFileBasedCursor, but FileBasedConcurrentCursor requires 7 args, so the moment a connector switches to cursor_cls=FileBasedConcurrentCursor it crashes during read on any partial-catalog selection, plus on every check/discover (no catalog at all). Detect the cursor flavor and pass the full constructor args when it's a concurrent cursor. Unblocks file-based connectors (e.g. source-s3) from migrating onto the concurrent cursor path and picking up the state emission throttle added earlier in this PR.
|
/prerelease
|
There was a problem hiding this comment.
🧹 Nitpick comments (1)
airbyte_cdk/sources/file_based/file_based_source.py (1)
315-333: ⚡ Quick winMake
state_manager=Nonehandling forFileBasedConcurrentCursormore explicit (check/discover)
FileBasedConcurrentCursor.__init__just storesconnector_state_manager; it’s only dereferenced when emitting state in_emit_state_message()(called fromadd_file()). Thecheck_connectionpath parses files directly via the availability strategy (nocursor.add_file()), anddiscoveronly callsStream.get_json_schema()/ schema inference (no cursor usage). So this likely won’t crash incheck/discover.Would you still prefer a defensive route in this
state_manager=Nonebranch (e.g., fall back toFileBasedFinalStateCursoror use a no-op/stubConnectorStateManager), or at least add a clearer comment that cursor methods likeadd_file()won’t run during check/discover?🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte_cdk/sources/file_based/file_based_source.py` around lines 315 - 333, When creating a concurrent cursor in the branch where state_manager may be None, make the handling explicit: if issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor) and state_manager is None, instantiate a safe fallback cursor (e.g., FileBasedFinalStateCursor) instead of calling self.cursor_cls with a None connector_state_manager, or alternatively supply a no-op stub ConnectorStateManager; update the comment near this branch to explain that check/discover paths don't call cursor.add_file()/_emit_state_message() so a None state_manager won't be used, and reference cursor_cls, AbstractConcurrentFileBasedCursor, FileBasedFinalStateCursor, state_manager, add_file, and _emit_state_message in the comment to make the decision clear.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@airbyte_cdk/sources/file_based/file_based_source.py`:
- Around line 315-333: When creating a concurrent cursor in the branch where
state_manager may be None, make the handling explicit: if
issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor) and state_manager
is None, instantiate a safe fallback cursor (e.g., FileBasedFinalStateCursor)
instead of calling self.cursor_cls with a None connector_state_manager, or
alternatively supply a no-op stub ConnectorStateManager; update the comment near
this branch to explain that check/discover paths don't call
cursor.add_file()/_emit_state_message() so a None state_manager won't be used,
and reference cursor_cls, AbstractConcurrentFileBasedCursor,
FileBasedFinalStateCursor, state_manager, add_file, and _emit_state_message in
the comment to make the decision clear.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: b6d72774-5d8a-4f21-81b5-19dc85623672
📒 Files selected for processing (1)
airbyte_cdk/sources/file_based/file_based_source.py
source-s3 previously used the legacy DefaultFileBasedCursor via the
source-s3-specific Cursor subclass. That cursor goes through the legacy
non-concurrent file-based read path, which emits a state message per
processed file. With ~10 k files in a stream, the platform/orchestrator
buffers ~10 k state messages (each carrying the growing history dict)
waiting for the destination to ACK them; on slow destinations the
orchestrator pod OOMs, the source pod is torn down before emitting
terminal stream status, and the destination fails with
TransientErrorException("Input was fully read, but some streams did not
receive a terminal stream status message").
Move source-s3 onto FileBasedConcurrentCursor (CDK) which:
- emits state at most once per 600s during a sync (state throttle
added in airbytehq/airbyte-python-cdk#1032);
- always force-emits a final state via ensure_at_least_one_state_emitted;
- runs the read on the concurrent file-based path with up to
DEFAULT_CONCURRENCY (100) workers.
Verified locally against a customer connection with ~9 k files: state
messages dropped from 9 200 to 2, runtime from 52:46 to 4:24, stdout
volume from 4.1 GB to 922 MB, sync completes successfully with a
STREAM_STATUS COMPLETE.
The legacy source-s3 Cursor class only existed to host the v3-to-v4
state migration shipped in 2023 (#29028). 2.5 years on, all active
customers have completed the migration on their first v4 sync, so the
subclass and its tests are removed. A hypothetical customer with stale
v3 state would trigger one full re-sync, after which they'd be on v4.
Pinning to airbyte-cdk 7.19.2.post3.dev26244645194, a prerelease that
includes both the state throttle and a fallback-branch dispatch fix
for the file-based source so connectors using FileBasedConcurrentCursor
do not crash on partial-catalog reads / check / discover.
Summary
FileBasedConcurrentCursor.emit_state_message()to at most one state per 600 s during a sync.ensure_at_least_one_state_emitted()so the platform always receives the closing state even when the sync finishes inside the throttle window.ConcurrentPerPartitionCursor(PR for oncall #7856), applied to the file-based concurrent cursor.Why
FileBasedConcurrentCursoremits a state message after every processed file. Each state contains the full file-history dict, so on a sync of N files the cursor emits N states whose sizes grow with N (O(N²) total bytes). The platform/orchestrator buffers each state until the destination ACKs the matching record batch; destinations flush on time/size, so state messages pile up in orchestrator memory and OOM the replication pod on large file streams.Downstream symptom: the source pod is torn down without emitting terminal stream status, and the destination fails with:
Related: oncall #7856 (declarative concurrent cursor, fixed by the same throttle pattern) and #12663 (file-based, this fix).
Test plan
unit_tests/sources/file_based/full suite — 788 passed, 1 skipped.test_state_throttlingexercises the throttle at <600 s / >600 s boundaries.test_ensure_at_least_one_state_emitted_bypasses_throttleconfirms the closing state always emits, even when called inside the throttle window.Risk
Low. Same pattern landed previously for
ConcurrentPerPartitionCursorwith no regressions. The throttled emissions are state messages — they are checkpoints, not data; per-file emission was never required for correctness, and the final state is preserved.Summary by CodeRabbit