Motivation
Hamilton and Airflow are complementary tools that operate at different levels:
Airflow orchestrates tasks (Python, SQL, Bash, etc.) and ensures they complete.
Hamilton is a data transformation micro-framework where Python functions define DAG nodes.
Today, using Hamilton inside Airflow requires manual boilerplate in every task: importing modules, creating the Driver, executing it, and handling results. The existing examples/airflow directory demonstrates this pattern. A provider package would eliminate this boilerplate and give Hamilton first-class status in the Airflow ecosystem.
This proposal targets Airflow 3 only. Airflow providers are moving away from Airflow 2 support, and an Airflow 3-only provider avoids compatibility shims, direct metadata database assumptions, and old XCom guidance that no longer reflects configured XCom backends.
Package Identity
| | Community provider (v1) | Official Apache provider (future) |
| --- | --- | --- |
| PyPI name | airflow-provider-hamilton | apache-airflow-providers-apache-hamilton |
| Python package | hamilton_provider | airflow.providers.apache.hamilton |
| Lives in | Hamilton monorepo (submodule) | Airflow monorepo |
Starting as a community provider is lower friction: there is no Airflow PR process, and releases follow Hamilton's own cadence. The package can migrate to an official Apache provider later if there is demand.
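Package Structure
No types.py dataclass is proposed for v1. The provider should expose Airflow-native operator/decorator arguments and normal TaskFlow function arguments instead of requiring a custom HamiltonTask return object.
Entry Point Registration
The provider registers itself with Airflow through an entry point declared in pyproject.toml that points at a get_provider_info.py module.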
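A minimal sketch of the provider metadata function follows. It assumes a hypothetical module path, hamilton_provider/get_provider_info.py, and a decorator factory at the hypothetical dotted path hamilton_provider.decorators.hamilton.hamilton_task. Airflow discovers provider packages through the apache_airflow_provider entry-point group. The task-decorators entry is what lets @task.hamilton resolve. The version and description strings are placeholders.

```python
# Hypothetical pyproject.toml entry (kept as a comment so this block stays Python):
# [project.entry-points.apache_airflow_provider]
# provider_info = "hamilton_provider.get_provider_info:get_provider_info"


def get_provider_info() -> dict:
    """Return the metadata Airflow's providers manager reads for this package."""
    return {
        "package-name": "airflow-provider-hamilton",
        "name": "Hamilton",
        "description": "Run Hamilton dataflows inside Airflow 3 tasks.",
        "versions": ["0.1.0"],  # placeholder version
        # Registers @task.hamilton; the class-name path is hypothetical.
        "task-decorators": [
            {
                "name": "hamilton",
                "class-name": "hamilton_provider.decorators.hamilton.hamilton_task",
            }
        ],
    }
```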
API Surface
API 1: HamiltonOperator
A declarative operator for Hamilton modules that handle their own data loading or materialization. inputs, config, and overrides are templateable dictionaries. Dict-shaped Jinja templates require render_template_as_native_obj=True on the DAG. Otherwise the rendered values may arrive as strings, which must parse as JSON objects; if they do not, the task fails with a clear AirflowException.
Key properties:
template_fields = ("inputs", "config", "overrides") -- Airflow renders these before execute() runs.
template_fields_renderers tells the Airflow UI these fields are JSON-like.
coerce_json_object() accepts dictionaries as-is, parses JSON object strings, and raises AirflowException for non-object strings with guidance to enable render_template_as_native_obj=True.
result_handler transforms output before Airflow handles XCom.
All Hamilton imports are deferred to execution so DAG parsing stays fast.
The operator does not query or write Airflow metadata database tables directly.
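The coerce_json_object() behavior can be sketched in plain Python. This is an illustration under stated assumptions: the field_name parameter and the treatment of None as an empty dict are not part of the proposal. A local AirflowException stand-in is defined only when Airflow is not importable. One detail worth a test: a dict rendered without native objects arrives as its Python repr (single quotes), which json.loads rejects. The error path is therefore the common case unless the template uses Jinja's tojson filter.

```python
import json
from typing import Any

try:
    from airflow.exceptions import AirflowException
except ImportError:  # stand-in so the sketch runs without Airflow installed
    class AirflowException(Exception):
        """Local stand-in for airflow.exceptions.AirflowException."""

_NATIVE_HINT = (
    "Set render_template_as_native_obj=True on the DAG so dict-shaped templates "
    "render as dicts, or render a JSON object string (for example with tojson)."
)


def coerce_json_object(value: Any, field_name: str) -> dict:
    """Return a dict for a rendered template field, or raise with guidance."""
    if value is None:
        return {}  # assumption: an unset optional field behaves like {}
    if isinstance(value, dict):
        return value  # native rendering already produced a dict
    if isinstance(value, str):
        try:
            parsed = json.loads(value)
        except json.JSONDecodeError as err:
            # Typical cause: str(dict) gives a Python repr, not JSON.
            raise AirflowException(
                f"{field_name!r} rendered to a string that is not valid JSON. {_NATIVE_HINT}"
            ) from err
        if isinstance(parsed, dict):
            return parsed
        raise AirflowException(
            f"{field_name!r} rendered to a JSON {type(parsed).__name__}, not an object. {_NATIVE_HINT}"
        )
    raise AirflowException(
        f"{field_name!r} must be a dict or a JSON object string, got {type(value).__name__}."
    )
```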
When to use
The operator is designed for static Airflow tasks where Hamilton modules are known at DAG parse time and runtime variation is mostly through config, inputs, and overrides. It works well for modules using Hamilton data loaders/savers, materializers, or Airflow XCom-backed inputs.
API 2: @task.hamilton
A TaskFlow decorator for cases needing Python input preparation, dynamic runtime values, or Builder-level control while still fitting normal Airflow TaskFlow semantics. The decorated function receives individual Airflow task arguments and returns a dictionary of additional Hamilton inputs, or None if no runtime inputs are needed.
Hamilton execution options live on the decorator/operator kwargs, not in a custom dataclass return value. This keeps the callable compatible with mapped tasks and with the way Airflow users expect TaskFlow functions to behave.
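The following sketch shows how the decorated operator might combine the callable's return value with decorator-level inputs. The helper name merge_runtime_inputs is hypothetical. Raising TypeError is a placeholder for whatever exception the implementation chooses. Letting runtime values override static inputs on key collisions is an assumption that the proposal does not settle.

```python
from typing import Any, Mapping, Optional


def merge_runtime_inputs(
    static_inputs: Optional[Mapping[str, Any]],
    callable_result: Any,
) -> dict:
    """Combine decorator-level inputs with what the TaskFlow callable returned."""
    if callable_result is None:
        runtime: Mapping[str, Any] = {}  # no runtime inputs needed
    elif isinstance(callable_result, dict):
        runtime = callable_result
    else:
        # Non-dict, non-None returns are rejected instead of being guessed at.
        raise TypeError(
            "@task.hamilton callables must return a dict of Hamilton inputs or None, "
            f"got {type(callable_result).__name__}"
        )
    merged = dict(static_inputs or {})  # copy so decorator kwargs are never mutated
    merged.update(runtime)  # assumption: runtime values win on key collisions
    return merged
```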
Execution Model
The decorated operator should follow the same pattern as Airflow's provider decorators: inherit from DecoratedOperator and the concrete underlying operator, pass kwargs_to_upstream, and customize the point where the Python callable result is available. The implementation should be tested against Airflow 3 rather than justified by a hand-written MRO description.
Async/deferrable execution is out of scope for v1. Hamilton has async driver behavior, but mixing that with Airflow's async/deferrable task lifecycle needs a separate design. v1 should fail fast for coroutine callables instead of accidentally running them through the synchronous driver path.
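The coroutine fail-fast rule reduces to an inspect check. Calling an async def synchronously only returns an un-awaited coroutine object, which would then fail the dict check with a misleading message. Rejecting it up front gives a clearer error. This sketch assumes the check runs when the decorator is applied, so the error surfaces at DAG parse time. ensure_sync_callable is a hypothetical name.

```python
import inspect
from typing import Callable


def ensure_sync_callable(python_callable: Callable) -> Callable:
    """Reject coroutine functions before they reach the synchronous driver path."""
    # Unwrap functools.wraps-style decorators so a wrapped async def is still caught.
    target = inspect.unwrap(python_callable)
    if inspect.iscoroutinefunction(target) or inspect.isasyncgenfunction(target):
        raise TypeError(
            f"@task.hamilton does not support async callables in v1: {target.__qualname__}"
        )
    return python_callable
```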
Shared Hamilton Execution
Both APIs should use one internal helper so that operator and decorator behavior does not drift.
modules are passed as importable module-name strings, which keeps DAGs serializable and parsing lightweight. If future users need module objects for advanced local use, that can be added explicitly later; v1 should keep the provider serialization-friendly.
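The sketch below outlines the shared helper, assuming both entry points call one function with module names as strings. Hamilton is imported only inside _default_executor, so DAG parsing never imports Hamilton. The executor parameter is a hypothetical test seam, and adapters and materializers are omitted for brevity. driver.Builder().with_modules(...).with_config(...).build() and Driver.execute(final_vars, inputs=..., overrides=...) are Hamilton's public driver API. The sketch assumes the driver returns a dict keyed by final_vars.

```python
import importlib
from types import ModuleType
from typing import Any, Callable, List, Mapping, Optional, Sequence

Executor = Callable[..., Mapping[str, Any]]


def import_modules(module_names: Sequence[str]) -> List[ModuleType]:
    """Resolve module names at execution time, never at DAG parse time."""
    return [importlib.import_module(name) for name in module_names]


def _default_executor(modules, config, final_vars, inputs, overrides):
    # Deferred import: Hamilton is only loaded inside the running task.
    from hamilton import driver

    dr = driver.Builder().with_modules(*modules).with_config(dict(config)).build()
    return dr.execute(list(final_vars), inputs=dict(inputs), overrides=dict(overrides))


def run_hamilton(
    module_names: Sequence[str],
    final_vars: Sequence[str],
    *,
    config: Optional[Mapping[str, Any]] = None,
    inputs: Optional[Mapping[str, Any]] = None,
    overrides: Optional[Mapping[str, Any]] = None,
    result_handler: Optional[Callable[[dict], Any]] = None,
    executor: Optional[Executor] = None,
) -> Any:
    """Single code path shared by HamiltonOperator and @task.hamilton (sketch)."""
    modules = import_modules(module_names)
    execute = executor or _default_executor
    results = dict(execute(modules, config or {}, final_vars, inputs or {}, overrides or {}))
    # result_handler runs before the value is handed to Airflow's XCom handling.
    return result_handler(results) if result_handler else results
```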
OpenLineage
OpenLineage support is required for v1. Hamilton already exposes lineage-capable metadata through materializers and data loader/saver decorators; the Airflow provider should translate that metadata into Airflow's OpenLineage integration.
Provider behavior
Add a HamiltonLineageCollector lifecycle adapter that records Hamilton loader/saver node metadata during execution.
Implement get_openlineage_facets_on_complete(self, task_instance) on HamiltonOperator and _HamiltonDecoratedOperator.
Lazily import Airflow OpenLineage classes from the OpenLineage provider inside the lineage method.
Convert collected file metadata and SQL metadata into OperatorLineage(inputs=[...], outputs=[...]).
Return empty OperatorLineage() when no loader/saver/materializer metadata was collected.
Log extraction failures, but do not fail a successful Hamilton task only because optional lineage extraction failed.
The collector should rely on Hamilton metadata produced by @load_from, @save_to, @dataloader, @datasaver, and materializers. The docs should show that users get the best lineage when Hamilton performs or describes the I/O boundary instead of hiding all I/O inside arbitrary Python code.
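Here is a plain-Python sketch of the collector and the conversion step. LineageDataset and LineageResult are hypothetical stand-ins for the OpenLineage provider's dataset and OperatorLineage classes, which the real method would import lazily. record() is a hypothetical hook that the lifecycle adapter would call after each loader or saver node. The metadata shapes (file_metadata.path, sql_metadata.table_name) and the namespaces are assumptions modeled on the metadata that Hamilton's data loaders and savers attach, so check them against the Hamilton version in use.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Mapping, Optional

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class LineageDataset:
    """Hypothetical stand-in for an OpenLineage dataset (namespace + name)."""
    namespace: str
    name: str


@dataclass
class LineageResult:
    """Hypothetical stand-in for OperatorLineage(inputs=[...], outputs=[...])."""
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)


@dataclass
class HamiltonLineageCollector:
    """Records loader/saver metadata as nodes run (sketch, not Hamilton's API)."""
    loaded: list = field(default_factory=list)
    saved: list = field(default_factory=list)

    def record(self, is_saver: bool, metadata: Mapping[str, Any]) -> None:
        (self.saved if is_saver else self.loaded).append(dict(metadata))


def _to_dataset(metadata: Mapping[str, Any]) -> Optional[LineageDataset]:
    # Assumed shapes, modeled on Hamilton's file_metadata / sql_metadata keys.
    if "file_metadata" in metadata:
        return LineageDataset(namespace="file", name=str(metadata["file_metadata"]["path"]))
    if "sql_metadata" in metadata:
        table = metadata["sql_metadata"].get("table_name")
        if table:
            return LineageDataset(namespace="sql", name=str(table))  # placeholder namespace
    return None


def to_lineage(collector: HamiltonLineageCollector) -> LineageResult:
    """Convert collected metadata; never raise, because lineage is optional."""
    try:
        inputs = [d for d in map(_to_dataset, collector.loaded) if d]
        outputs = [d for d in map(_to_dataset, collector.saved) if d]
        return LineageResult(inputs=inputs, outputs=outputs)  # empty when nothing collected
    except Exception:
        log.warning("Hamilton lineage extraction failed", exc_info=True)
        return LineageResult()
```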
XCom Strategy
Current Airflow Reality
The provider should not claim that XCom values must always be small. With the default database-backed XCom implementation, large values remain a poor fit. With a configured custom XCom backend, especially object-storage backends from Airflow Common IO patterns, deployments can route large XCom values to external storage and keep only references or metadata in the Airflow database.
The provider should therefore avoid prescribing a single persistence pattern:
| Pattern | Mechanism | When to use |
| --- | --- | --- |
| Airflow-managed XCom | Return Hamilton dict[str, Any]; Airflow's configured XCom backend stores or routes it. | Teams with an object-storage XCom backend or small scalar results |
| Hamilton-managed persistence | Use @save_to, @datasaver, or materializers and return metadata/path-like values. | |
| | Transform output with result_handler before Airflow handles XCom. | Compatibility, custom serialization, or selecting a subset of results |
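Two example result handlers follow. They assume a result_handler is any callable that takes the Hamilton results dict and returns the value Airflow should push to XCom, since the proposal names result_handler but not its exact signature. select_results and summarize_for_xcom are hypothetical helpers. The first keeps a subset of outputs, and the second replaces bulky values with small descriptors.

```python
from typing import Any, Callable, Mapping


def select_results(*keys: str) -> Callable[[Mapping[str, Any]], dict]:
    """Build a result_handler that keeps only the named Hamilton outputs."""
    def handler(results: Mapping[str, Any]) -> dict:
        missing = [k for k in keys if k not in results]
        if missing:
            raise KeyError(f"result_handler expected outputs that were not computed: {missing}")
        return {k: results[k] for k in keys}
    return handler


def summarize_for_xcom(results: Mapping[str, Any]) -> dict:
    """Replace non-scalar values with small, JSON-friendly descriptors."""
    summary = {}
    for name, value in results.items():
        if value is None or isinstance(value, (str, int, float, bool)):
            summary[name] = value  # small scalars pass through unchanged
        else:
            size = len(value) if hasattr(value, "__len__") else None
            summary[name] = {"type": type(value).__name__, "len": size}
    return summary
```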
Recommended Durable Dataset Pattern
Hamilton modules can use @save_to.* decorators to persist durable outputs. This is a data product and lineage choice, not a workaround for XCom in every deployment.
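Custom XCom Backend Scope
Do not build a Hamilton-specific custom XCom backend in v1. Users should configure Airflow's XCom backend for their deployment. The provider documentation should link to current Airflow/Astronomer guidance on custom XCom backend strategies and object-storage routing.
Tests and Documentation
Tests and docs are mandatory for the first PR.
Unit Tests
HamiltonOperator imports modules at execution time and returns dict[str, Any].
HamiltonOperator applies config, inputs, overrides, adapters, materializers, and result_handler.
coerce_json_object accepts dictionaries and JSON object strings; non-object strings raise AirflowException with render_template_as_native_obj=True guidance.
_HamiltonDecoratedOperator accepts normal TaskFlow args and merges returned runtime inputs.
@task.hamilton rejects non-dict, non-None callable returns.
@task.hamilton rejects coroutine callables in v1.
No implementation path reads or writes Airflow metadata database tables directly.
OpenLineage conversion returns empty lineage when no metadata exists.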
Integration Tests
Unit tests with mocks cannot faithfully validate behavior that depends on the Airflow 3 task lifecycle (template rendering, mapped-task expansion, OpenLineage facet emission). These tests therefore run against an in-process Airflow 3 task runner using temporary directories, so they stay hermetic and need no external services. They are gated behind a [test] extra so the base provider package stays lean.
Template rendering with native objects. Execute a HamiltonOperator inside a DAG with render_template_as_native_obj=True; assert that dict-shaped Jinja templates in inputs, config, and overrides arrive as dicts (not stringified) and that a non-object string raises AirflowException with the expected guidance.
End-to-end Hamilton execution via HamiltonOperator. Run a real Hamilton module end-to-end against a tmp dir using @load_from.csv / @save_to.parquet; assert final_vars are returned and that the file was written.
TaskFlow chaining and mapped-task expansion via @task.hamilton. Run a small TaskFlow DAG where @task.hamilton consumes upstream output via XCom; additionally exercise mapped-task expansion (.expand(...)) to confirm the decorated operator passes Airflow's mapped-task validation path at scheduler time.
OpenLineage facet emission. With the OpenLineage provider installed, execute a HamiltonOperator whose Hamilton module uses @load_from / @save_to; invoke get_openlineage_facets_on_complete(task_instance) and assert the returned OperatorLineage carries the expected input and output datasets. Also assert empty OperatorLineage() when the module performs no I/O.
Documentation
Installation for Airflow 3 only.
HamiltonOperator quickstart.
@task.hamilton quickstart using individual TaskFlow arguments.
Template rendering guidance, including render_template_as_native_obj=True.
XCom backend guidance that distinguishes default DB XCom from object-storage/custom XCom backends.
OpenLineage setup and an example using Hamilton @load_from / @save_to.
What Is NOT in v1
Airflow 2 support: v1 is Airflow 3 only.
Direct Airflow metadata DB access: use public Airflow APIs only.
Custom XCom backend: deployment concern, not provider scope.
Airflow Connection type / Hook: Hamilton runs local Python dataflows; there is no single external service connection.
HamiltonTask / HamiltonDataClass decorator return type: not Airflow-native enough and weak for mapped tasks.
Async/deferrable execution: requires a separate design using Hamilton async driver semantics and Airflow's deferrable model.
Dynamic execution (Parallelizable[]): advanced Builder feature. Can be added later.
IDE auto-completion for @task.hamilton in Airflow's core task object: applicable only if the provider moves into the Airflow monorepo or ships separate typing support.
System tests against external services: Airflow's tests/system/ convention targets real external systems (S3, BigQuery, Snowflake, etc.). v1 has no external service surface — Hamilton runs local Python and the provider declares no Hook or Connection type — so there is nothing meaningful to system-test against. File I/O paths are exercised by the integration tests against tmp directories.
Dependencies
The provider may add narrower minimum provider versions once the implementation verifies exactly which Airflow OpenLineage and standard provider APIs it requires.
Prior Art
examples/airflow/ demonstrates manual Hamilton+Airflow integration.
@task.snowpark injects a Snowflake session into the callable and follows Airflow provider decorator patterns.
@task.pyspark passes a Spark context and demonstrates DecoratedOperator plus a concrete operator.
Airflow provider operators with OpenLineage support implement get_openlineage_facets_on_complete() and return OperatorLineage.