xintongsong commented on code in PR #6:
URL: https://github.com/apache/flink-agents/pull/6#discussion_r2141355531


##########
python/flink_agents/plan/function.py:
##########
@@ -0,0 +1,129 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import importlib
+import inspect
+from abc import ABC, abstractmethod
+from typing import Any, Callable, Dict, Tuple
+
+from pydantic import BaseModel
+
+
+class Function(BaseModel, ABC):
+    """Base interface for user defined functions, includes python and java."""
+
+    @abstractmethod
+    def check_signature(self, checker: Callable) -> None:
+        """Check function signature is legal or not."""
+
+    @abstractmethod
+    def __call__(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> 
Any:
+        """Execute function."""
+
+class PythonFunction(Function):
+    """Descriptor for a python callable function, storing module and qualified 
name for
+    dynamic retrieval.
+
+    This class allows serialization and lazy loading of functions by storing 
their
+    module and qualified name. The actual callable is loaded on-demand when 
the instance
+    is called.
+
+    Attributes:
+    ----------
+    module : str
+        Name of the Python module where the function is defined.
+    qualname : str
+        Qualified name of the function (e.g., 'ClassName.method' for class 
methods).
+    """
+
+    module: str
+    qualname: str
+    __func: Callable = None
+
+    @staticmethod
+    def from_callable(func: Callable) -> Function:
+        """Create a Function descriptor from an existing callable.
+
+        Parameters
+        ----------
+        func : Callable
+            The function or method to be wrapped.
+
+        Returns:
+        -------
+        Function
+            A Function instance with module and qualname populated based on 
the input
+            callable.
+        """
+        return PythonFunction(
+            module=inspect.getmodule(func).__name__,
+            qualname=func.__qualname__,
+            __func=func,
+        )
+
+    def check_signature(self, checker: Callable) -> None:
+        """Apply external check logic to function signature."""
+        checker(self.__get_func())
+
+    def __call__(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> 
Any:
+        """Execute the stored function with provided arguments.
+
+        Lazily loads the function from its module and qualified name if not 
already
+        cached.
+
+        Parameters
+        ----------
+        *args : tuple
+            Positional arguments to pass to the function.
+        **kwargs : dict
+            Keyword arguments to pass to the function.
+
+        Returns:
+        -------
+        Any
+            The result of calling the resolved function with the provided 
arguments.
+
+        Notes:
+        -----
+        If the function is a method (qualified name contains a class 
reference), it will
+        resolve the method from the corresponding class.
+        """
+        return self.__get_func()(*args, **kwargs)
+
+    def __get_func(self) -> Callable:
+        if self.__func is None:
+            module = importlib.import_module(self.module)
+            if "." in self.qualname:
+                # Handle class methods (e.g., 'ClassName.method')
+                classname, methodname = self.qualname.rsplit(".", 1)
+                clazz = getattr(module, classname)
+                self.__func = getattr(clazz, methodname)
+            else:
+                # Handle standalone functions
+                self.__func = getattr(module, self.qualname)
+        return self.__func
+
+
+class JavaFunction(Function):
+    """Descriptor for a java callable function."""
+
+    def __call__(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> 
Any:
+        """Execute the stored function with provided arguments."""
+
+    def check_signature(self, checker: Callable) -> None:
+        """Check function signature is legal or not."""

Review Comment:
   Mark with TODO



##########
python/flink_agents/plan/action.py:
##########
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import inspect
+from typing import Callable, List, Type
+
+from pydantic import BaseModel
+
+from flink_agents.api.event import Event
+from flink_agents.plan.function import Function
+
+
+class Action(BaseModel):
+    """Representation of a workflow action with event listening and function 
execution.
+
+    This class encapsulates a named workflow action that listens for specific 
event
+    types and executes an associated function when those events occur.
+
+    Attributes:
+    ----------
+    name : str
+        Name/identifier of the workflow Action.
+    exec : Function
+        To be executed when the Action is triggered.
+    listen_event_types : List[Type[Event]]
+        List of event types that will trigger this Action's execution.
+    """
+
+    name: str
+    exec: Function
+    listen_event_types: List[Type[Event]]
+
+    def __init__(
+            self,
+            name: str,
+            exec: Function,
+            listen_event_types: List[Type[Event]],
+    ) -> None:
+        """Action will check function signature when init."""
+        super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
+        exec.check_signature(self.check_signature)
+
+    @classmethod
+    def check_signature(cls, func: Callable) -> None:
+        """" Checker for action function signature."""
+        #TODO: update check logic after import State and RunnerContext.
+        params = inspect.signature(func).parameters
+        if len(params) != 1:
+            err_msg = "Action function must have exactly 1 parameter"
+            raise TypeError(err_msg)
+        for i, param in enumerate(params.values()):
+            if i == 0:
+                if not issubclass(param.annotation, Event):
+                    err_msg = "Action function first parameter must be Event"
+                    raise TypeError(err_msg)

Review Comment:
   What if `exec` is a java function?



##########
python/flink_agents/plan/tests/test_action.py:
##########
@@ -0,0 +1,47 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import pytest
+
+from flink_agents.api.event import InputEvent, OutputEvent
+from flink_agents.plan.action import Action
+from flink_agents.plan.function import PythonFunction
+
+
+def increment(event: InputEvent) -> OutputEvent: # noqa: D103
+    value = event.input
+    value += 1
+    return OutputEvent(output=value)
+
+def decrement(value: int) -> OutputEvent: # noqa: D103
+    value -= 1
+    return OutputEvent(output=value)
+
+def test_action_signature() -> None: # noqa: D103

Review Comment:
   Separate into two test cases



##########
python/flink_agents/api/event.py:
##########
@@ -0,0 +1,65 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+from abc import ABC
+from typing import Any
+from uuid import UUID, uuid4
+
+from pydantic import BaseModel, Field, model_validator
+
+
+class Event(BaseModel, ABC, extra="allow"):
+    """Base class for all event types in the system. Event allow extra 
properties, but
+    these properties are required isinstance of BaseModel, or json 
serializable.
+
+    Attributes:
+    ----------
+    id : UUID
+        Unique identifier for the event, automatically generated using uuid4.
+    """
+    id: UUID = Field(default_factory=uuid4)
+
+    @model_validator(mode='after')
+    def validate_extra(self) -> 'Event':

Review Comment:
   This could be a reusable util. There could be more classes that we need to 
guarantee are serializable.



##########
python/flink_agents/plan/action.py:
##########
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import inspect
+from typing import Callable, List, Type
+
+from pydantic import BaseModel
+
+from flink_agents.api.event import Event
+from flink_agents.plan.function import Function
+
+
+class Action(BaseModel):
+    """Representation of a workflow action with event listening and function 
execution.
+
+    This class encapsulates a named workflow action that listens for specific 
event
+    types and executes an associated function when those events occur.
+
+    Attributes:
+    ----------
+    name : str
+        Name/identifier of the workflow Action.
+    exec : Function
+        To be executed when the Action is triggered.
+    listen_event_types : List[Type[Event]]
+        List of event types that will trigger this Action's execution.
+    """
+
+    name: str
+    exec: Function
+    listen_event_types: List[Type[Event]]
+
+    def __init__(
+            self,
+            name: str,
+            exec: Function,
+            listen_event_types: List[Type[Event]],
+    ) -> None:
+        """Action will check function signature when init."""
+        super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
+        exec.check_signature(self.check_signature)
+
+    @classmethod
+    def check_signature(cls, func: Callable) -> None:
+        """" Checker for action function signature."""
+        #TODO: update check logic after import State and RunnerContext.
+        params = inspect.signature(func).parameters
+        if len(params) != 1:
+            err_msg = "Action function must have exactly 1 parameter"
+            raise TypeError(err_msg)
+        for i, param in enumerate(params.values()):
+            if i == 0:
+                if not issubclass(param.annotation, Event):
+                    err_msg = "Action function first parameter must be Event"

Review Comment:
   Why have these 2 different error messages? I think it would be clearer 
to just show the expected and current signatures.



##########
python/flink_agents/plan/tests/test_action.py:
##########
@@ -0,0 +1,47 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import pytest
+
+from flink_agents.api.event import InputEvent, OutputEvent
+from flink_agents.plan.action import Action
+from flink_agents.plan.function import PythonFunction
+
+
+def increment(event: InputEvent) -> OutputEvent: # noqa: D103

Review Comment:
   increment -> legal_action
   decrement -> illegal_action



##########
python/flink_agents/api/event.py:
##########
@@ -0,0 +1,65 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+from abc import ABC
+from typing import Any
+from uuid import UUID, uuid4
+
+from pydantic import BaseModel, Field, model_validator
+
+
+class Event(BaseModel, ABC, extra="allow"):
+    """Base class for all event types in the system. Event allow extra 
properties, but
+    these properties are required isinstance of BaseModel, or json 
serializable.
+
+    Attributes:
+    ----------
+    id : UUID
+        Unique identifier for the event, automatically generated using uuid4.
+    """
+    id: UUID = Field(default_factory=uuid4)
+
+    @model_validator(mode='after')
+    def validate_extra(self) -> 'Event':
+        """Make sure all extra properties are serializable."""
+        for value in self.__pydantic_extra__.values():
+            if isinstance(value, BaseModel):

Review Comment:
   What if `value`, as a BaseModel, contains non-serializable extras?



##########
python/flink_agents/plan/tests/test_action.py:
##########
@@ -0,0 +1,47 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import pytest
+
+from flink_agents.api.event import InputEvent, OutputEvent
+from flink_agents.plan.action import Action
+from flink_agents.plan.function import PythonFunction
+
+
+def increment(event: InputEvent) -> OutputEvent: # noqa: D103
+    value = event.input
+    value += 1
+    return OutputEvent(output=value)

Review Comment:
   Should not have return value



##########
python/flink_agents/api/event.py:
##########
@@ -0,0 +1,65 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+from abc import ABC
+from typing import Any
+from uuid import UUID, uuid4
+
+from pydantic import BaseModel, Field, model_validator
+
+
+class Event(BaseModel, ABC, extra="allow"):
+    """Base class for all event types in the system. Event allow extra 
properties, but
+    these properties are required isinstance of BaseModel, or json 
serializable.
+
+    Attributes:
+    ----------
+    id : UUID
+        Unique identifier for the event, automatically generated using uuid4.
+    """
+    id: UUID = Field(default_factory=uuid4)
+
+    @model_validator(mode='after')

Review Comment:
   When is this validator called?



##########
python/flink_agents/api/tests/test_event.py:
##########
@@ -0,0 +1,33 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Type
+
+import pytest
+
+from flink_agents.api.event import Event
+
+
+def test_event_serializable() -> None: #noqa D103

Review Comment:
   Why do we need this `#noqa D103`?



##########
python/flink_agents/api/event.py:
##########
@@ -0,0 +1,65 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+from abc import ABC
+from typing import Any
+from uuid import UUID, uuid4
+
+from pydantic import BaseModel, Field, model_validator
+
+
+class Event(BaseModel, ABC, extra="allow"):
+    """Base class for all event types in the system. Event allow extra 
properties, but
+    these properties are required isinstance of BaseModel, or json 
serializable.
+
+    Attributes:
+    ----------
+    id : UUID
+        Unique identifier for the event, automatically generated using uuid4.
+    """
+    id: UUID = Field(default_factory=uuid4)
+
+    @model_validator(mode='after')
+    def validate_extra(self) -> 'Event':
+        """Make sure all extra properties are serializable."""
+        for value in self.__pydantic_extra__.values():
+            if isinstance(value, BaseModel):
+                continue
+            json.dumps(value)
+        return self
+
+
+class InputEvent(Event):
+    """Event generated by the framework, carrying an input data that
+    arrives at the workflow.
+    """
+
+    input: Any

Review Comment:
   Is `Any` serializable?



##########
python/flink_agents/plan/tests/test_workflow_plan.py:
##########
@@ -0,0 +1,47 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from flink_agents.api.event import InputEvent, OutputEvent
+from flink_agents.plan.action import Action
+from flink_agents.plan.function import Function
+from flink_agents.plan.workflow_plan import WorkflowPlan
+
+
+def increment(event: InputEvent) -> OutputEvent: # noqa: D103
+    value = event.input
+    value += 1
+    return OutputEvent(isLegal=True, result=value)

Review Comment:
   I'd suggest removing it for now. We should probably also raise a 
warning when the action has a return value, as it will be ignored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to