fix(fetcher): release fetched payload after each worker iteration#916
Open
Kyzgor wants to merge 1 commit into
Open
fix(fetcher): release fetched payload after each worker iteration#916Kyzgor wants to merge 1 commit into
Kyzgor wants to merge 1 commit into
Conversation
A fetch worker runs `while True: event, callback = await queue.get(); ...`. Python keeps a frame's locals bound until they are reassigned or the frame exits, so a worker blocked on the next `queue.get()` kept the previous iteration's `event`/`fetcher`/`res`/`data` alive — pinning the last fetched dataset and its HTTP response in memory. With N fetching workers this retained up to N full datasets indefinitely: a per-update memory high-water mark, proportional to payload size, that only reset on process restart. Null these locals in the worker's `finally` block so an idle worker holds no payload. Adds a regression test asserting an idle worker does not retain its last fetched payload. Closes permitio#844 Closes permitio#770
✅ Deploy Preview for opal-docs canceled.
|
This was referenced Jun 8, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes Issue
Closes #844
Closes #770
Changes proposed
Fixes a memory high-water mark in the data fetching engine: a fetch worker held its last
fetched dataset (and HTTP response) in memory while it was idle.
opal_common/fetcher/engine/fetch_worker.py: null all five per-iteration locals(
event,callback,fetcher,res,data) in the worker'sfinallyblock, which runsafter the exception handlers so they still see
event, and pre-initializefetcher = res = data = Nonebefore thetry(those three may be unassigned if an earlyexception occurs). An idle worker now holds no payload between queue items.
opal_common/fetcher/tests/fetch_worker_retention_test.py(new): a regression test that anidle worker does not retain its last fetched payload. It uses a weakref, so it fails if the
cleanup is removed.
Root cause
fetch_workerrunswhile True: event, callback = await queue.get(); .... In CPython a framekeeps its locals bound until they are reassigned or the frame exits, so a worker blocked on the
next
queue.get()still referenced the previous iteration'sevent/fetcher/res/data,which pins the last fetched dataset and its
aiohttpresponse. WithFETCHING_WORKER_COUNTworkers (default 6) this retained up to N full datasets, proportional to payload size, released
only on process restart. OPA's own memory was unaffected (the data is correctly stored there);
the retention was entirely client-side in the worker frames. That is also why it looked like a
leak yet heap inspection after things settle found nothing: the objects free as soon as a worker
handles a new item, but idle workers keep the last one.
This is the same mechanism behind both reports. #844 triggers a fetch on every reconnect
("Initial load"), and #770 triggers one on every periodic datasource refresh.
Check List (Check all the applicable boxes)
Screenshots
N/A. The behaviour is memory usage, quantified below.
Note to reviewers
Before/after, from a local harness driving the real
DataUpdater-> fetch -> store path (fullreproduction steps available on request):
The defining signature of the bug was that the retained-dataset count equalled the worker count.
After the fix the retained count is 1 (measured with the default
FETCHING_WORKER_COUNTof 6),and since the retention was structurally one dataset per worker it no longer scales with the
pool size.
malloc_trimdoes not reclaim the pre-fix growth, which confirms genuine retentionrather than allocator fragmentation.
Correctness: the null-out is in
finally, which runs after theexceptblocks that callengine._on_failure(err, event), so the failure handler still receives the realevent. Theexisting fetcher and
data_updatertests pass (10), and with the new regression test all 11 pass.Scope: this PR only changes the worker frame-retention behaviour. During the investigation I also
found a small, separate unbounded object accumulation in the pub/sub reconnect path
(
asyncio.as_completedinfastapi-websocket-pubsub). It grows one task and one queue perreconnect but is only KB-scale and lives in a different repository, so I've raised it separately
and it is not part of this change. The growth fixed here is a bounded high-water mark under
normal (gc-enabled) runtime.
CI note: the
security/snykcheck tends to error on fork PRs (org token/quota) rather thanindicating a code problem. Happy to have a maintainer re-run it.