Skip to content

fix(file-based): respect _concurrency_level for thread pool sizing#1035

Draft
devin-ai-integration[bot] wants to merge 2 commits into
mainfrom
devin/1779723955-fix-file-based-concurrency-level
Draft

fix(file-based): respect _concurrency_level for thread pool sizing#1035
devin-ai-integration[bot] wants to merge 2 commits into
mainfrom
devin/1779723955-fix-file-based-concurrency-level

Conversation

@devin-ai-integration
Copy link
Copy Markdown
Contributor

Summary

FileBasedSource.__init__ always created the ConcurrentSource with MAX_CONCURRENCY (100) workers regardless of the subclass's _concurrency_level setting. This meant connectors that set _concurrency_level to a lower value (e.g. 20) still got 100 concurrent file readers, causing OOM on large S3 streams within 2 Gi container limits.

This fix uses _concurrency_level (capped at MAX_CONCURRENCY) to size the thread pool and initial partition count. When _concurrency_level is None (default), the existing MAX_CONCURRENCY behavior is preserved.

Root cause: The concurrent file-based cursor migration (airbytehq/airbyte#78325) for source-s3 triggered OOM failures on large streams because the thread pool always ran 100 concurrent file readers regardless of the connector's declared _concurrency_level. See airbytehq/oncall#12714 for the incident.

Related to https://github.com/airbytehq/oncall/issues/12714:

Review & Testing Checklist for Human

  • Verify the _concurrency_level is correctly used to size the thread pool (check ConcurrentSource.create receives the expected num_workers and initial_number_of_partitions_to_generate)
  • Verify that connectors with _concurrency_level = None (the default) still get MAX_CONCURRENCY workers — no regression for existing connectors
  • Run source-s3 with _concurrency_level = 20 against a large S3 stream to verify memory stays under 2 Gi

Notes

The companion change in source-s3 (setting _concurrency_level = 20) will be submitted as a separate PR in the monorepo once this CDK change is available.

Link to Devin session: https://app.devin.ai/sessions/ff881275041b469f9a6aed60a6af0fe2

FileBasedSource.__init__ always created the ConcurrentSource with
MAX_CONCURRENCY (100) workers regardless of the subclass's
_concurrency_level setting. This meant connectors that set
_concurrency_level to a lower value (e.g. 20) still got 100
concurrent file readers, causing OOM on large S3 streams within
2 Gi container limits.

Use _concurrency_level (capped at MAX_CONCURRENCY) to size the
thread pool and initial partition count. When _concurrency_level
is None (default), the existing MAX_CONCURRENCY is used.

Co-Authored-By: bot_apk <[email protected]>
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1779723955-fix-file-based-concurrency-level#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1779723955-fix-file-based-concurrency-level

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

Comment thread unit_tests/sources/file_based/test_file_based_source_concurrency.py Fixed
@github-actions
Copy link
Copy Markdown

PyTest Results (Fast)

4 074 tests  +5   4 063 ✅ +5   7m 17s ⏱️ -27s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit eb8f910. ± Comparison against base commit f67a9d9.

@github-actions
Copy link
Copy Markdown

PyTest Results (Full)

4 077 tests  +5   4 065 ✅ +5   11m 12s ⏱️ +15s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit eb8f910. ± Comparison against base commit f67a9d9.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

0 participants