This is an automated email from the ASF dual-hosted git repository.

skrawcz pushed a commit to branch remove_ray_workflows
in repository https://gitbox.apache.org/repos/asf/hamilton.git

commit d91a2ee1702a2433eb602151e60e86de10c12828
Author: Stefan Krawczyk <[email protected]>
AuthorDate: Fri Jul 25 23:27:04 2025 -0700

    Deletes RayWorkflows
    
    Ray workflows are now unsupported, so this commit removes them.
---
 .../graph-adapters/RayWorkflowGraphAdapter.rst     | 10 ---
 docs/reference/graph-adapters/index.rst            |  1 -
 examples/ray/hello_world/README.md                 |  6 --
 examples/ray/hello_world/run_rayworkflow.py        | 42 ----------
 hamilton/experimental/h_ray.py                     |  4 +-
 hamilton/plugins/h_ray.py                          | 97 ----------------------
 plugin_tests/h_ray/test_h_ray_workflow.py          | 75 -----------------
 7 files changed, 2 insertions(+), 233 deletions(-)

diff --git a/docs/reference/graph-adapters/RayWorkflowGraphAdapter.rst 
b/docs/reference/graph-adapters/RayWorkflowGraphAdapter.rst
deleted file mode 100644
index b302e2f6..00000000
--- a/docs/reference/graph-adapters/RayWorkflowGraphAdapter.rst
+++ /dev/null
@@ -1,10 +0,0 @@
-=============================
-h_ray.RayWorkflowGraphAdapter
-=============================
-A Graph Adapter for delegating the execution of hamilton nodes to Ray.
-
-
-.. autoclass:: hamilton.plugins.h_ray.RayWorkflowGraphAdapter
-   :special-members: __init__
-   :members:
-   :inherited-members:
diff --git a/docs/reference/graph-adapters/index.rst 
b/docs/reference/graph-adapters/index.rst
index 08c94575..37f84a87 100644
--- a/docs/reference/graph-adapters/index.rst
+++ b/docs/reference/graph-adapters/index.rst
@@ -20,5 +20,4 @@ Reference
    DaskGraphAdapter
    PySparkUDFGraphAdapter
    RayGraphAdapter
-   RayWorkflowGraphAdapter
    SparkKoalasGraphAdapter
diff --git a/examples/ray/hello_world/README.md 
b/examples/ray/hello_world/README.md
index f2c40d10..e40c3262 100644
--- a/examples/ray/hello_world/README.md
+++ b/examples/ray/hello_world/README.md
@@ -12,8 +12,6 @@ File organization:
 * `data_loaders.py` houses logic to load data for the business_logic.py 
module. The
 idea is that you'd swap this module out for other ways of loading data.
 * `run.py` is the script that ties everything together that uses vanilla Ray.
-* `run_rayworkflow.py` is the script that again ties everything together, but 
this time uses
-[Ray Workflows](https://docs.ray.io/en/latest/workflows/concepts.html) to 
execute.
 
 # Running the code:
 For the vanilla Ray implementation use:
@@ -23,7 +21,3 @@ For the vanilla Ray implementation use:
 Here is the visualization of the execution:
 
 ![ray_dag](ray_dag.png)
-
-For the [Ray Workflow](https://docs.ray.io/en/latest/workflows/concepts.html) 
implementation use:
-
-> python run_rayworkflow.py
diff --git a/examples/ray/hello_world/run_rayworkflow.py 
b/examples/ray/hello_world/run_rayworkflow.py
deleted file mode 100644
index 40c33c5e..00000000
--- a/examples/ray/hello_world/run_rayworkflow.py
+++ /dev/null
@@ -1,42 +0,0 @@
-import ray
-from ray import workflow
-
-from hamilton import base, driver, log_setup
-from hamilton.plugins import h_ray
-
-if __name__ == "__main__":
-    log_setup.setup_logging()
-    workflow.init()
-    # You can also script module import loading by knowing the module name
-    # See run.py for an example of doing it that way.
-    import business_logic
-    import data_loaders
-
-    modules = [data_loaders, business_logic]
-    initial_columns = {  # could load data here via some other means, or 
delegate to a module as we have done.
-        # 'signups': pd.Series([1, 10, 50, 100, 200, 400]),
-        "signups_location": "some_path",
-        # 'spend': pd.Series([10, 10, 20, 40, 40, 50]),
-        "spend_location": "some_other_path",
-    }
-    rga = h_ray.RayWorkflowGraphAdapter(
-        result_builder=base.PandasDataFrameResult(),
-        # Ray will resume a run if possible based on workflow id
-        workflow_id="hello-world-123",
-    )
-    dr = driver.Driver(initial_columns, *modules, adapter=rga)
-    # we need to specify what we want in the final dataframe.
-    output_columns = [
-        "spend",
-        "signups",
-        "avg_3wk_spend",
-        "spend_per_signup",
-        "spend_zero_mean_unit_variance",
-    ]
-    # let's create the dataframe!
-    df = dr.execute(output_columns)
-    # To visualize do `pip install "sf-hamilton[visualization]"` if you want 
these to work
-    # dr.visualize_execution(output_columns, './my_dag.dot', {})
-    # dr.display_all_functions('./my_full_dag.dot')
-    print(df.to_string())
-    ray.shutdown()
diff --git a/hamilton/experimental/h_ray.py b/hamilton/experimental/h_ray.py
index bf870b89..cfc36bf7 100644
--- a/hamilton/experimental/h_ray.py
+++ b/hamilton/experimental/h_ray.py
@@ -1,6 +1,6 @@
 import logging
 
-from hamilton.plugins.h_ray import RayGraphAdapter, RayTaskExecutor, 
RayWorkflowGraphAdapter
+from hamilton.plugins.h_ray import RayGraphAdapter, RayTaskExecutor
 
 logger = logging.getLogger(__name__)
 logger.warning(
@@ -8,4 +8,4 @@ logger.warning(
     " Please use hamilton.plugins.h_ray instead."
 )
 
-__all__ = ["RayGraphAdapter", "RayWorkflowGraphAdapter", "RayTaskExecutor"]
+__all__ = ["RayGraphAdapter", "RayTaskExecutor"]
diff --git a/hamilton/plugins/h_ray.py b/hamilton/plugins/h_ray.py
index 66d0a925..0b36d65a 100644
--- a/hamilton/plugins/h_ray.py
+++ b/hamilton/plugins/h_ray.py
@@ -6,7 +6,6 @@ import time
 import typing
 
 import ray
-from ray import workflow
 
 from hamilton import base, htypes, lifecycle, node
 from hamilton.execution import executors
@@ -172,102 +171,6 @@ class RayGraphAdapter(
             ray.shutdown()
 
 
-class RayWorkflowGraphAdapter(base.HamiltonGraphAdapter, base.ResultMixin):
-    """Class representing what's required to make Hamilton run Ray Workflows
-
-    Use `pip install sf-hamilton[ray]` to get the dependencies required to run 
this.
-
-    Ray workflows is a more robust way to scale computation for any type of 
Hamilton graph.
-
-    What's the difference between this and RayGraphAdapter?
-    --------------------------------------------------------
-        * Ray workflows offer durable computation. That is, they save and 
checkpoint each function.
-        * This enables one to run a workflow, and not have to restart it if 
something fails, assuming correct\
-        Ray workflow usage.
-
-    Tips
-    ----
-    See https://docs.ray.io/en/latest/workflows/basics.html for the source of 
the following:
-
-        1. Functions should be idempotent.
-        2. The workflow ID is what Ray uses to try to resume/restart if run a 
second time.
-        3. Nothing is run until the entire DAG is walked and setup and 
build_result is called.
-
-    Notes on scaling:
-    -----------------
-      - Multi-core on single machine ✅
-      - Distributed computation on a Ray cluster ✅
-      - Scales to any size of data ⛔️; you are LIMITED by the memory on the 
instance/computer 💻.
-
-    Function return object types supported:
-    ---------------------------------------
-      - Works for any python object that can be serialized by the Ray 
framework. ✅
-
-    Pandas?
-    --------
-      - ⛔️ Ray DOES NOT do anything special about Pandas.
-
-    CAVEATS
-    -------
-      - Serialization costs can outweigh the benefits of parallelism, so you 
should benchmark your code to see if it's\
-      worth it.
-
-    DISCLAIMER -- this class is experimental, so signature changes are a 
possibility!
-    """
-
-    def __init__(self, result_builder: base.ResultMixin, workflow_id: str):
-        """Constructor
-
-        :param result_builder: Required. An implementation of base.ResultMixin.
-        :param workflow_id: Required. An ID to give the ray workflow to 
identify it for durability purposes.
-        :param max_retries: Optional. The function will be retried for the 
given number of times if an
-            exception is raised.
-        """
-        self.result_builder = result_builder
-        self.workflow_id = workflow_id
-        if not self.result_builder:
-            raise ValueError(
-                "Error: ResultMixin object required. Please pass one in for 
`result_builder`."
-            )
-
-    @staticmethod
-    def check_input_type(node_type: typing.Type, input_value: typing.Any) -> 
bool:
-        # NOTE: the type of a raylet is unknown until they are computed
-        if isinstance(input_value, ray._raylet.ObjectRef):
-            return True
-        return htypes.check_input_type(node_type, input_value)
-
-    @staticmethod
-    def check_node_type_equivalence(node_type: typing.Type, input_type: 
typing.Type) -> bool:
-        return node_type == input_type
-
-    def execute_node(self, node: node.Node, kwargs: typing.Dict[str, 
typing.Any]) -> typing.Any:
-        """Function that is called as we walk the graph to determine how to 
execute a hamilton function.
-
-        :param node: the node from the graph.
-        :param kwargs: the arguments that should be passed to it.
-        :return: returns a ray object reference.
-        """
-        ray_options = parse_ray_remote_options_from_tags(node.tags)
-        return 
ray.remote(raify(node.callable)).options(**ray_options).bind(**kwargs)
-
-    def build_result(self, **outputs: typing.Dict[str, typing.Any]) -> 
typing.Any:
-        """Builds the result and brings it back to this running process.
-
-        :param outputs: the dictionary of key -> Union[ray object reference | 
value]
-        :return: The type of object returned by self.result_builder.
-        """
-        if logger.isEnabledFor(logging.DEBUG):
-            for k, v in outputs.items():
-                logger.debug(f"Got output {k}, with type [{type(v)}].")
-        # need to wrap our result builder in a remote call and then pass in 
what we want to build from.
-        remote_combine = 
ray.remote(self.result_builder.build_result).bind(**outputs)
-        result = workflow.run(
-            remote_combine, workflow_id=self.workflow_id
-        )  # this materializes the object locally
-        return result
-
-
 class RayTaskExecutor(executors.TaskExecutor):
     """Task executor using Ray for the new task-based execution mechanism in 
Hamilton.
     This is still experimental, so the API might change.
diff --git a/plugin_tests/h_ray/test_h_ray_workflow.py 
b/plugin_tests/h_ray/test_h_ray_workflow.py
deleted file mode 100644
index 050ebcc7..00000000
--- a/plugin_tests/h_ray/test_h_ray_workflow.py
+++ /dev/null
@@ -1,75 +0,0 @@
-import pandas as pd
-import pytest
-import ray
-from ray import workflow
-
-from hamilton import base, driver
-from hamilton.plugins import h_ray
-from plugin_tests.h_ray.resources import example_module, smoke_screen_module
-
-
[email protected](scope="module")
-def init():
-    # Do not need to call ray.init() when using a workflow now it seems?
-    yield "initialized"
-    ray.shutdown()
-
-
-# This does not work locally -- will ask Ray slack for support.
-def test_ray_workflow_graph_adapter(init):
-    workflow.init()
-    initial_columns = {
-        "signups": pd.Series([1, 10, 50, 100, 200, 400]),
-        "spend": pd.Series([10, 10, 20, 40, 40, 50]),
-    }
-    dr = driver.Driver(
-        initial_columns,
-        example_module,
-        adapter=h_ray.RayWorkflowGraphAdapter(
-            base.PandasDataFrameResult(), 
"test-test_ray_workflow_graph_adapter"
-        ),
-    )
-    output_columns = [
-        "spend",
-        "signups",
-        "avg_3wk_spend",
-        "spend_per_signup",
-    ]
-    df = dr.execute(output_columns)
-    assert set(df) == set(output_columns)
-    expected_column = pd.Series(
-        [0.0, 0.0, 13.33333, 23.33333, 33.33333, 43.33333], 
name="avg_3wk_spend"
-    )
-    pd.testing.assert_series_equal(
-        df.avg_3wk_spend.fillna(0.0), expected_column
-    )  # fill na to get around NaN
-    # TODO: do some more asserting?
-
-
-def test_smoke_screen_module(init):
-    workflow.init()
-    config = {"region": "US"}
-    dr = driver.Driver(
-        config,
-        smoke_screen_module,
-        adapter=h_ray.RayWorkflowGraphAdapter(
-            base.PandasDataFrameResult(), "test-test_smoke_screen_module"
-        ),
-    )
-    output_columns = [
-        "raw_acquisition_cost",
-        "pessimistic_net_acquisition_cost",
-        "neutral_net_acquisition_cost",
-        "optimistic_net_acquisition_cost",
-        "series_with_start_date_end_date",
-    ]
-    df = dr.execute(
-        inputs={"date_range": {"start_date": "20200101", "end_date": 
"20220801"}},
-        final_vars=output_columns,
-    )
-    epsilon = 0.00001
-    assert abs(df["raw_acquisition_cost"].mean() - 0.393808) < epsilon
-    assert abs(df["pessimistic_net_acquisition_cost"].mean() - 0.420769) < 
epsilon
-    assert abs(df["neutral_net_acquisition_cost"].mean() - 0.405582) < epsilon
-    assert abs(df["optimistic_net_acquisition_cost"].mean() - 0.399363) < 
epsilon
-    assert df["series_with_start_date_end_date"].iloc[0] == 
"date_20200101_date_20220801"

Reply via email to