Skip to content

fix(file-based): throttle state-message emission to prevent platform OOM on large file streams#1032

Open
Anatolii Yatsuk (tolik0) wants to merge 3 commits into
mainfrom
tolik0/cdk/file-based-cursor-throttle-state
Open

fix(file-based): throttle state-message emission to prevent platform OOM on large file streams#1032
Anatolii Yatsuk (tolik0) wants to merge 3 commits into
mainfrom
tolik0/cdk/file-based-cursor-throttle-state

Conversation

@tolik0
Copy link
Copy Markdown
Contributor

@tolik0 Anatolii Yatsuk (tolik0) commented May 21, 2026

Summary

  • Throttle FileBasedConcurrentCursor.emit_state_message() to at most one state per 600 s during a sync.
  • Force-emit a final state from ensure_at_least_one_state_emitted() so the platform always receives the closing state even when the sync finishes inside the throttle window.
  • Mirror of the proven throttle pattern in ConcurrentPerPartitionCursor (PR for oncall #7856), applied to the file-based concurrent cursor.

Why

FileBasedConcurrentCursor emits a state message after every processed file. Each state contains the full file-history dict, so on a sync of N files the cursor emits N states whose sizes grow with N (O(N²) total bytes). The platform/orchestrator buffers each state until the destination ACKs the matching record batch; destinations flush on time/size, so state messages pile up in orchestrator memory and OOM the replication pod on large file streams.

Downstream symptom: the source pod is torn down without emitting terminal stream status, and the destination fails with:

TransientErrorException: Input was fully read, but some streams did not receive a terminal stream status message

Related: oncall #7856 (declarative concurrent cursor, fixed by the same throttle pattern) and #12663 (file-based, this fix).

Test plan

  • unit_tests/sources/file_based/ full suite — 788 passed, 1 skipped.
  • New test_state_throttling exercises the throttle at <600 s / >600 s boundaries.
  • New test_ensure_at_least_one_state_emitted_bypasses_throttle confirms the closing state always emits, even when called inside the throttle window.

Risk

Low. Same pattern landed previously for ConcurrentPerPartitionCursor with no regressions. The throttled emissions are state messages — they are checkpoints, not data; per-file emission was never required for correctness, and the final state is preserved.

Summary by CodeRabbit

  • New Features
    • State message emissions in file-based concurrent streams are now time-throttled to reduce emission frequency, with a guaranteed final state message delivered immediately when needed.
  • Bug Fixes / Improvements
    • Improved handling of streams not selected for sync during checks/discovery so concurrent cursors are created with full context.
  • Tests
    • Added unit tests covering throttling behavior and guaranteed final state emission.

Review Change Stack

…OOM on large file streams

FileBasedConcurrentCursor emits a state message after every processed
file. Each state contains the full file-history dict, so on a sync of N
files the cursor emits N states whose sizes grow with N (O(N^2) total
bytes). The platform/orchestrator buffers each state until the
destination ACKs the matching record batch; destinations flush on
time/size, so state messages pile up in orchestrator memory and OOM the
replication pod on large file streams. The downstream symptom is the
source pod being torn down without emitting terminal stream status,
which surfaces at the destination as TransientErrorException("Input was
fully read, but some streams did not receive a terminal stream status
message").

Mirror the throttle pattern from ConcurrentPerPartitionCursor (fix for
oncall #7856) on the file-based concurrent cursor: emit at most one
state per 600 seconds during the sync, and force-emit a final state via
ensure_at_least_one_state_emitted so the platform always receives the
closing state regardless of timing.
Copilot AI review requested due to automatic review settings May 21, 2026 15:43
@github-actions
Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/cdk/file-based-cursor-throttle-state#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/cdk/file-based-cursor-throttle-state

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 21, 2026

📝 Walkthrough

Walkthrough

Adds time-based throttling to FileBasedConcurrentCursor state-message emissions with a final-emission bypass, plus a change in FileBasedSource.streams() to instantiate concurrent cursors with the full constructor when sync_mode is None.

Changes

State Emission Throttling

Layer / File(s) Summary
Throttling infrastructure and setup
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py
time imported; DEFAULT_STATE_EMISSION_INTERVAL_SECONDS constant and docs added; _last_emission_time initialized to track last emission.
Throttling implementation and final-emission bypass
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py
emit_state_message now routes through _throttle_state_message/_emit_state_message(throttle=...) which conditionally suppress emissions based on elapsed time; ensure_at_least_one_state_emitted calls _emit_state_message(throttle=False) to force final emission.
Throttling behavior validation
unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py
Adds test_state_throttling and test_ensure_at_least_one_state_emitted_bypasses_throttle validating throttling and final-emission bypass.

Cursor Construction for sync_mode=None

Layer / File(s) Summary
streams() cursor instantiation change
airbyte_cdk/sources/file_based/file_based_source.py
When sync_mode is None, instantiate AbstractConcurrentFileBasedCursor using the full constructor signature (stream config, name, intermediate cursor arg as None, stream state, message repository, ConnectorStateManager, and cursor field); non-concurrent cursor behavior unchanged.

🎯 3 (Moderate) | ⏱️ ~20 minutes

Would you like a small integration test that uses time.sleep to validate real-world throttling behavior, wdyt?

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 38.46% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately describes the main change: implementing throttling for state-message emissions in FileBasedConcurrentCursor to address OOM issues on large file streams.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch tolik0/cdk/file-based-cursor-throttle-state

Warning

Review ran into problems

🔥 Problems

Stopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a @coderabbit review after the pipeline has finished.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1)

184-198: 💤 Low value

Would time.monotonic() be a better fit for measuring elapsed intervals here, wdyt?

time.time() reflects wall-clock time and can jump backward/forward on NTP adjustments or manual clock changes, which could either prematurely unthrottle or starve emissions for a long stretch. time.monotonic() is purpose-built for measuring elapsed durations and is immune to wall-clock shifts. Totally optional — I see the PR description mentions matching the pattern used in ConcurrentPerPartitionCursor, so consistency may outweigh this. Happy to defer.

♻️ Optional tweak
-    def _throttle_state_message(self) -> Optional[float]:
-        current_time = time.time()
+    def _throttle_state_message(self) -> Optional[float]:
+        current_time = time.monotonic()
         if (
             current_time - self._last_emission_time
             <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS
         ):
             return None
         return current_time
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py`
around lines 184 - 198, Replace wall-clock timing with monotonic timing: in the
_throttle_state_message method use time.monotonic() instead of time.time() to
compute elapsed intervals against DEFAULT_STATE_EMISSION_INTERVAL_SECONDS and
set/compare _last_emission_time using monotonic values; ensure
_emit_state_message continues to call _throttle_state_message and assign
_last_emission_time the returned monotonic timestamp so emissions are robust to
system clock changes and consistent with the pattern used in
ConcurrentPerPartitionCursor.
unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py (1)

592-626: 💤 Low value

Nice coverage of the throttle boundaries — one tiny thought on the patch target, wdyt?

The test patches the global time.time symbol, which works because the cursor calls time.time() (attribute lookup on the module) rather than a local rebinding. If anyone ever refactors the cursor to from time import time, this patch would silently stop intercepting the call and the tests could keep "passing" while no longer exercising the throttle. Patching the symbol where it's used (e.g. mocker.patch("airbyte_cdk.sources.file_based.stream.concurrent.cursor.file_based_concurrent_cursor.time.time")) would make the test resilient to that refactor. Totally optional polish.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py`
around lines 592 - 626, The test_state_throttling currently patches the global
time.time which can break silently if emit_state_message or its module changes
to use "from time import time"; update the mock to patch the symbol where it's
used instead (patch
"airbyte_cdk.sources.file_based.stream.concurrent.cursor.file_based_concurrent_cursor.time.time")
so emit_state_message continues to be intercepted; keep assertions around
cursor._last_emission_time, cursor.emit_state_message,
mock_connector_manager.update_state_for_stream and mock_repo.emit_message
unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In
`@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py`:
- Around line 184-198: Replace wall-clock timing with monotonic timing: in the
_throttle_state_message method use time.monotonic() instead of time.time() to
compute elapsed intervals against DEFAULT_STATE_EMISSION_INTERVAL_SECONDS and
set/compare _last_emission_time using monotonic values; ensure
_emit_state_message continues to call _throttle_state_message and assign
_last_emission_time the returned monotonic timestamp so emissions are robust to
system clock changes and consistent with the pattern used in
ConcurrentPerPartitionCursor.

In
`@unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py`:
- Around line 592-626: The test_state_throttling currently patches the global
time.time which can break silently if emit_state_message or its module changes
to use "from time import time"; update the mock to patch the symbol where it's
used instead (patch
"airbyte_cdk.sources.file_based.stream.concurrent.cursor.file_based_concurrent_cursor.time.time")
so emit_state_message continues to be intercepted; keep assertions around
cursor._last_emission_time, cursor.emit_state_message,
mock_connector_manager.update_state_for_stream and mock_repo.emit_message
unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: e494079b-dd97-4462-b4a3-bca1eb0531e1

📥 Commits

Reviewing files that changed from the base of the PR and between f67a9d9 and 0170fdd.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py
  • unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR mitigates orchestrator OOMs on large file-based concurrent streams by reducing the volume of emitted state messages while still guaranteeing a terminal/final state is always sent at stream completion.

Changes:

  • Added time-based throttling to FileBasedConcurrentCursor.emit_state_message() (max 1 emission per 600 seconds).
  • Ensured ensure_at_least_one_state_emitted() force-emits a final state by bypassing the throttle.
  • Added unit tests covering throttling behavior and the final-state bypass.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

File Description
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py Introduces throttled state emission and a throttle bypass for the final state message.
unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py Adds tests validating the throttle window behavior and that end-of-stream state emission bypasses throttling.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@tolik0
Copy link
Copy Markdown
Contributor Author

Anatolii Yatsuk (tolik0) commented May 21, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (2)

184-188: ⚡ Quick win

Use time.monotonic() for elapsed-time throttling?

time.time() reflects wall-clock time, which can jump (NTP corrections, manual clock changes, DST on some platforms) and isn't guaranteed to be non-decreasing. For measuring an elapsed interval like this throttle, time.monotonic() is the idiomatic choice and immune to system clock adjustments — a backward jump here could suppress state emissions for much longer than 600s, while a forward jump could defeat the throttle entirely. wdyt about switching to monotonic for both the read and the stored timestamp?

♻️ Proposed change
     def _throttle_state_message(self) -> Optional[float]:
-        current_time = time.time()
+        current_time = time.monotonic()
         if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS:
             return None
         return current_time

(Initial value 0.0 at construction still works since monotonic() returns a positive float that is always far greater than 0 in practice — first emission will pass the throttle as intended.)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py`
around lines 184 - 188, Replace wall-clock timing with a monotonic clock in the
throttling logic: in _throttle_state_message use time.monotonic() instead of
time.time(), and ensure any code that initializes or updates the stored
timestamp self._last_emission_time (e.g., constructor or wherever you set it
after emitting state) also uses time.monotonic() so reads and writes are
consistent with DEFAULT_STATE_EMISSION_INTERVAL_SECONDS; initial 0.0 can remain
unchanged since monotonic() values will be larger on first check.

337-340: 💤 Low value

Should the final force-emit also update _last_emission_time?

ensure_at_least_one_state_emitted bypasses the throttle but doesn't refresh _last_emission_time. In practice this is the terminal call so it likely doesn't matter, but if the cursor instance is ever reused or if add_file happens to fire right after (e.g. in tests or future refactors), the next emit_state_message could immediately pass the throttle since the tracker wasn't bumped. Want to update the timestamp here too for symmetry, or is keeping it purely "final-emission-only" semantics intentional? wdyt?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py`
around lines 337 - 340, The final-force emit in
ensure_at_least_one_state_emitted currently calls
_emit_state_message(throttle=False) but does not update the emission tracker
_last_emission_time, which can allow a subsequent emit_state_message or add_file
call to bypass throttling unexpectedly; modify ensure_at_least_one_state_emitted
to also refresh _last_emission_time (e.g., set self._last_emission_time =
current timestamp using the same time source used elsewhere in the class)
immediately after or before calling _emit_state_message so the final emission is
reflected in the throttle tracker and keeps behavior consistent with normal
emits.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In
`@airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py`:
- Around line 184-188: Replace wall-clock timing with a monotonic clock in the
throttling logic: in _throttle_state_message use time.monotonic() instead of
time.time(), and ensure any code that initializes or updates the stored
timestamp self._last_emission_time (e.g., constructor or wherever you set it
after emitting state) also uses time.monotonic() so reads and writes are
consistent with DEFAULT_STATE_EMISSION_INTERVAL_SECONDS; initial 0.0 can remain
unchanged since monotonic() values will be larger on first check.
- Around line 337-340: The final-force emit in ensure_at_least_one_state_emitted
currently calls _emit_state_message(throttle=False) but does not update the
emission tracker _last_emission_time, which can allow a subsequent
emit_state_message or add_file call to bypass throttling unexpectedly; modify
ensure_at_least_one_state_emitted to also refresh _last_emission_time (e.g., set
self._last_emission_time = current timestamp using the same time source used
elsewhere in the class) immediately after or before calling _emit_state_message
so the final emission is reflected in the throttle tracker and keeps behavior
consistent with normal emits.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 43f3a308-8b11-4c0e-91b2-a44488e62e0d

📥 Commits

Reviewing files that changed from the base of the PR and between 0170fdd and 1d3d679.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 21, 2026

PyTest Results (Fast)

4 071 tests  +2   4 060 ✅ +2   7m 57s ⏱️ +13s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 0fa23c9. ± Comparison against base commit f67a9d9.

♻️ This comment has been updated with latest results.

@tolik0
Copy link
Copy Markdown
Contributor Author

Anatolii Yatsuk (tolik0) commented May 21, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/26237349277

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 21, 2026

PyTest Results (Full)

4 074 tests  +2   4 062 ✅ +2   11m 38s ⏱️ +41s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 0fa23c9. ± Comparison against base commit f67a9d9.

♻️ This comment has been updated with latest results.


def _throttle_state_message(self) -> Optional[float]:
current_time = time.time()
if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL_SECONDS:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use time.monotonic() instead of time.time() for elapsed-time throttling. time.time() can jump forward or backward due to wall-clock adjustments, which can either bypass the throttle early or suppress state emission far longer than 600 seconds. Since _last_emission_time only measures elapsed duration within this process, a monotonic clock is the safer fit.


Devin session

…ck branch

FileBasedSource.streams() iterates every stream declared in the connector
config (not just the catalog). For streams without a catalog entry
sync_mode is None and the fallback branch calls
`self.cursor_cls(stream_config)`. That single-arg form works for
DefaultFileBasedCursor, but FileBasedConcurrentCursor requires 7 args, so
the moment a connector switches to cursor_cls=FileBasedConcurrentCursor
it crashes during read on any partial-catalog selection, plus on every
check/discover (no catalog at all).

Detect the cursor flavor and pass the full constructor args when it's a
concurrent cursor. Unblocks file-based connectors (e.g. source-s3) from
migrating onto the concurrent cursor path and picking up the state
emission throttle added earlier in this PR.
@tolik0
Copy link
Copy Markdown
Contributor Author

Anatolii Yatsuk (tolik0) commented May 21, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/26244645194

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
airbyte_cdk/sources/file_based/file_based_source.py (1)

315-333: ⚡ Quick win

Make state_manager=None handling for FileBasedConcurrentCursor more explicit (check/discover)

FileBasedConcurrentCursor.__init__ just stores connector_state_manager; it’s only dereferenced when emitting state in _emit_state_message() (called from add_file()). The check_connection path parses files directly via the availability strategy (no cursor.add_file()), and discover only calls Stream.get_json_schema() / schema inference (no cursor usage). So this likely won’t crash in check/discover.

Would you still prefer a defensive route in this state_manager=None branch (e.g., fall back to FileBasedFinalStateCursor or use a no-op/stub ConnectorStateManager), or at least add a clearer comment that cursor methods like add_file() won’t run during check/discover?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/file_based/file_based_source.py` around lines 315 - 333,
When creating a concurrent cursor in the branch where state_manager may be None,
make the handling explicit: if issubclass(self.cursor_cls,
AbstractConcurrentFileBasedCursor) and state_manager is None, instantiate a safe
fallback cursor (e.g., FileBasedFinalStateCursor) instead of calling
self.cursor_cls with a None connector_state_manager, or alternatively supply a
no-op stub ConnectorStateManager; update the comment near this branch to explain
that check/discover paths don't call cursor.add_file()/_emit_state_message() so
a None state_manager won't be used, and reference cursor_cls,
AbstractConcurrentFileBasedCursor, FileBasedFinalStateCursor, state_manager,
add_file, and _emit_state_message in the comment to make the decision clear.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@airbyte_cdk/sources/file_based/file_based_source.py`:
- Around line 315-333: When creating a concurrent cursor in the branch where
state_manager may be None, make the handling explicit: if
issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor) and state_manager
is None, instantiate a safe fallback cursor (e.g., FileBasedFinalStateCursor)
instead of calling self.cursor_cls with a None connector_state_manager, or
alternatively supply a no-op stub ConnectorStateManager; update the comment near
this branch to explain that check/discover paths don't call
cursor.add_file()/_emit_state_message() so a None state_manager won't be used,
and reference cursor_cls, AbstractConcurrentFileBasedCursor,
FileBasedFinalStateCursor, state_manager, add_file, and _emit_state_message in
the comment to make the decision clear.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b6d72774-5d8a-4f21-81b5-19dc85623672

📥 Commits

Reviewing files that changed from the base of the PR and between 1d3d679 and 0fa23c9.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/file_based/file_based_source.py

Anatolii Yatsuk (tolik0) added a commit to airbytehq/airbyte that referenced this pull request May 21, 2026
source-s3 previously used the legacy DefaultFileBasedCursor via the
source-s3-specific Cursor subclass. That cursor goes through the legacy
non-concurrent file-based read path, which emits a state message per
processed file. With ~10 k files in a stream, the platform/orchestrator
buffers ~10 k state messages (each carrying the growing history dict)
waiting for the destination to ACK them; on slow destinations the
orchestrator pod OOMs, the source pod is torn down before emitting
terminal stream status, and the destination fails with
TransientErrorException("Input was fully read, but some streams did not
receive a terminal stream status message").

Move source-s3 onto FileBasedConcurrentCursor (CDK) which:
  - emits state at most once per 600s during a sync (state throttle
    added in airbytehq/airbyte-python-cdk#1032);
  - always force-emits a final state via ensure_at_least_one_state_emitted;
  - runs the read on the concurrent file-based path with up to
    DEFAULT_CONCURRENCY (100) workers.

Verified locally against a customer connection with ~9 k files: state
messages dropped from 9 200 to 2, runtime from 52:46 to 4:24, stdout
volume from 4.1 GB to 922 MB, sync completes successfully with a
STREAM_STATUS COMPLETE.

The legacy source-s3 Cursor class only existed to host the v3-to-v4
state migration shipped in 2023 (#29028). 2.5 years on, all active
customers have completed the migration on their first v4 sync, so the
subclass and its tests are removed. A hypothetical customer with stale
v3 state would trigger one full re-sync, after which they'd be on v4.

Pinning to airbyte-cdk 7.19.2.post3.dev26244645194, a prerelease that
includes both the state throttle and a fallback-branch dispatch fix
for the file-based source so connectors using FileBasedConcurrentCursor
do not crash on partial-catalog reads / check / discover.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants