-
Notifications
You must be signed in to change notification settings - Fork 36
Running Hamilton on Flyte #233
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| # Hamilton on FlyteKit | ||
|
|
||
| In this example we show you how to create a simple dataflow and have it executed on Flyte runtime. This dataflow | ||
| creates a dataframe as a result. It performs a series of transforms on the | ||
| input to create columns that appear in the output. | ||
|
|
||
|
|
||
| File organization: | ||
|
|
||
| * `my_functions.py` houses the logic that we want to compute. Note how the functions are named, and what input | ||
| parameters they require. That is how we create a DAG modeling the dataflow we want to happen. | ||
| * `my_script.py` houses how to get Hamilton to create the DAG and exercise it with some inputs. | ||
| * `my_notebook_script.py` houses how one might iterate in a notebook environment and provide a way to inline define Hamilton | ||
| functions and add them to the DAG constructed. To be clear, it is not used by `my_script.py`, but showing an alternate path | ||
| to running things. | ||
|
|
||
| To run things: | ||
| ```bash | ||
| > python my_script.py | ||
| ``` | ||
|
|
||
| If you have questions, or need help with this example, | ||
| join us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg), and we'll try to help! |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| import pandas as pd | ||
|
|
||
| """ | ||
| Notes: | ||
| 1. This file is used for all the [ray|dask|spark]/hello_world examples. | ||
| 2. It therefore show cases how you can write something once and not only scale it, but port it | ||
| to different frameworks with ease! | ||
| """ | ||
|
|
||
|
|
||
| def avg_3wk_spend(spend: pd.Series) -> pd.Series: | ||
| """Rolling 3 week average spend.""" | ||
| return spend.rolling(3).mean() | ||
|
|
||
|
|
||
| def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series: | ||
| """The cost per signup in relation to spend.""" | ||
| return spend / signups | ||
|
|
||
|
|
||
| def spend_mean(spend: pd.Series) -> float: | ||
| """Shows function creating a scalar. In this case it computes the mean of the entire column.""" | ||
| return spend.mean() | ||
|
|
||
|
|
||
| def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series: | ||
| """Shows function that takes a scalar. In this case to zero mean spend.""" | ||
| return spend - spend_mean | ||
|
|
||
|
|
||
| def spend_std_dev(spend: pd.Series) -> float: | ||
| """Function that computes the standard deviation of the spend column.""" | ||
| return spend.std() | ||
|
|
||
|
|
||
| def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series: | ||
| """Function showing one way to make spend have zero mean and unit variance.""" | ||
| return spend_zero_mean / spend_std_dev |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| sf-hamilton[flytekit] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| import importlib | ||
| import logging | ||
| import sys | ||
|
|
||
| import pandas as pd | ||
|
|
||
| from hamilton import base, driver | ||
| from hamilton.experimental import h_flytekit | ||
|
|
||
| logging.basicConfig(stream=sys.stdout) | ||
| initial_columns = { # load from actuals or wherever -- this is our initial data we use as input. | ||
| # Note: these values don't have to be all series, they could be a scalar. | ||
| "signups": pd.Series([1, 10, 50, 100, 200, 400]), | ||
| "spend": pd.Series([10, 10, 20, 40, 40, 50]), | ||
| } | ||
| # we need to tell hamilton where to load function definitions from | ||
| module_name = "my_functions" | ||
| module = importlib.import_module(module_name) | ||
| fga = h_flytekit.FlyteKitAdapter() | ||
| dr = driver.Driver(initial_columns, module, adapter=fga) # can pass in multiple modules | ||
| # we need to specify what we want in the final dataframe. | ||
| output_columns = [ | ||
| "spend", | ||
| "signups", | ||
| "avg_3wk_spend", | ||
| "spend_per_signup", | ||
| "spend_zero_mean_unit_variance", | ||
| ] | ||
| # let's create the dataframe! | ||
| df = dr.execute(output_columns) | ||
| print(df.to_string()) | ||
|
|
||
| # To visualize do `pip install sf-hamilton[visualization]` if you want these to work | ||
| # dr.visualize_execution(output_columns, './my_dag.dot', {}) | ||
| # dr.display_all_functions('./my_full_dag.dot') |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| ../../tests/resources |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| import sys | ||
|
|
||
| import numpy as np | ||
| import pandas as pd | ||
| import pytest | ||
|
|
||
| from hamilton import driver | ||
| from hamilton.experimental import h_flytekit | ||
|
|
||
| from . import unsupported_modules | ||
| from .resources import example_module | ||
|
|
||
|
|
||
| def test_flytekit_graph_adapter(): | ||
| initial_columns = { | ||
| "signups": pd.Series([1, 10, 50, 100, 200, 400], name="signups"), | ||
| "spend": pd.Series([10, 10, 20, 40, 40, 50], name="spend"), | ||
| } | ||
| dr = driver.Driver( | ||
| initial_columns, example_module, adapter=h_flytekit.FlyteKitAdapter("my_workflow") | ||
| ) | ||
| output_columns = [ | ||
| "spend", | ||
| "signups", | ||
| "avg_3wk_spend", | ||
| "spend_per_signup", | ||
| ] | ||
| df = dr.execute(output_columns) | ||
| assert set(df) == set(output_columns) | ||
| expected_column = pd.Series( | ||
| [0.0, 0.0, 13.33333, 23.33333, 33.33333, 43.33333], | ||
| index=[0, 1, 2, 3, 4, 5], | ||
| name="avg_3wk_spend", | ||
| ) | ||
| pd.testing.assert_series_equal(df.avg_3wk_spend.fillna(0.0).sort_index(), expected_column) | ||
|
|
||
|
|
||
| def test_flytekit_graph_adapter_unsupported_type(): | ||
| columns = { | ||
| "a": unsupported_modules.CustomType(1), | ||
| "b": unsupported_modules.CustomType(2), | ||
| } | ||
| dr = driver.Driver( | ||
| columns, unsupported_modules, adapter=h_flytekit.FlyteKitAdapter("my_workflow") | ||
| ) | ||
| with pytest.raises(ValueError): | ||
| assert dr.execute(["a", "b", "add_custom_type"]) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| import numpy as np | ||
|
|
||
|
|
||
| class CustomType: | ||
| def __init__(self, value: int): | ||
| self.value = value | ||
|
|
||
|
|
||
| def add_custom_type(a: CustomType, b: CustomType) -> int: | ||
| """adds two custom types, these should be class methods but this is just an example""" | ||
| return a.value + b.value |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| import logging | ||
| import typing | ||
| from pathlib import Path | ||
| from typing import Type | ||
|
|
||
| import flytekit | ||
| import pandas as pd | ||
| from flytekit import FlyteContext, FlyteContextManager, Literal, LiteralType | ||
| from flytekit.core import promise, workflow | ||
| from flytekit.core.type_engine import T, TypeEngine, TypeTransformer | ||
| from flytekit.models.literals import Scalar, Schema | ||
| from flytekit.models.types import SchemaType | ||
| from flytekit.types.schema import SchemaEngine, SchemaFormat, SchemaHandler | ||
| from flytekit.types.schema.types_pandas import ( | ||
| PandasDataFrameTransformer, | ||
| PandasSchemaReader, | ||
| PandasSchemaWriter, | ||
| ) | ||
|
|
||
| from hamilton import base, node | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class PandasSeriesTransformer(TypeTransformer[pd.Series]): | ||
| """ | ||
| Creates a transformer to handle PandasSeries, Similar to PandasTransformer | ||
| in flight repo https://github.com/flyteorg/flytekit/blob/master/flytekit/types/schema/types_pandas.py | ||
| In essence, we convert the series to dataframe before writing to the disk. | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| super().__init__("PandasSeries<->GenericSchema", pd.Series) | ||
|
|
||
| def get_literal_type(self, t: Type[pd.Series]) -> LiteralType: | ||
| return LiteralType(schema=self._get_schema_type()) | ||
|
|
||
| def to_literal( | ||
| self, | ||
| ctx: FlyteContext, | ||
| python_val: pd.Series, | ||
| python_type: Type[pd.Series], | ||
| expected: LiteralType, | ||
| ) -> Literal: | ||
| local_dir = ctx.file_access.get_random_local_directory() | ||
| w = PandasSchemaWriter(local_dir=Path(local_dir), cols=None, fmt=SchemaFormat.PARQUET) | ||
| w.write(python_val.to_frame(name=str(python_val.name))) | ||
| remote_path = ctx.file_access.get_random_remote_directory() | ||
| ctx.file_access.put_data(local_dir, remote_path, is_multipart=True) | ||
| return Literal(scalar=Scalar(schema=Schema(remote_path, self._get_schema_type()))) | ||
|
|
||
| def to_python_value( | ||
| self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[pd.Series] | ||
| ) -> T: | ||
| if not (lv and lv.scalar and lv.scalar.schema): | ||
| return pd.Series() | ||
| local_dir = ctx.file_access.get_random_local_directory() | ||
| ctx.file_access.get_data(lv.scalar.schema.uri, local_dir, is_multipart=True) | ||
| r = PandasSchemaReader(local_dir=Path(local_dir), cols=None, fmt=SchemaFormat.PARQUET) | ||
| # return first column as series should have only one | ||
| return r.all().iloc[:, 0] | ||
|
|
||
| def to_html( | ||
| self, ctx: FlyteContext, python_val: pd.Series, expected_python_type: Type[pd.Series] | ||
| ) -> str: | ||
| return python_val.describe().to_string() | ||
|
|
||
| def _get_schema_type(self): | ||
| return SchemaType(columns=[]) | ||
|
|
||
|
|
||
| # Register with FlyteKit type engine | ||
| TypeEngine.register(PandasSeriesTransformer()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ramannanda9 since Hamilton supports any object type, we need to think how to handle arbitrary python object types here too.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah not the best idea, but they ideally could default to just pickle dumping rather than raising unsupported type. |
||
|
|
||
|
|
||
| def _check_supported_type_in_flyte(node_type: typing.Type) -> bool: | ||
| try: | ||
| TypeEngine.get_transformer(node_type) | ||
| return True | ||
| except ValueError: | ||
| return False | ||
|
|
||
|
|
||
| class FlyteKitAdapter(base.HamiltonGraphAdapter, base.ResultMixin): | ||
| """Class representing what's require to run Hamilton on Flytekit | ||
|
|
||
| Use `pip install sf-hamilton[flytekit] to get the dependencies required to run this. | ||
|
|
||
| Flytekit Python is the Python Library for easily authoring, testing, deploying, | ||
| and interacting with Flyte tasks, workflows, and launch plans | ||
|
|
||
| If you have custom types that are not supported with FlyteKit that you want to use in Hamilton, | ||
| you can register them with the type engine in the driver. | ||
|
|
||
| Dataclasses, numpy arrays, pandas series and python native types are supported OOTB. | ||
|
|
||
|
|
||
| """ | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the right track @ramannanda9 ! Here we'd probably need an explanation here of how we're actually using Flyte. E.g. the code that we map things to, how does it behave/work and are there any caveats, decisions made, etc. |
||
|
|
||
| def __init__( | ||
| self, workflow_name: str, result_builder: base.ResultMixin = base.PandasDataFrameResult | ||
| ): | ||
| """Constructor""" | ||
| self.workflow = workflow.ImperativeWorkflow(workflow_name) | ||
| self.result_builder = result_builder | ||
|
|
||
| @staticmethod | ||
| def check_input_type(node_type: typing.Type, input_value: typing.Any) -> bool: | ||
| if isinstance(node_type, promise.Promise): | ||
| return True | ||
| return node_type == typing.Any or isinstance(input_value, node_type) | ||
|
|
||
| @staticmethod | ||
| def check_node_type_equivalence(node_type: typing.Type, input_type: typing.Type) -> bool: | ||
| return node_type == input_type | ||
|
|
||
| def execute_node(self, node: node.Node, kwargs: typing.Dict[str, typing.Any]) -> typing.Any: | ||
| """Function that is called as we walk the graph to determine how to execute a hamilton function. | ||
|
|
||
| :param node: the node from the graph | ||
| :param kwargs: the arguments that should be passed to it | ||
| :return: Create a flytekit task and execute it? | ||
| """ | ||
| # define a python task | ||
| task = flytekit.PythonFunctionTask(task_config=None, task_function=node.callable) | ||
| input_kwargs = {} | ||
| # map node inputs to flyte inputs | ||
| for input_node in node.dependencies: | ||
| # avoids adding the same input twice | ||
| if input_node.name not in self.workflow.inputs: | ||
| if _check_supported_type_in_flyte(input_node.type): | ||
| self.workflow.add_workflow_input(input_node.name, input_node.type) | ||
| else: | ||
| raise ValueError( | ||
| f"Input type {input_node.type} is not supported by Flytekit. " | ||
| f"Please register it with the type engine in the driver code." | ||
| ) | ||
| input_kwargs[input_node.name] = self.workflow.inputs[input_node.name] | ||
| # add the node to workflow, Adding the entity i.e. the callable to the workflow also populates the nodes outputs | ||
| wf_node = self.workflow.add_entity(task, **input_kwargs) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might be misreading, but why would it already have outputs/where does it get it from?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, See Imperative Workflow . It happens when you add the entity i.e. task with |
||
| # map node output to flyte output | ||
| if wf_node.outputs is not None: | ||
| # we should have single output.... | ||
| output_name, output_value = next(iter(wf_node.outputs.items())) | ||
| # map node output to flyte output | ||
| if _check_supported_type_in_flyte(node.type): | ||
| self.workflow.add_workflow_output(node.name, output_value, node.type) | ||
| else: | ||
| raise ValueError( | ||
| f"Output type {node.type} is not supported by Flytekit. " | ||
| f"Please register it with the type engine in the driver code." | ||
| ) | ||
|
|
||
| return wf_node | ||
|
|
||
| def build_result(self, **outputs: typing.Dict[str, typing.Any]) -> typing.Any: | ||
| execution_results = self.workflow.execute(**outputs) | ||
| if execution_results is not None: | ||
| expected_output_names = list(self.workflow.python_interface.outputs.keys()) | ||
| if len(expected_output_names) == 1: | ||
| wf_outputs_as_map = {expected_output_names[0]: execution_results} | ||
| else: | ||
| wf_outputs_as_map = { | ||
| expected_output_names[i]: execution_results[i] | ||
| for i, _ in enumerate(execution_results) | ||
| } | ||
| for k, v in wf_outputs_as_map.items(): | ||
| outputs[k] = v | ||
| return self.result_builder.build_result(**outputs) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's different between the one here and that one?