After much discussion, we are converging on the following approach:
---
The cascade delete is nice; this design would eliminate the possibility of "orphaned" jobs, removing the need for stale-job cleanup. This means users with delete/drop privileges on the autopopulated tables must also have delete/drop privileges on the corresponding jobs tables.
---
Updated specification:

# Autopopulate 2.0 Specification

## Overview

This specification redesigns the DataJoint job handling system to provide better visibility, control, and scalability for distributed computing workflows. The new system replaces the schema-level `~jobs` table with a hidden jobs table per auto-populated table.

## Problem Statement

### Current Jobs Table Limitations

The existing schema-level `~jobs` table functions more as an error/reservation log than a true job queue: it records only `error` and `reserved` states and lacks `pending` and `success` statuses, scheduling, and priorities.
### Key Source Limitations

Retrieving the job queue via `(Table.key_source - Table).fetch('KEY')` is expensive when called frequently by many workers, and customized `key_source` definitions are visible only to the local pipeline code, not at the database level.
## Proposed Solution

### Terminology
### Core Design Principles
## Architecture

### Jobs Table Structure

Each auto-populated table gets its own hidden jobs table.

**Important:** The jobs table primary key includes only those attributes that come through foreign keys in the target table's primary key. Additional primary key attributes (if any) are excluded. This means a single job entry may cover multiple rows of the target table (see Primary Key Derivation below).
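To make the structure concrete, here is a hypothetical generated definition for `FilteredImage.jobs`. This is a sketch: the spec fixes only the primary key derivation and the status set, so the remaining attribute names, types, and defaults are assumptions inferred from the method signatures and examples below.

```
# Job queue for FilteredImage (sketch; non-spec attributes are assumptions)
image_id             : int    # FK-derived primary key attribute, copied from the target
---
status               : enum('pending','reserved','success','error','ignore')
priority             : tinyint unsigned   # lower = more urgent
scheduled_time       : datetime           # job runs when scheduled_time <= now
created_time         : datetime           # used for stale-job detection
reserved_time = null : datetime
host = ""            : varchar(255)       # worker hostname
error_message = ""   : varchar(2000)
error_stack = null   : mediumblob
duration = null      : float              # seconds, recorded by complete()
```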
### Access Pattern

Jobs are accessed as a property of the computed table:

```python
# Current pattern (schema-level)
schema.jobs
# New pattern (per-table)
MyTable.jobs
# Examples
FilteredImage.jobs # Access jobs table
FilteredImage.jobs & 'status="error"' # Query errors
FilteredImage.jobs.refresh()          # Refresh job queue
```

### Status Values

A job's `status` is one of five values: `pending`, `reserved`, `success`, `error`, or `ignore` (see the `JobsTable` properties below).
### Status Transitions

```mermaid
stateDiagram-v2
state "(none)" as none1
state "(none)" as none2
none1 --> pending : refresh()
none1 --> ignore : ignore()
pending --> reserved : reserve()
reserved --> none2 : complete()
reserved --> success : complete()*
reserved --> error : error()
success --> pending : refresh()*
error --> none2 : delete()
success --> none2 : delete()
ignore --> none2 : delete()
```

**Transition methods:** `refresh()`, `reserve()`, `complete()`, `error()`, `ignore()`, and `delete()`, as labeled on the diagram. Transitions marked `*` apply when `jobs.keep_completed=True`.
**Manual status control:** jobs can also be moved by hand, e.g., marked with `ignore()` or removed with `delete()`, as shown below.
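For example, a minimal sketch using only the methods defined in the API below (`ignore()`, `delete()`, `refresh()`):

```python
# Permanently skip computation for selected keys
for key in (FilteredImage.key_source & 'subject_id=7').fetch('KEY'):
    FilteredImage.jobs.ignore(key)

# Reset ignored jobs: delete them, then refresh() re-adds them as 'pending'
FilteredImage.jobs.ignored.delete()
FilteredImage.jobs.refresh()
```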
## API Design

### JobsTable Class

```python
class JobsTable(Table):
    """Hidden table managing the job queue for a computed table."""

    @property
    def definition(self) -> str:
        """Dynamically generated based on the parent table's primary key."""
        ...

    def refresh(
        self,
        *restrictions,
        delay: float = 0,
        priority: int = 5,
        stale_timeout: float = None,
    ) -> dict:
        """
        Refresh the jobs queue: add new jobs and remove stale ones.

        Operations performed:
        1. Add new jobs: (key_source & restrictions) - target - jobs → insert as 'pending'
        2. Remove stale jobs: pending jobs older than stale_timeout whose keys
           are no longer in key_source (upstream records were deleted)

        Args:
            restrictions: Conditions to filter key_source
            delay: Seconds from now until jobs become available for processing.
                Default: 0 (jobs are immediately available).
                Uses database server time to avoid client clock synchronization issues.
            priority: Priority for new jobs (lower = more urgent). Default: 5
            stale_timeout: Seconds after which pending jobs are checked for staleness.
                Jobs older than this are removed if their key is no longer
                in key_source. Default from config: jobs.stale_timeout (3600s)

        Returns:
            {'added': int, 'removed': int} - counts of jobs added and stale jobs removed
        """
        ...

    def reserve(self, key: dict) -> bool:
        """
        Attempt to reserve a job for processing.

        Updates status to 'reserved' if currently 'pending' and scheduled_time <= now.
        No locking is used; rare conflicts are resolved by the make() transaction.

        Returns:
            True if reservation successful, False if job not found or not pending.
        """
        ...

    def complete(self, key: dict, duration: float = None) -> None:
        """
        Mark a job as successfully completed.

        Updates status to 'success', records duration and completion time.
        """
        ...

    def error(self, key: dict, error_message: str, error_stack: str = None) -> None:
        """
        Mark a job as failed with error details.

        Updates status to 'error', records error message and stack trace.
        """
        ...

    def ignore(self, key: dict) -> None:
        """
        Mark a job to be ignored (skipped during populate).

        To reset an ignored job, delete it and call refresh().
        """
        ...

    # delete() follows delete_quick() semantics - no confirmation required
    # Usage: (jobs & condition).delete() or jobs.errors.delete()

    @property
    def pending(self) -> QueryExpression:
        """Return query for pending jobs."""
        return self & 'status="pending"'

    @property
    def reserved(self) -> QueryExpression:
        """Return query for reserved jobs."""
        return self & 'status="reserved"'

    @property
    def errors(self) -> QueryExpression:
        """Return query for error jobs."""
        return self & 'status="error"'

    @property
    def ignored(self) -> QueryExpression:
        """Return query for ignored jobs."""
        return self & 'status="ignore"'

    @property
    def completed(self) -> QueryExpression:
        """Return query for completed jobs."""
        return self & 'status="success"'
```

### AutoPopulate Integration

The `populate()` method gains two new parameters, `priority` and `refresh`:

```python
def populate(
    self,
    *restrictions,
    suppress_errors: bool = False,
    return_exception_objects: bool = False,
    reserve_jobs: bool = False,
    order: str = "original",
    limit: int = None,
    max_calls: int = None,
    display_progress: bool = False,
    processes: int = 1,
    make_kwargs: dict = None,
    # New parameters
    priority: int = None,  # Only process jobs at this priority or more urgent (lower values)
    refresh: bool = True,  # Refresh jobs queue if no pending jobs available
) -> dict:
    """
    Populate the table by calling make() for each missing entry.

    New behavior with reserve_jobs=True:
    1. Fetch all non-stale pending jobs (ordered by priority ASC, scheduled_time ASC)
    2. For each pending job:
       a. Mark job as 'reserved' (per-key, before make)
       b. Call make(key)
       c. On success: mark job as 'success' or delete (based on keep_completed)
       d. On error: mark job as 'error' with message/stack
    3. If refresh=True and no pending jobs were found, call self.jobs.refresh()
       and repeat from step 1
    4. Continue until no more pending jobs or max_calls reached
    """
    ...
```

### Progress and Monitoring

```python
# Current progress reporting
remaining, total = MyTable.progress()
# Enhanced progress with jobs table
MyTable.jobs.progress() # Returns detailed status breakdown
# Example output:
# {
# 'pending': 150,
# 'reserved': 3,
# 'success': 847,
# 'error': 12,
# 'ignore': 5,
# 'total': 1017
# }
```

### Priority and Scheduling

Priority and scheduling are handled via `refresh()` parameters:

```python
# Add urgent jobs (priority=0 is most urgent)
MyTable.jobs.refresh(priority=0)
# Add normal jobs (default priority=5)
MyTable.jobs.refresh()
# Add low-priority background jobs
MyTable.jobs.refresh(priority=10)
# Schedule jobs for future processing (2 hours from now)
MyTable.jobs.refresh(delay=2*60*60) # 7200 seconds
# Schedule jobs for tomorrow (24 hours from now)
MyTable.jobs.refresh(delay=24*60*60)
# Combine: urgent jobs with 1-hour delay
MyTable.jobs.refresh(priority=0, delay=3600)
# Add urgent jobs for specific subjects
MyTable.jobs.refresh(Subject & 'priority="urgent"', priority=0)
```

## Implementation Details

### Table Naming Convention

Jobs tables follow the existing hidden table naming pattern: the target table's name is prefixed with `~` and suffixed with `__jobs` (e.g., `FilteredImage` → `~filtered_image__jobs`).
### Primary Key Derivation

The jobs table primary key includes only those attributes derived from foreign keys in the target table's primary key:

```python
# Example 1: FK-only primary key (simple case)
@schema
class FilteredImage(dj.Computed):
    definition = """
    -> Image
    ---
    filtered_image : <djblob>
    """

# Jobs table primary key: (image_id) - same as target

# Example 2: Target with additional PK attribute
@schema
class Analysis(dj.Computed):
    definition = """
    -> Recording
    analysis_method : varchar(32)  # Additional PK attribute
    ---
    result : float
    """

# Jobs table primary key: (recording_id) - excludes 'analysis_method'
# One job entry covers all analysis_method values for a given recording
```

The jobs table has no foreign key constraints, for performance reasons.

### Stale Job Handling

Stale jobs are pending jobs whose upstream records have been deleted. Since there are no FK constraints on jobs tables, these jobs remain until cleaned up by `refresh()`:

```python
# refresh() handles stale jobs automatically
result = FilteredImage.jobs.refresh()
# Returns: {'added': 10, 'removed': 3} # 3 stale jobs cleaned up
# Stale detection logic:
# 1. Find pending jobs where created_time < (now - stale_timeout)
# 2. Check if their keys still exist in key_source
# 3. Remove pending jobs whose keys no longer exist
```

**Why not use foreign key cascading deletes?** FK constraints on jobs tables were ruled out for performance reasons (see above), so cascading deletes are not available; `refresh()` performs the cleanup instead.
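Since cleanup falls to `refresh()`, the stale check can be expressed with ordinary query algebra. The following is an illustrative sketch (function name and details assumed), not the prescribed implementation:

```python
def remove_stale(jobs, key_source, stale_timeout=3600):
    """Delete pending jobs whose keys have disappeared from key_source (sketch)."""
    old_pending = (jobs & 'status="pending"'
                   & f'created_time < NOW() - INTERVAL {int(stale_timeout)} SECOND')
    stale = old_pending - key_source.proj()  # antijoin: no matching upstream key
    count = len(stale)
    stale.delete()  # delete_quick semantics: no confirmation prompt
    return count
```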
### Table Drop and Alter Behavior

When an auto-populated table is dropped, its associated jobs table is automatically dropped:

```python
# Dropping FilteredImage also drops ~filtered_image__jobs
FilteredImage.drop()
```

When an auto-populated table is altered (e.g., primary key changes), the jobs table is dropped and can be recreated via `refresh()`:

```python
# Alter that changes primary key structure
# Jobs table is dropped since its structure no longer matches
FilteredImage.alter()
# Recreate jobs table with new structure
FilteredImage.jobs.refresh()
```

### Lazy Table Creation

Jobs tables are created automatically on first use:

```python
# First call to populate with reserve_jobs=True creates the jobs table
FilteredImage.populate(reserve_jobs=True)
# Creates ~filtered_image__jobs if it doesn't exist, then populates
# Alternatively, explicitly create/refresh the jobs table
FilteredImage.jobs.refresh()
```

The jobs table is created with a primary key derived from the target table's foreign key attributes.

### Conflict Resolution

Conflict resolution relies on the transaction surrounding each `make()` call.
When two workers attempt to populate the same key, both may call `make()`, but only the first transaction to commit succeeds; the second worker's insert fails and its transaction rolls back, leaving the data intact.
**Important:** Only errors that occur inside `make()` are recorded in the jobs table.

**Why this is acceptable:** duplicate `make()` calls waste some compute in rare cases, but the transaction guarantees that exactly one result is committed, so data integrity is never at risk.
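A minimal sketch of the per-key flow under `reserve_jobs=True`, assuming DataJoint's standard `connection.transaction` context manager; if two workers race past `reserve()`, the duplicate insert inside the second worker's `make()` fails and rolls back:

```python
key = {'image_id': 1}
if FilteredImage.jobs.reserve(key):            # best-effort, no locking
    try:
        with FilteredImage.connection.transaction:
            FilteredImage().make(key)          # result inserted inside the transaction
    except Exception as exc:
        FilteredImage.jobs.error(key, error_message=str(exc))
    else:
        FilteredImage.jobs.complete(key)
```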
### Job Reservation vs Pre-Partitioning

The job reservation mechanism (`reserve_jobs=True`) is optional; an external orchestrator can instead pre-partition the work and hand keys to workers directly:

```python
# Pre-partitioning example: orchestrator divides work explicitly
all_pending = FilteredImage.jobs.pending.fetch("KEY")

# Split jobs among workers (e.g., by worker index)
n_workers = 4
for worker_id in range(n_workers):
    worker_jobs = all_pending[worker_id::n_workers]  # Round-robin assignment
    # Send worker_jobs to worker via orchestration system (Slurm, K8s, etc.)

# Worker receives its assigned keys and processes them directly
for key in assigned_keys:
    FilteredImage.populate(key, reserve_jobs=False)
```

**When to use each approach:**
Both approaches benefit from the same transaction-based conflict resolution as a safety net.

### Orphaned Job Handling

Orphaned jobs are reserved jobs left behind by crashed or terminated processes. The API does not provide an algorithmic method for detecting or clearing orphaned jobs because detection depends on the orchestration system (e.g., Slurm job IDs, Kubernetes pod status, process heartbeats). Users must clear orphaned jobs manually using the delete API:

```python
# Delete all reserved jobs (use with caution - may remove reservations for jobs still running!)
MyTable.jobs.reserved.delete()
# Delete reserved jobs from a specific host that crashed
(MyTable.jobs.reserved & 'host="crashed-node"').delete()
# Delete reserved jobs older than 1 hour (likely orphaned)
(MyTable.jobs.reserved & 'reserved_time < NOW() - INTERVAL 1 HOUR').delete()
# Delete and re-add as pending
MyTable.jobs.reserved.delete()
MyTable.jobs.refresh()
```

**Note:** Deleting a reserved job does not terminate the running worker; it simply removes the reservation record. If the worker is still running, it will complete its `make()` call and insert its result.

### Configuration Options

New configuration settings for job management:

```python
# In datajoint config
dj.config['jobs.auto_refresh'] = True # Auto-refresh on populate (default: True)
dj.config['jobs.keep_completed'] = False # Keep success records (default: False)
dj.config['jobs.stale_timeout'] = 3600 # Seconds before pending job is considered stale (default: 3600)
dj.config['jobs.default_priority'] = 5  # Default priority for new jobs (lower = more urgent)
```

## Usage Examples

### Basic Distributed Computing

```python
# Worker 1
FilteredImage.populate(reserve_jobs=True)
# Worker 2 (can run simultaneously)
FilteredImage.populate(reserve_jobs=True)
# Monitor progress
print(FilteredImage.jobs.progress())
```

### Priority-Based Processing

```python
# Add urgent jobs (priority=0 is most urgent)
urgent_subjects = Subject & 'priority="urgent"'
FilteredImage.jobs.refresh(urgent_subjects, priority=0)
# Workers will process lowest-priority-value jobs first
FilteredImage.populate(reserve_jobs=True)
```

### Scheduled Processing

```python
# Schedule jobs for overnight processing (8 hours from now)
FilteredImage.jobs.refresh('subject_id > 100', delay=8*60*60)
# Only jobs whose scheduled_time <= now will be processed
FilteredImage.populate(reserve_jobs=True)
```

### Error Recovery

```python
# View errors
errors = FilteredImage.jobs.errors.fetch(as_dict=True)
for err in errors:
    print(f"Key: {err['subject_id']}, Error: {err['error_message']}")
# Delete specific error jobs after fixing the issue
(FilteredImage.jobs & 'subject_id=42').delete()
# Delete all error jobs
FilteredImage.jobs.errors.delete()
# Re-add deleted jobs as pending (if keys still in key_source)
FilteredImage.jobs.refresh()
```

### Dashboard Queries

```python
# Get pipeline-wide status using schema.jobs
def pipeline_status(schema):
    return {
        jt.target.table_name: jt.progress()
        for jt in schema.jobs
    }
# Example output:
# {
# 'FilteredImage': {'pending': 150, 'reserved': 3, 'success': 847, 'error': 12},
# 'Analysis': {'pending': 500, 'reserved': 0, 'success': 0, 'error': 0},
# }
# Refresh all jobs tables in the schema
for jobs_table in schema.jobs:
    jobs_table.refresh()
# Get all errors across the pipeline
all_errors = []
for jt in schema.jobs:
    errors = jt.errors.fetch(as_dict=True)
    for err in errors:
        err['_table'] = jt.target.table_name
        all_errors.append(err)
```

## Backward Compatibility

### Migration

This is a major release. The legacy schema-level `~jobs` table is retired and replaced by the per-table jobs tables described above.
### API Compatibility

The `schema.jobs` property is retained but now returns the collection of per-table jobs tables:

```python
# Returns list of JobsTable objects
schema.jobs
# [FilteredImage.jobs, Analysis.jobs, ...]
# Iterate over all jobs tables
for jobs_table in schema.jobs:
print(f"{jobs_table.target.table_name}: {jobs_table.progress()}")
# Query all errors across the schema
all_errors = [job for jt in schema.jobs for job in jt.errors.fetch(as_dict=True)]
# Refresh all jobs tables
for jobs_table in schema.jobs:
    jobs_table.refresh()
```

This replaces the legacy single `~jobs` table per schema.

## Hazard Analysis

This section identifies potential hazards and their mitigations.

### Race Conditions
### State Transitions

### Data Integrity

### Performance

### Operational

### Migration
## Future Extensions
## Rationale

### Why Not External Orchestration?

The team considered integrating external tools like Airflow or Flyte but rejected this approach because the deployment and maintenance burden of such tools is too heavy for an open-source Python project (see the Notes in the original proposal below).
The built-in jobs system provides 80% of the value with minimal additional complexity.

### Why Per-Table Jobs?

Per-table jobs tables provide native primary keys (no hashing), direct per-table access (`MyTable.jobs`), and a lifecycle tied to the target table (dropped and altered together with it).
### Why Remove Key Hashing?

The current system hashes primary keys to support arbitrary key types. The new system uses native keys because the jobs table's primary key can now be derived directly from the target table's foreign keys, making jobs queryable and joinable by their real attributes.
### Why FK-Derived Primary Keys Only?

The jobs table primary key includes only attributes derived from foreign keys in the target table's primary key. This design keeps one job entry per `key_source` row, matching the granularity at which `make()` is called.
---
**Problem Statement:**
The current datajoint-python approach for job reservation, orchestration, and execution (i.e. the `autopopulate` mechanism) faces scalability limitations. While its original design effectively handled job reservation/distribution for parallelization, it falls short when building a comprehensive data platform.

**Limitations of the `jobs` table**

The existing `jobs` table functions more as an error/reserve table than a true jobs queue:

- It records only `error` (failed jobs) and `reserved` (jobs in progress) states. It lacks crucial statuses such as:
  - `pending`/`scheduled` (jobs not yet started)
  - `success` (record of successfully completed jobs and their duration)
- Workers must query `key_source` to get a list of jobs, which, while ensuring up-to-date information, strains the database.
**Limitations of `key_source` Behavior/Usage**

- The default `key_source` (an inner-join of parent tables) is intended to represent all possible jobs for a given table.
- Users frequently customize `key_source` (e.g., restricting by `paramset` or other tables).
- Custom `key_source` settings are only visible to the local code executing the pipeline, not globally at the database level. This leads to inconsistent job queues across workers running different `key_source` definitions.
- `(Table.key_source - Table).fetch('KEY')` is DataJoint's method for retrieving the job queue and can be an expensive operation, especially when called frequently by multiple workers. This significantly strains the database server, as observed by other users.

**Proposed Solution: New Jobs Table**
**Step 1: A New `JOB2` Table (Name TBD)**

A new table, tentatively named `JOB2`, would be introduced with the following schema (sketched as a DataJoint definition below):

- `table_name` : `varchar(255)` - The `className` of the table.
- `key_hash` : `char(32)` - A hash of the job's key.
- `status` : `enum('reserved','error','ignore','scheduled','success')` - The current status of the job.
- `key` : `json` - A JSON structure containing the job's key.
- `status_message` : `varchar(2000)` - e.g., error message if failed.
- `error_stack` : `mediumblob` - The error stack if the job failed.
- `timestamp` : `timestamp` - The scheduled time (UTC) for the job to run.
- `run_duration` : `float` - The run duration in seconds.
- `run_version` : `json` - Representation of the code/environment version of the run (e.g., git commit hash).
- `user` : `varchar(255)` - The database user.
- `host` : `varchar(255)` - The system hostname.
- `pid` : `int unsigned` - The system process ID.
- `connection_id` : `bigint unsigned` - The database connection ID.
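Expressed as a DataJoint table definition, the proposal might look like the following sketch (the class name is tentative per the proposal; the default values are assumptions):

```python
import datajoint as dj

@schema
class JOB2(dj.Manual):  # name TBD
    definition = """
    table_name : varchar(255)  # className of the table
    key_hash   : char(32)      # hash of the job's key
    ---
    status : enum('reserved','error','ignore','scheduled','success')
    key : json                            # the job's key
    status_message = "" : varchar(2000)   # e.g., error message if failed
    error_stack = null : mediumblob       # error stack if the job failed
    timestamp = CURRENT_TIMESTAMP : timestamp  # scheduled time (UTC)
    run_duration = null : float           # seconds
    run_version = null : json             # e.g., git commit hash
    user = "" : varchar(255)              # database user
    host = "" : varchar(255)              # system hostname
    pid = null : int unsigned             # system process ID
    connection_id = null : bigint unsigned  # database connection ID
    """
```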
**Step 2: Mechanism to "Hydrate"/"Refresh" the `JOB2` Table**

A new class method, `refresh_jobs()`, would be introduced for every Autopopulate table. This method would (see the sketch below):

- Query the `key_source` of the table.
- Insert newly available jobs into `JOB2`.
- Remove stale jobs from `JOB2` due to upstream record deletions.
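A sketch of the hydration logic, reusing the set arithmetic that `populate()` already relies on. `key_hash` is DataJoint's `datajoint.hash.key_hash`; everything else is illustrative:

```python
from datajoint.hash import key_hash

@classmethod
def refresh_jobs(cls):
    """Sync JOB2 with this table's key_source (illustrative sketch)."""
    table_jobs = JOB2 & {'table_name': cls.__name__}
    queued = set(table_jobs.fetch('key_hash'))
    # Add jobs for keys not yet computed and not yet queued
    for key in (cls.key_source - cls).fetch('KEY'):
        if key_hash(key) not in queued:
            JOB2.insert1(dict(table_name=cls.__name__, key_hash=key_hash(key),
                              key=key, status='scheduled'))
    # Remove stale jobs whose upstream records were deleted
    valid = {key_hash(k) for k in cls.key_source.fetch('KEY')}
    (table_jobs & [{'key_hash': h} for h in queued - valid]).delete()
```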
The key challenge here is how and when to trigger `refresh_jobs()`. If triggered by every `populate(reserve_jobs=True)` call, it could become a bottleneck due to read/write operations on `JOB2` and potential race conditions/deadlocks.

**Step 3: New/Updated `populate()` Function**

The `populate()` function would be updated to (sketched below):

- Query `JOB2` for a list of "scheduled" jobs.
- Call `populate1(key)` as usual for each job.
- On completion, update the job's status in `JOB2` to `success` and add additional information (e.g., run duration, code version).
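Sketched as a worker loop. `populate1()` is referenced in the proposal; `update1()` is DataJoint's standard row-update method; the remaining details are assumptions:

```python
import time

def populate(self, *restrictions, reserve_jobs=True, **kwargs):
    """Process 'scheduled' jobs from JOB2 instead of scanning key_source (sketch)."""
    table_name = self.__class__.__name__
    jobs = (JOB2 & {'table_name': table_name} & 'status="scheduled"').fetch(as_dict=True)
    for job in jobs:
        start = time.time()
        self.populate1(job['key'])  # calls make() inside a transaction, as today
        JOB2.update1(dict(table_name=table_name, key_hash=job['key_hash'],
                          status='success', run_duration=time.time() - start))
```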
**Considerations**

- **`refresh_jobs()` frequency and staleness:** How often should `refresh_jobs()` be called, and what level of staleness in `JOB2` is acceptable? A centralized process could refresh jobs for each research project on a schedule (e.g., every 10, 15, or 30 minutes), similar to current worker-manager cron jobs. This would address the performance issues related to `key_source` that many users have experienced.
- **`refresh_jobs()` without pipeline code:** Should `refresh_jobs()` be callable without the pipeline code installed (i.e., from a "virtual module")? Yes, to avoid the complexity and expense of requiring full code installation.

**Notes**
We have considered adopting and integrating with other industry standards for workflow orchestration such as Airflow, Flyte or Prefect, and have produced and evaluated multiple working prototypes.
However, we think that the additional burden of deploying and maintaining those tools is too much for a Python open-source project such as DataJoint: the enhanced features come with significant DevOps requirements and burden.