[FIX] Executor Queue for Agentic extraction#1893
[FIX] Executor Queue for Agentic extraction#1893chandrasekharan-zipstack merged 127 commits intomainfrom
Conversation
Conflicts resolved: - docker-compose.yaml: Use main's dedicated dashboard_metric_events queue for worker-metrics - PromptCard.jsx: Keep tool_id matching condition from our async socket feature - PromptRun.jsx: Merge useEffect import from main with our branch - ToolIde.jsx: Keep fire-and-forget socket approach (spinner waits for socket event) - SocketMessages.js: Keep both session-store and socket-custom-tool imports + updateCusToolMessages dep - SocketContext.js: Keep simpler path-based socket connection approach - usePromptRun.js: Keep Celery fire-and-forget with socket delivery over polling - setupProxy.js: Accept main's deletion (migrated to Vite)
for more information, see https://pre-commit.ci
… into feat/execution-backend
for more information, see https://pre-commit.ci
… into feat/execution-backend
Co-Authored-By: Claude Opus 4.6 <[email protected]>
for more information, see https://pre-commit.ci
- Extract _parse_json_body() helper and _ERR_INVALID_JSON constant to deduplicate 5 identical JSON parsing blocks in internal_views.py - Rename `User` local variable to `user_model` (naming convention) - Merge _emit_result/_emit_error into unified _emit_event() in ide_callback tasks to reduce code duplication - Extract _get_task_error() helper to deduplicate AsyncResult error retrieval in ide_index_error and ide_prompt_error - Remove unused `mock_ar_cls` variable in test_ide_callback.py - Add security note documenting why @csrf_exempt is safe on internal endpoints Co-Authored-By: Claude Opus 4.6 <[email protected]>
The IDE callback worker should use the unified worker image (worker-unified.Dockerfile) consistent with all other v2 workers. Co-Authored-By: Claude Opus 4.6 <[email protected]>
The executor worker only consumed from celery_executor_legacy, but the agentic prompt studio dispatches tasks to celery_executor_agentic. This caused agentic operations to sit in RabbitMQ with no consumer, resulting in timeouts and stuck-in-progress states. Co-Authored-By: Claude Opus 4.6 <[email protected]>
WalkthroughRemoved a prompt output option, adjusted Celery retry settings, added an agentic executor queue and surfaced it in configs/health checks, refined NA handling and email extraction in the legacy executor, tightened error handling/logging in destination and file-structure tasks, and updated tests to match NA behavior. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
| Filename | Overview |
|---|---|
| workers/executor/executors/legacy_executor.py | Two targeted fixes: (1) _sanitize_null_values no longer converts top-level "NA" strings to None; (2) email type conversion short-circuits on "NA" instead of making an LLM call. Minor inconsistency: DATE type still converts "NA" → None via _convert_scalar_answer while EMAIL now preserves it. |
| workers/executor/worker.py | Health check now reads queues dynamically from CELERY_QUEUES_EXECUTOR env var via split(","); minor risk of whitespace in token names if env var has spaces around commas. |
| workers/shared/enums/worker_enums_base.py | Adds EXECUTOR_AGENTIC = "celery_executor_agentic" to QueueName enum cleanly — no duplicate names, no import issues. |
| workers/shared/infrastructure/config/registry.py | Adds EXECUTOR_AGENTIC as an additional queue for the executor worker type; task routing is unchanged (agentic tasks are expected to be dispatched with an explicit queue, not via route config). |
| docker/docker-compose.yaml | Default CELERY_QUEUES_EXECUTOR now includes celery_executor_agentic alongside celery_executor_legacy. |
| workers/tests/test_answer_prompt.py | Tests updated to assert "NA" strings are preserved at the top level instead of being converted to None; accurately reflects the new _sanitize_null_values behavior. |
| backend/prompt_studio/prompt_studio_core_v2/static/select_choices.json | Removes deprecated line_item output type from select choices; straightforward cleanup. |
| backend/workflow_manager/workflow_v2/workflow_helper.py | Removes autoretry_for and related retry options from the execute_bin Celery task decorator, consistent with max_retries=0 intent. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Agentic task dispatched\nqueue: celery_executor_agentic] --> B{Executor Worker\nlistens on both queues}
B --> C[celery_executor_legacy\nprimary queue]
B --> D[celery_executor_agentic\nnew additional queue]
C --> E[LegacyExecutor._handle_answer_prompt]
D --> E
E --> F{output_type?}
F -->|EMAIL| G{answer == 'NA'?}
G -->|yes| H[preserve answer as 'NA' string]
G -->|no| I[run_completion LLM call]
F -->|DATE| J[_convert_scalar_answer\n'NA' → None unchanged]
F -->|TEXT / other| K[store raw answer]
K --> L[_sanitize_null_values]
H --> L
I --> L
J --> L
L --> M{value is list?}
M -->|yes| N[convert 'NA' items → None]
M -->|no - dict| O[_sanitize_dict_values]
M -->|no - string top-level| P[preserve as-is NEW BEHAVIOR]
Prompt To Fix All With AI
This is a comment left during a code review.
Path: workers/executor/worker.py
Line: 46-48
Comment:
**Potential whitespace in queue names from env var**
`split(",")` on a value like `"celery_executor_legacy, celery_executor_agentic"` (with a space after the comma) will produce `[" celery_executor_agentic"]` with a leading space. While this only affects the health check response here, it's good practice to strip individual tokens so the output is reliably clean:
```suggestion
"queues": [
q.strip()
for q in os.environ.get(
"CELERY_QUEUES_EXECUTOR", "celery_executor_legacy"
).split(",")
if q.strip()
],
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: workers/executor/executors/legacy_executor.py
Line: 1699-1711
Comment:
**Inconsistent NA handling between EMAIL and DATE types**
After this change, the two types diverge in how they handle an `"NA"` answer:
- **EMAIL** (updated here): `"NA"` → `"NA"` string preserved
- **DATE** (unchanged, line 1723): still calls `_convert_scalar_answer`, which returns `None` for `"NA"` answers
`_convert_scalar_answer` (lines 1268–1269):
```python
if answer.lower() == "na":
return None
```
If the intent behind the `_sanitize_null_values` fix is that `"NA"` values in structured output should be preserved as strings (the PR description says "Top-level 'NA' values were being silently converted to None"), then DATE has the same problem through a different code path. Was the inconsistency intentional (i.e., `None` is semantically correct for missing dates but not for missing emails)?
How can I resolve this? If you propose a fix, please make it concise.Reviews (6): Last reviewed commit: "Revert ETL destination pipeline changes ..." | Re-trigger Greptile
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Signed-off-by: harini-venkataraman <[email protected]>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Signed-off-by: harini-venkataraman <[email protected]>
for more information, see https://pre-commit.ci
- Widen except clause in structure_tool_task to catch FileOperationError and log write paths for diagnostics - Add diagnostic logging at all silent return-None points in destination connector so missing INFILE/METADATA paths are visible in logs - Raise RuntimeError instead of silently skipping when no tool execution result is available for DB/FS destinations, preventing false success - Remove dead retry config from execute_bin task (max_retries=0) - Fix duplicate EXECUTOR/IDE_CALLBACK enum entries in WorkerType Co-Authored-By: Claude Opus 4.6 <[email protected]>
for more information, see https://pre-commit.ci
chandrasekharan-zipstack
left a comment
There was a problem hiding this comment.
Change the PR title to follow the format
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (2)
workers/executor/executors/legacy_executor.py (1)
1056-1070: Docstring is now inconsistent with actual behavior.The docstring on line 1060 states "Replace 'NA' strings with None in structured output" but the implementation now only sanitizes "NA" within nested lists and dicts, preserving top-level "NA" strings. This could mislead future maintainers.
📝 Proposed docstring update
`@staticmethod` def _sanitize_null_values( structured_output: dict[str, Any], ) -> dict[str, Any]: - """Replace 'NA' strings with None in structured output.""" + """Replace 'NA' strings with None in nested structures only. + + Top-level 'NA' values are preserved. Only 'NA' strings inside + lists or nested dicts are converted to None. + """ for k, v in structured_output.items():🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/executors/legacy_executor.py` around lines 1056 - 1070, Docstring for LegacyExecutor._sanitize_null_values is inaccurate: it claims to "Replace 'NA' strings with None in structured output" but the method only replaces "NA" within nested lists/dicts (using LegacyExecutor._sanitize_dict_values) and leaves top-level "NA" strings untouched; update the docstring to accurately describe the behavior (e.g., "Sanitize nested list/dict values by replacing 'NA' strings with None; top-level string values are not modified") and reference the helper LegacyExecutor._sanitize_dict_values in the docstring so maintainers know nested sanitization is delegated.workers/executor/worker.py (1)
47-49: Prefer the resolved worker config as the health-check source of truth.If this is meant to report the queues this process actually started with,
config.queue_config.to_queue_list()is safer than re-splittingCELERY_QUEUES_EXECUTORand carrying a separate legacy-only fallback.♻️ Proposed refactor
- "queues": os.environ.get( - "CELERY_QUEUES_EXECUTOR", "celery_executor_legacy" - ).split(","), + "queues": ( + config.queue_config.to_queue_list() + if config + else WorkerRegistry.get_queue_config( + WorkerType.EXECUTOR + ).to_queue_list() + ),🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/executor/worker.py` around lines 47 - 49, The health-check should report the actual queues the process started with rather than re-splitting the CELERY_QUEUES_EXECUTOR env var: replace the current use of os.environ.get(...).split(",") in the "queues" field with the resolved worker config call config.queue_config.to_queue_list() (or equivalent accessor on your QueueConfig) so the health-check uses the authoritative, possibly-processed queue list the worker actually started with; keep a minimal fallback only if config.queue_config may be unset (use an empty list or existing legacy default).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/workflow_manager/workflow_v2/workflow_helper.py`:
- Around line 598-603: The `@celery_app.task` decorator call for async_execute_bin
is broken because the closing parenthesis comes before retry_jitter=True; fix it
by moving retry_jitter=True inside the decorator argument list and removing the
stray closing paren so the decorator reads something like
`@celery_app.task`(name="async_execute_bin", max_retries=0, retry_jitter=True)
directly above the async_execute_bin function; ensure indentation is corrected
so the decorator encloses all arguments and the function registers properly.
In `@workers/executor/executors/legacy_executor.py`:
- Around line 1698-1711: Add a clarifying inline comment near the EMAIL branch
in legacy_executor.py explaining that for PSKeys.EMAIL the literal string "NA"
is intentionally preserved (checked via answer.lower() == "na" and assigned
directly to structured_output[prompt_name]) whereas other types (NUMBER,
BOOLEAN, DATE) map "NA" to None; reference the output_type == PSKeys.EMAIL
branch, the answer.lower() == "na" check, and the use of
answer_prompt_svc.run_completion so future maintainers understand this
deliberate discrepancy and its impact on downstream consumers.
In `@workers/shared/enums/worker_enums_base.py`:
- Around line 27-31: Remove the duplicate IDE_CALLBACK member from the
WorkerType enum so it's defined only once, remove the extra repeated
`@classmethod` decorator on the from_directory_name method so it has a single
`@classmethod`, and deduplicate entries in the port_mapping dict so each
WorkerType (notably WorkerType.EXECUTOR and WorkerType.IDE_CALLBACK) appears
only once with the correct port value; update port_mapping to keep the intended
port for each unique key and remove the redundant mapping(s).
---
Nitpick comments:
In `@workers/executor/executors/legacy_executor.py`:
- Around line 1056-1070: Docstring for LegacyExecutor._sanitize_null_values is
inaccurate: it claims to "Replace 'NA' strings with None in structured output"
but the method only replaces "NA" within nested lists/dicts (using
LegacyExecutor._sanitize_dict_values) and leaves top-level "NA" strings
untouched; update the docstring to accurately describe the behavior (e.g.,
"Sanitize nested list/dict values by replacing 'NA' strings with None; top-level
string values are not modified") and reference the helper
LegacyExecutor._sanitize_dict_values in the docstring so maintainers know nested
sanitization is delegated.
In `@workers/executor/worker.py`:
- Around line 47-49: The health-check should report the actual queues the
process started with rather than re-splitting the CELERY_QUEUES_EXECUTOR env
var: replace the current use of os.environ.get(...).split(",") in the "queues"
field with the resolved worker config call config.queue_config.to_queue_list()
(or equivalent accessor on your QueueConfig) so the health-check uses the
authoritative, possibly-processed queue list the worker actually started with;
keep a minimal fallback only if config.queue_config may be unset (use an empty
list or existing legacy default).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 335d2886-4446-4367-9349-e179ae7cb57e
📒 Files selected for processing (8)
backend/prompt_studio/prompt_studio_core_v2/static/select_choices.jsonbackend/workflow_manager/workflow_v2/workflow_helper.pydocker/docker-compose.yamlworkers/executor/executors/legacy_executor.pyworkers/executor/worker.pyworkers/shared/enums/worker_enums_base.pyworkers/shared/infrastructure/config/registry.pyworkers/tests/test_answer_prompt.py
💤 Files with no reviewable changes (1)
- backend/prompt_studio/prompt_studio_core_v2/static/select_choices.json
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@workers/shared/workflow/destination_connector.py`:
- Around line 661-672: The error message for the missing tool execution result
is misleading because the code raises a RuntimeError (failing the operation)
rather than merely "skipping" the filesystem copy; update the message used in
logger.error, log_file_error (with exec_ctx.workflow_log and
exec_ctx.file_execution_id), and the RuntimeError to reflect failure (e.g.,
"filesystem copy failed" or "cannot perform filesystem copy") and keep the
existing context from result.tool_execution_result and file_ctx.execution_error
when constructing the message.
- Around line 711-721: The error message says "database insertion skipped" but
the code still raises RuntimeError; update the behavior in the block that builds
error_msg (references: file_ctx.file_name, logger, log_file_error,
exec_ctx.file_execution_id) so it matches the message: do not raise RuntimeError
— instead log the error and return/exit the enclosing function early (or
explicitly return None) to skip database insertion, or if an exception is
desired change the message to reflect a raised error; prefer removing the raise
RuntimeError(error_msg) and returning to mirror the filesystem handler's
"skipped" behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 49bfbde0-46e7-44f6-be53-72fb0dab43bb
📒 Files selected for processing (5)
backend/workflow_manager/workflow_v2/workflow_helper.pydocker/docker-compose.yamlworkers/file_processing/structure_tool_task.pyworkers/shared/enums/worker_enums_base.pyworkers/shared/workflow/destination_connector.py
💤 Files with no reviewable changes (1)
- backend/workflow_manager/workflow_v2/workflow_helper.py
Reverts diagnostic logging and error-raising changes in structure_tool_task.py and destination_connector.py. Co-Authored-By: Claude Opus 4.6 <[email protected]>
|
Test ResultsSummary
Runner Tests - Full Report
SDK1 Tests - Full Report
|



What
celery_executor_agenticqueue to the executor worker so it can process agentic prompt studio execution tasks_sanitize_null_valuesto preserve top-level "NA" strings instead of converting them toNoneline_itemoutput type from prompt studio select choicesCELERY_QUEUES_EXECUTORenv var instead of hardcodingWhy
celery_executor_agenticqueue, but no worker was consuming from it, causing agentic prompt studio executions to hang indefinitelyNone, which changed the semantics of the extraction output and caused incorrect downstream behaviorline_itemoutput type was removed in favor oftable, and the stale entry needed cleanupHow
EXECUTOR_AGENTICtoQueueNameenum, registered it as an additional queue inWorkerRegistryfor the executor worker type, and updateddocker-compose.yamlto include bothcelery_executor_legacyandcelery_executor_agenticin the defaultCELERY_QUEUES_EXECUTORenv varworker.pyto read queues dynamically from theCELERY_QUEUES_EXECUTORenvironment variable instead of returning a hardcoded list_sanitize_null_valuesto only sanitize "NA" values inside nested lists, preserving top-level "NA" strings as-isanswer_prompt_svc.run_completiondirectly instead of the_convert_scalar_answerhelpertest_answer_prompt.pyto reflect the new NA preservation behaviorCan this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
celery_executor_agentic). Existing legacy queue processing is unaffected since the primary queue remainscelery_executor_legacy.None. This is an intentional fix — downstream consumers that relied onNonefor "NA" answers may need to handle the string "NA" instead. Nested list "NA" values are still sanitized toNone.line_itemoutput type: Any existing prompts configured withline_itemoutput type would no longer have that option. This should be safe asline_itemhas been superseded bytable.Database Migrations
Env Config
NA
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
workers/tests/test_answer_prompt.pyto validate the new NA preservation behaviorScreenshots
N/A
Checklist
I have read and understood the Contribution Guidelines.