Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions examples/flytekit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Hamilton on FlyteKit

In this example we show you how to create a simple dataflow and have it executed on Flyte runtime. This dataflow
creates a dataframe as a result. It performs a series of transforms on the
input to create columns that appear in the output.


File organization:

* `my_functions.py` houses the logic that we want to compute. Note how the functions are named, and what input
parameters they require. That is how we create a DAG modeling the dataflow we want to happen.
* `my_script.py` houses how to get Hamilton to create the DAG and exercise it with some inputs.
* `my_notebook_script.py` houses how one might iterate in a notebook environment and provide a way to inline define Hamilton
functions and add them to the DAG constructed. To be clear, it is not used by `my_script.py`, but showing an alternate path
to running things.

To run things:
```bash
> python my_script.py
```

If you have questions, or need help with this example,
join us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg), and we'll try to help!
38 changes: 38 additions & 0 deletions examples/flytekit/my_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pandas as pd

"""
Notes:
1. This file is used for all the [ray|dask|spark]/hello_world examples.
2. It therefore show cases how you can write something once and not only scale it, but port it
to different frameworks with ease!
"""


def avg_3wk_spend(spend: pd.Series) -> pd.Series:
"""Rolling 3 week average spend."""
return spend.rolling(3).mean()


def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
"""The cost per signup in relation to spend."""
return spend / signups


def spend_mean(spend: pd.Series) -> float:
"""Shows function creating a scalar. In this case it computes the mean of the entire column."""
return spend.mean()


def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
"""Shows function that takes a scalar. In this case to zero mean spend."""
return spend - spend_mean


def spend_std_dev(spend: pd.Series) -> float:
"""Function that computes the standard deviation of the spend column."""
return spend.std()


def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
"""Function showing one way to make spend have zero mean and unit variance."""
return spend_zero_mean / spend_std_dev
1 change: 1 addition & 0 deletions examples/flytekit/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sf-hamilton[flytekit]
35 changes: 35 additions & 0 deletions examples/flytekit/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import importlib
import logging
import sys

import pandas as pd

from hamilton import base, driver
from hamilton.experimental import h_flytekit

logging.basicConfig(stream=sys.stdout)
initial_columns = { # load from actuals or wherever -- this is our initial data we use as input.
# Note: these values don't have to be all series, they could be a scalar.
"signups": pd.Series([1, 10, 50, 100, 200, 400]),
"spend": pd.Series([10, 10, 20, 40, 40, 50]),
}
# we need to tell hamilton where to load function definitions from
module_name = "my_functions"
module = importlib.import_module(module_name)
fga = h_flytekit.FlyteKitAdapter()
dr = driver.Driver(initial_columns, module, adapter=fga) # can pass in multiple modules
# we need to specify what we want in the final dataframe.
output_columns = [
"spend",
"signups",
"avg_3wk_spend",
"spend_per_signup",
"spend_zero_mean_unit_variance",
]
# let's create the dataframe!
df = dr.execute(output_columns)
print(df.to_string())

# To visualize do `pip install sf-hamilton[visualization]` if you want these to work
# dr.visualize_execution(output_columns, './my_dag.dot', {})
# dr.display_all_functions('./my_full_dag.dot')
Empty file.
1 change: 1 addition & 0 deletions graph_adapter_tests/h_flytekit/resources
47 changes: 47 additions & 0 deletions graph_adapter_tests/h_flytekit/test_h_flytekit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import sys

import numpy as np
import pandas as pd
import pytest

from hamilton import driver
from hamilton.experimental import h_flytekit

from . import unsupported_modules
from .resources import example_module


def test_flytekit_graph_adapter():
initial_columns = {
"signups": pd.Series([1, 10, 50, 100, 200, 400], name="signups"),
"spend": pd.Series([10, 10, 20, 40, 40, 50], name="spend"),
}
dr = driver.Driver(
initial_columns, example_module, adapter=h_flytekit.FlyteKitAdapter("my_workflow")
)
output_columns = [
"spend",
"signups",
"avg_3wk_spend",
"spend_per_signup",
]
df = dr.execute(output_columns)
assert set(df) == set(output_columns)
expected_column = pd.Series(
[0.0, 0.0, 13.33333, 23.33333, 33.33333, 43.33333],
index=[0, 1, 2, 3, 4, 5],
name="avg_3wk_spend",
)
pd.testing.assert_series_equal(df.avg_3wk_spend.fillna(0.0).sort_index(), expected_column)


def test_flytekit_graph_adapter_unsupported_type():
columns = {
"a": unsupported_modules.CustomType(1),
"b": unsupported_modules.CustomType(2),
}
dr = driver.Driver(
columns, unsupported_modules, adapter=h_flytekit.FlyteKitAdapter("my_workflow")
)
with pytest.raises(ValueError):
assert dr.execute(["a", "b", "add_custom_type"])
11 changes: 11 additions & 0 deletions graph_adapter_tests/h_flytekit/unsupported_modules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import numpy as np


class CustomType:
def __init__(self, value: int):
self.value = value


def add_custom_type(a: CustomType, b: CustomType) -> int:
"""adds two custom types, these should be class methods but this is just an example"""
return a.value + b.value
169 changes: 169 additions & 0 deletions hamilton/experimental/h_flytekit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import logging
import typing
from pathlib import Path
from typing import Type

import flytekit
import pandas as pd
from flytekit import FlyteContext, FlyteContextManager, Literal, LiteralType
from flytekit.core import promise, workflow
from flytekit.core.type_engine import T, TypeEngine, TypeTransformer
from flytekit.models.literals import Scalar, Schema
from flytekit.models.types import SchemaType
from flytekit.types.schema import SchemaEngine, SchemaFormat, SchemaHandler
from flytekit.types.schema.types_pandas import (
PandasDataFrameTransformer,
PandasSchemaReader,
PandasSchemaWriter,
)

from hamilton import base, node

logger = logging.getLogger(__name__)


class PandasSeriesTransformer(TypeTransformer[pd.Series]):
"""
Creates a transformer to handle PandasSeries, Similar to PandasTransformer
in flight repo https://github.com/flyteorg/flytekit/blob/master/flytekit/types/schema/types_pandas.py
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's different between the one here and that one?

In essence, we convert the series to dataframe before writing to the disk.
"""

def __init__(self):
super().__init__("PandasSeries<->GenericSchema", pd.Series)

def get_literal_type(self, t: Type[pd.Series]) -> LiteralType:
return LiteralType(schema=self._get_schema_type())

def to_literal(
self,
ctx: FlyteContext,
python_val: pd.Series,
python_type: Type[pd.Series],
expected: LiteralType,
) -> Literal:
local_dir = ctx.file_access.get_random_local_directory()
w = PandasSchemaWriter(local_dir=Path(local_dir), cols=None, fmt=SchemaFormat.PARQUET)
w.write(python_val.to_frame(name=str(python_val.name)))
remote_path = ctx.file_access.get_random_remote_directory()
ctx.file_access.put_data(local_dir, remote_path, is_multipart=True)
return Literal(scalar=Scalar(schema=Schema(remote_path, self._get_schema_type())))

def to_python_value(
self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[pd.Series]
) -> T:
if not (lv and lv.scalar and lv.scalar.schema):
return pd.Series()
local_dir = ctx.file_access.get_random_local_directory()
ctx.file_access.get_data(lv.scalar.schema.uri, local_dir, is_multipart=True)
r = PandasSchemaReader(local_dir=Path(local_dir), cols=None, fmt=SchemaFormat.PARQUET)
# return first column as series should have only one
return r.all().iloc[:, 0]

def to_html(
self, ctx: FlyteContext, python_val: pd.Series, expected_python_type: Type[pd.Series]
) -> str:
return python_val.describe().to_string()

def _get_schema_type(self):
return SchemaType(columns=[])


# Register with FlyteKit type engine
TypeEngine.register(PandasSeriesTransformer())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ramannanda9 since Hamilton supports any object type, we need to think how to handle arbitrary python object types here too.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah not the best idea, but they ideally could default to just pickle dumping rather than raising unsupported type.



def _check_supported_type_in_flyte(node_type: typing.Type) -> bool:
try:
TypeEngine.get_transformer(node_type)
return True
except ValueError:
return False


class FlyteKitAdapter(base.HamiltonGraphAdapter, base.ResultMixin):
"""Class representing what's require to run Hamilton on Flytekit

Use `pip install sf-hamilton[flytekit] to get the dependencies required to run this.

Flytekit Python is the Python Library for easily authoring, testing, deploying,
and interacting with Flyte tasks, workflows, and launch plans

If you have custom types that are not supported with FlyteKit that you want to use in Hamilton,
you can register them with the type engine in the driver.

Dataclasses, numpy arrays, pandas series and python native types are supported OOTB.


"""
Copy link
Copy Markdown
Collaborator

@skrawcz skrawcz Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the right track @ramannanda9 !

Here we'd probably need an explanation here of how we're actually using Flyte. E.g. the code that we map things to, how does it behave/work and are there any caveats, decisions made, etc.


def __init__(
self, workflow_name: str, result_builder: base.ResultMixin = base.PandasDataFrameResult
):
"""Constructor"""
self.workflow = workflow.ImperativeWorkflow(workflow_name)
self.result_builder = result_builder

@staticmethod
def check_input_type(node_type: typing.Type, input_value: typing.Any) -> bool:
if isinstance(node_type, promise.Promise):
return True
return node_type == typing.Any or isinstance(input_value, node_type)

@staticmethod
def check_node_type_equivalence(node_type: typing.Type, input_type: typing.Type) -> bool:
return node_type == input_type

def execute_node(self, node: node.Node, kwargs: typing.Dict[str, typing.Any]) -> typing.Any:
"""Function that is called as we walk the graph to determine how to execute a hamilton function.

:param node: the node from the graph
:param kwargs: the arguments that should be passed to it
:return: Create a flytekit task and execute it?
"""
# define a python task
task = flytekit.PythonFunctionTask(task_config=None, task_function=node.callable)
input_kwargs = {}
# map node inputs to flyte inputs
for input_node in node.dependencies:
# avoids adding the same input twice
if input_node.name not in self.workflow.inputs:
if _check_supported_type_in_flyte(input_node.type):
self.workflow.add_workflow_input(input_node.name, input_node.type)
else:
raise ValueError(
f"Input type {input_node.type} is not supported by Flytekit. "
f"Please register it with the type engine in the driver code."
)
input_kwargs[input_node.name] = self.workflow.inputs[input_node.name]
# add the node to workflow, Adding the entity i.e. the callable to the workflow also populates the nodes outputs
wf_node = self.workflow.add_entity(task, **input_kwargs)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misreading, but why would it already have outputs/where does it get it from?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, See Imperative Workflow . It happens when you add the entity i.e. task with input_kwargs

# map node output to flyte output
if wf_node.outputs is not None:
# we should have single output....
output_name, output_value = next(iter(wf_node.outputs.items()))
# map node output to flyte output
if _check_supported_type_in_flyte(node.type):
self.workflow.add_workflow_output(node.name, output_value, node.type)
else:
raise ValueError(
f"Output type {node.type} is not supported by Flytekit. "
f"Please register it with the type engine in the driver code."
)

return wf_node

def build_result(self, **outputs: typing.Dict[str, typing.Any]) -> typing.Any:
execution_results = self.workflow.execute(**outputs)
if execution_results is not None:
expected_output_names = list(self.workflow.python_interface.outputs.keys())
if len(expected_output_names) == 1:
wf_outputs_as_map = {expected_output_names[0]: execution_results}
else:
wf_outputs_as_map = {
expected_output_names[i]: execution_results[i]
for i, _ in enumerate(execution_results)
}
for k, v in wf_outputs_as_map.items():
outputs[k] = v
return self.result_builder.build_result(**outputs)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def load_requirements():
"ray": ["ray>=2.0.0", "pyarrow"],
"pyspark": ["pyspark[pandas_on_spark]"],
"pandera": ["pandera"],
"flytekit": ["flytekit"],
},
# Relevant project URLs
project_urls={ # Optional
Expand Down