diff --git a/examples/flytekit/README.md b/examples/flytekit/README.md
new file mode 100644
index 00000000..4693b972
--- /dev/null
+++ b/examples/flytekit/README.md
@@ -0,0 +1,23 @@
+# Hamilton on FlyteKit
+
+In this example we show you how to create a simple dataflow and have it executed on the Flyte runtime. This dataflow
+creates a dataframe as a result. It performs a series of transforms on the
+input to create columns that appear in the output.
+
+
+File organization:
+
+* `my_functions.py` houses the logic that we want to compute. Note how the functions are named, and what input
+parameters they require. That is how we create a DAG modeling the dataflow we want to happen.
+* `run.py` houses how to get Hamilton to create the DAG and exercise it with some inputs.
+* `my_notebook_script.py` houses how one might iterate in a notebook environment and provide a way to inline define Hamilton
+functions and add them to the DAG constructed. To be clear, it is not used by `run.py`, but shows an alternate path
+to running things.
+
+To run things:
+```bash
+> python run.py
+```
+
+If you have questions, or need help with this example,
+join us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg), and we'll try to help!
diff --git a/examples/flytekit/my_functions.py b/examples/flytekit/my_functions.py
new file mode 100644
index 00000000..b54f6b17
--- /dev/null
+++ b/examples/flytekit/my_functions.py
@@ -0,0 +1,38 @@
+import pandas as pd
+
+"""
+Notes:
+  1. This file is used for all the [ray|dask|spark]/hello_world examples.
+  2. It therefore showcases how you can write something once and not only scale it, but port it
+     to different frameworks with ease!
+"""
+
+
+def avg_3wk_spend(spend: pd.Series) -> pd.Series:
+    """Rolling 3 week average spend."""
+    return spend.rolling(3).mean()
+
+
+def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
+    """The cost per signup in relation to spend."""
+    return spend / signups
+
+
+def spend_mean(spend: pd.Series) -> float:
+    """Shows function creating a scalar. In this case it computes the mean of the entire column."""
+    return spend.mean()
+
+
+def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
+    """Shows function that takes a scalar. In this case to zero mean spend."""
+    return spend - spend_mean
+
+
+def spend_std_dev(spend: pd.Series) -> float:
+    """Function that computes the standard deviation of the spend column."""
+    return spend.std()
+
+
+def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
+    """Function showing one way to make spend have zero mean and unit variance."""
+    return spend_zero_mean / spend_std_dev
diff --git a/examples/flytekit/requirements.txt b/examples/flytekit/requirements.txt
new file mode 100644
index 00000000..f0c96d96
--- /dev/null
+++ b/examples/flytekit/requirements.txt
@@ -0,0 +1 @@
+sf-hamilton[flytekit]
diff --git a/examples/flytekit/run.py b/examples/flytekit/run.py
new file mode 100644
index 00000000..3b5671f1
--- /dev/null
+++ b/examples/flytekit/run.py
@@ -0,0 +1,35 @@
+import importlib
+import logging
+import sys
+
+import pandas as pd
+
+from hamilton import base, driver
+from hamilton.experimental import h_flytekit
+
+logging.basicConfig(stream=sys.stdout)
+initial_columns = {  # load from actuals or wherever -- this is our initial data we use as input.
+    # Note: these values don't have to be all series, they could be a scalar.
+    "signups": pd.Series([1, 10, 50, 100, 200, 400]),
+    "spend": pd.Series([10, 10, 20, 40, 40, 50]),
+}
+# we need to tell hamilton where to load function definitions from
+module_name = "my_functions"
+module = importlib.import_module(module_name)
+fga = h_flytekit.FlyteKitAdapter("my_workflow")
+dr = driver.Driver(initial_columns, module, adapter=fga)  # can pass in multiple modules
+# we need to specify what we want in the final dataframe.
+output_columns = [
+    "spend",
+    "signups",
+    "avg_3wk_spend",
+    "spend_per_signup",
+    "spend_zero_mean_unit_variance",
+]
+# let's create the dataframe!
+df = dr.execute(output_columns)
+print(df.to_string())
+
+# To visualize do `pip install sf-hamilton[visualization]` if you want these to work
+# dr.visualize_execution(output_columns, './my_dag.dot', {})
+# dr.display_all_functions('./my_full_dag.dot')
diff --git a/graph_adapter_tests/h_flytekit/__init__.py b/graph_adapter_tests/h_flytekit/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/graph_adapter_tests/h_flytekit/resources b/graph_adapter_tests/h_flytekit/resources
new file mode 120000
index 00000000..1e58ceb6
--- /dev/null
+++ b/graph_adapter_tests/h_flytekit/resources
@@ -0,0 +1 @@
+../../tests/resources
\ No newline at end of file
diff --git a/graph_adapter_tests/h_flytekit/test_h_flytekit.py b/graph_adapter_tests/h_flytekit/test_h_flytekit.py
new file mode 100644
index 00000000..f647f801
--- /dev/null
+++ b/graph_adapter_tests/h_flytekit/test_h_flytekit.py
@@ -0,0 +1,47 @@
+import sys
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from hamilton import driver
+from hamilton.experimental import h_flytekit
+
+from . import unsupported_modules
+from .resources import example_module
+
+
+def test_flytekit_graph_adapter():
+    initial_columns = {
+        "signups": pd.Series([1, 10, 50, 100, 200, 400], name="signups"),
+        "spend": pd.Series([10, 10, 20, 40, 40, 50], name="spend"),
+    }
+    dr = driver.Driver(
+        initial_columns, example_module, adapter=h_flytekit.FlyteKitAdapter("my_workflow")
+    )
+    output_columns = [
+        "spend",
+        "signups",
+        "avg_3wk_spend",
+        "spend_per_signup",
+    ]
+    df = dr.execute(output_columns)
+    assert set(df) == set(output_columns)
+    expected_column = pd.Series(
+        [0.0, 0.0, 13.33333, 23.33333, 33.33333, 43.33333],
+        index=[0, 1, 2, 3, 4, 5],
+        name="avg_3wk_spend",
+    )
+    pd.testing.assert_series_equal(df.avg_3wk_spend.fillna(0.0).sort_index(), expected_column)
+
+
+def test_flytekit_graph_adapter_unsupported_type():
+    columns = {
+        "a": unsupported_modules.CustomType(1),
+        "b": unsupported_modules.CustomType(2),
+    }
+    dr = driver.Driver(
+        columns, unsupported_modules, adapter=h_flytekit.FlyteKitAdapter("my_workflow")
+    )
+    with pytest.raises(ValueError):
+        dr.execute(["a", "b", "add_custom_type"])
diff --git a/graph_adapter_tests/h_flytekit/unsupported_modules.py b/graph_adapter_tests/h_flytekit/unsupported_modules.py
new file mode 100644
index 00000000..cb4c7954
--- /dev/null
+++ b/graph_adapter_tests/h_flytekit/unsupported_modules.py
@@ -0,0 +1,11 @@
+import numpy as np
+
+
+class CustomType:
+    def __init__(self, value: int):
+        self.value = value
+
+
+def add_custom_type(a: CustomType, b: CustomType) -> int:
+    """adds two custom types, these should be class methods but this is just an example"""
+    return a.value + b.value
diff --git a/hamilton/experimental/h_flytekit.py b/hamilton/experimental/h_flytekit.py
new file mode 100644
index 00000000..f1a8a70c
--- /dev/null
+++ b/hamilton/experimental/h_flytekit.py
@@ -0,0 +1,174 @@
+import logging
+import typing
+from pathlib import Path
+from typing import Type
+
+import flytekit
+import pandas as pd
+from flytekit import FlyteContext, FlyteContextManager, Literal, LiteralType
+from flytekit.core import promise, workflow
+from flytekit.core.type_engine import T, TypeEngine, TypeTransformer
+from flytekit.models.literals import Scalar, Schema
+from flytekit.models.types import SchemaType
+from flytekit.types.schema import SchemaEngine, SchemaFormat, SchemaHandler
+from flytekit.types.schema.types_pandas import (
+    PandasDataFrameTransformer,
+    PandasSchemaReader,
+    PandasSchemaWriter,
+)
+
+from hamilton import base, node
+
+logger = logging.getLogger(__name__)
+
+
+class PandasSeriesTransformer(TypeTransformer[pd.Series]):
+    """
+    Creates a transformer to handle PandasSeries, Similar to PandasTransformer
+    in flytekit repo https://github.com/flyteorg/flytekit/blob/master/flytekit/types/schema/types_pandas.py
+    In essence, we convert the series to dataframe before writing to the disk.
+    """
+
+    def __init__(self):
+        super().__init__("PandasSeries<->GenericSchema", pd.Series)
+
+    def get_literal_type(self, t: Type[pd.Series]) -> LiteralType:
+        return LiteralType(schema=self._get_schema_type())
+
+    def to_literal(
+        self,
+        ctx: FlyteContext,
+        python_val: pd.Series,
+        python_type: Type[pd.Series],
+        expected: LiteralType,
+    ) -> Literal:
+        """Writes the series as a one-column parquet dataframe and returns its schema literal."""
+        local_dir = ctx.file_access.get_random_local_directory()
+        w = PandasSchemaWriter(local_dir=Path(local_dir), cols=None, fmt=SchemaFormat.PARQUET)
+        w.write(python_val.to_frame(name=str(python_val.name)))
+        remote_path = ctx.file_access.get_random_remote_directory()
+        ctx.file_access.put_data(local_dir, remote_path, is_multipart=True)
+        return Literal(scalar=Scalar(schema=Schema(remote_path, self._get_schema_type())))
+
+    def to_python_value(
+        self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[pd.Series]
+    ) -> T:
+        """Loads the parquet data and returns its first column; empty series if no schema."""
+        if not (lv and lv.scalar and lv.scalar.schema):
+            return pd.Series()
+        local_dir = ctx.file_access.get_random_local_directory()
+        ctx.file_access.get_data(lv.scalar.schema.uri, local_dir, is_multipart=True)
+        r = PandasSchemaReader(local_dir=Path(local_dir), cols=None, fmt=SchemaFormat.PARQUET)
+        # return first column as series should have only one
+        return r.all().iloc[:, 0]
+
+    def to_html(
+        self, ctx: FlyteContext, python_val: pd.Series, expected_python_type: Type[pd.Series]
+    ) -> str:
+        return python_val.describe().to_string()
+
+    def _get_schema_type(self):
+        return SchemaType(columns=[])
+
+
+# Register with FlyteKit type engine
+TypeEngine.register(PandasSeriesTransformer())
+
+
+def _check_supported_type_in_flyte(node_type: typing.Type) -> bool:
+    """Returns True if flytekit's TypeEngine has a transformer for the given type."""
+    try:
+        TypeEngine.get_transformer(node_type)
+        return True
+    except ValueError:
+        return False
+
+
+class FlyteKitAdapter(base.HamiltonGraphAdapter, base.ResultMixin):
+    """Class representing what's required to run Hamilton on Flytekit
+
+    Use `pip install sf-hamilton[flytekit]` to get the dependencies required to run this.
+
+    Flytekit Python is the Python Library for easily authoring, testing, deploying,
+    and interacting with Flyte tasks, workflows, and launch plans
+
+    If you have custom types that are not supported with FlyteKit that you want to use in Hamilton,
+    you can register them with the type engine in the driver.
+
+    Dataclasses, numpy arrays, pandas series and python native types are supported OOTB.
+
+
+    """
+
+    def __init__(
+        self, workflow_name: str, result_builder: base.ResultMixin = base.PandasDataFrameResult
+    ):
+        """Constructor"""
+        self.workflow = workflow.ImperativeWorkflow(workflow_name)
+        self.result_builder = result_builder
+
+    @staticmethod
+    def check_input_type(node_type: typing.Type, input_value: typing.Any) -> bool:
+        """Promises and typing.Any always pass; otherwise isinstance(input_value, node_type)."""
+        if isinstance(input_value, promise.Promise):
+            return True
+        return node_type == typing.Any or isinstance(input_value, node_type)
+
+    @staticmethod
+    def check_node_type_equivalence(node_type: typing.Type, input_type: typing.Type) -> bool:
+        return node_type == input_type
+
+    def execute_node(self, node: node.Node, kwargs: typing.Dict[str, typing.Any]) -> typing.Any:
+        """Function that is called as we walk the graph to determine how to execute a hamilton function.
+
+        :param node: the node from the graph
+        :param kwargs: the arguments that should be passed to it
+        :return: the workflow node returned by adding the node's task to the imperative workflow.
+        """
+        # define a python task
+        task = flytekit.PythonFunctionTask(task_config=None, task_function=node.callable)
+        input_kwargs = {}
+        # map node inputs to flyte inputs
+        for input_node in node.dependencies:
+            # avoids adding the same input twice
+            if input_node.name not in self.workflow.inputs:
+                if _check_supported_type_in_flyte(input_node.type):
+                    self.workflow.add_workflow_input(input_node.name, input_node.type)
+                else:
+                    raise ValueError(
+                        f"Input type {input_node.type} is not supported by Flytekit. "
+                        f"Please register it with the type engine in the driver code."
+                    )
+            input_kwargs[input_node.name] = self.workflow.inputs[input_node.name]
+        # add the node to workflow, Adding the entity i.e. the callable to the workflow also populates the nodes outputs
+        wf_node = self.workflow.add_entity(task, **input_kwargs)
+        # map node output to flyte output
+        if wf_node.outputs is not None:
+            # we should have single output....
+            output_value = next(iter(wf_node.outputs.values()))
+            # map node output to flyte output
+            if _check_supported_type_in_flyte(node.type):
+                self.workflow.add_workflow_output(node.name, output_value, node.type)
+            else:
+                raise ValueError(
+                    f"Output type {node.type} is not supported by Flytekit. "
+                    f"Please register it with the type engine in the driver code."
+                )
+
+        return wf_node
+
+    def build_result(self, **outputs: typing.Dict[str, typing.Any]) -> typing.Any:
+        """Executes the workflow, merges its outputs into ``outputs`` and builds the result."""
+        execution_results = self.workflow.execute(**outputs)
+        if execution_results is not None:
+            expected_output_names = list(self.workflow.python_interface.outputs.keys())
+            if len(expected_output_names) == 1:
+                wf_outputs_as_map = {expected_output_names[0]: execution_results}
+            else:
+                wf_outputs_as_map = {
+                    expected_output_names[i]: execution_results[i]
+                    for i, _ in enumerate(execution_results)
+                }
+            for k, v in wf_outputs_as_map.items():
+                outputs[k] = v
+        return self.result_builder.build_result(**outputs)
diff --git a/setup.py b/setup.py
index 8247ac6a..7c1e8899 100644
--- a/setup.py
+++ b/setup.py
@@ -76,6 +76,7 @@ def load_requirements():
         "ray": ["ray>=2.0.0", "pyarrow"],
         "pyspark": ["pyspark[pandas_on_spark]"],
         "pandera": ["pandera"],
+        "flytekit": ["flytekit"],
     },
     # Relevant project URLs
     project_urls={  # Optional