weiqingy commented on code in PR #688: URL: https://github.com/apache/flink-agents/pull/688#discussion_r3296132823
########## python/flink_agents/api/tests/test_event_listener.py: ########## @@ -0,0 +1,101 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import unittest + +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event +from flink_agents.api.listener.event_listener import EventListener + + +class GlobalListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + +class MainListenerMock(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + +class TestEventListener(unittest.TestCase): + def test_global_listener_str(self): + """Test the string representation of a global EventListener class.""" + module_name = GlobalListener.__module__ + expected = f"{module_name}:GlobalListener.on_event_processed" + assert str(GlobalListener) == expected + + def test_top_level_nested_listener_str(self): + """Test the string representation of a nested EventListener class defined at module level.""" + global TopOuter + + class TopOuter: + class TopInner(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + module_name = TopOuter.TopInner.__module__ + expected = f"{module_name}:TopOuter.TopInner.on_event_processed" + assert str(TopOuter.TopInner) == expected + + def test_local_listener_raises_error(self): + """Test that defining an EventListener in a local scope raises a ValueError.""" + import pytest + + def some_function() -> type: + class LocalListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + return LocalListener + + local_listener_cls = some_function() + with pytest.raises(ValueError, match="Cannot instantiate local class"): + str(local_listener_cls) + + def test_main_module_with_file_handling(self): + """Test string representation when the module is '__main__' and has a '__file__' attribute.""" + from unittest.mock import MagicMock, patch + + mock_module = MagicMock() + mock_module.__name__ = "__main__" + mock_module.__file__ = "/path/to/my_script.py" + + with patch("inspect.getmodule", return_value=mock_module): + assert str(MainListenerMock) == "my_script:MainListenerMock.on_event_processed" + + def test_main_module_without_file_handling(self): + """Test string representation when the module is '__main__' but lacks a '__file__' attribute.""" + from unittest.mock import MagicMock, patch + + mock_module = MagicMock() + mock_module.__name__ = "__main__" + # __file__ attribute is missing + + with patch("inspect.getmodule", return_value=mock_module): + # Should fallback to "__main__" if __file__ is missing + assert str(MainListenerMock) == "__main__:MainListenerMock.on_event_processed" Review Comment: This test claims to cover the missing-`__file__` branch, but `MagicMock()` auto-creates `__file__` on access — `hasattr(module_obj, "__file__")` is always `True` and the fallback branch never runs. The assertion happens to pass because `inspect.getmodule` is patched, but for the wrong reason. Switching to `spec=ModuleType` and explicitly deleting `__file__` would make the test actually exercise the fallback path. ########## python/flink_agents/runtime/python_java_utils.py: ########## @@ -321,6 +322,73 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]: } +def from_java_event_context(eventType: str, timestamp: str) -> EventContext: + """Create an ``EventContext`` from Java ``EventContext``'s property.""" + return EventContext(eventType=eventType, timestamp=timestamp) + + +def _resolve_module(module_name: str, class_qualname: str) -> Any: + """Resolve the module for the given module name and class qualname. + + This helper handles special cases like '__main__' and falls back to + searching in sys.modules if standard import fails. + """ + import importlib + import sys + + if module_name == "__main__": + module = sys.modules["__main__"] + first_part = class_qualname.split(".")[0] + if not hasattr(module, first_part): + for m in list(sys.modules.values()): Review Comment: Both the `__main__` and `ImportError` fallbacks walk `sys.modules.values()` and return the first module that has `class_qualname.split(".")[0]` as an attribute. If two modules top-level-export a class with the same name — not an unusual user mistake — the result is non-deterministic and silent: wrong listener class instantiated and notified. What invariant are we relying on here? If it's "module top-level uniqueness across the whole interpreter," can we raise on collision instead of letting first-match win? ########## runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java: ########## @@ -77,6 +78,11 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { static final String INVOKE_PYTHON_TOOL = PYTHON_MODULE_PREFIX + "invoke_python_tool"; + static final String FROM_JAVA_EVENT_CONTEXT = PYTHON_MODULE_PREFIX + "from_java_event_context"; + + static final String INSTANTIATE_PYTHON_EVENT_LISTER = Review Comment: Typo — `INSTANTIATE_PYTHON_EVENT_LISTER` is missing an `N`. Should be `INSTANTIATE_PYTHON_EVENT_LISTENER`. ########## python/flink_agents/api/event_context.py: ########## @@ -0,0 +1,35 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from pydantic import BaseModel + + +class EventContext(BaseModel): + """Contextual information about an event, such as its type and timestamp. + + Attributes: + ---------- + eventType : str + The routing key for the event, matching the ``EVENT_TYPE`` constant or + type string. + timestamp : str + Timestamp of when the event occurred. + """ + + eventType: str Review Comment: camelCase here clashes with the rest of `flink_agents/api`, which is snake_case throughout. User code reading `context.eventType` will stand out. Would `event_type` with `alias="eventType"` satisfy the Java JSON contract while keeping the Python attribute idiomatic? ########## runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java: ########## @@ -301,4 +290,76 @@ public void close() throws Exception { pythonEnvironmentManager.close(); } } + + /** + * A wrapper class that implements {@link EventListener} to delegate events to Python-side + * listeners. + * + * <p>Similar to {@link EventRouter#notifyEventProcessed(Event)}, this wrapper handles event + * notification, but specifically for Python listeners. To optimize resource usage and avoid + * redundant Java-to-Python conversions, this wrapper converts the {@link Event} and {@link + * EventContext} into Python objects only once per event notification. It then iterates through + * all registered Python listener entries to invoke their respective methods using the + * pre-converted Python objects. + */ + final class PythonEventListenerWrapper implements EventListener { + + private final List<Map.Entry<Object, PythonFunction>> listenerEntries; + + PythonEventListenerWrapper(List<Map.Entry<Object, PythonFunction>> listenerEntries) { + this.listenerEntries = listenerEntries; + } + + /** + * Processes the event by converting it and its context to Python objects once, then + * delegating to all registered Python listeners. + * + * @param context the event context + * @param event the event to process + */ + @Override + public void onEventProcessed(EventContext context, Event event) { + try { + // Convert java event to python event + final Object pythonEvent = pythonActionExecutor.convertJsonToPythonEvent(event); + // Convert java event context to python event context + final Object pythonEventContext = + pythonResourceAdapter.toPythonEventContext(context); + for (Map.Entry<Object, PythonFunction> entry : this.listenerEntries) { + final PythonFunction listenerFunction = entry.getValue(); + final Object listenerObject = entry.getKey(); + listenerFunction.call(listenerObject, pythonEventContext, pythonEvent); + } + } catch (Exception e) { + throw new RuntimeException(e); Review Comment: `throw new RuntimeException(e)` strips the descriptor of the listener that failed. The wrapper already holds `listenerEntries` — including the current entry's `module:qualname` in the message would turn "which of N listeners threw?" from a multi-minute debug into a one-line answer. ########## python/flink_agents/runtime/local_runner.py: ########## @@ -358,6 +381,15 @@ def run(self, **data: Dict[str, Any]) -> Any: raise return key + def _trigger_listener( + self, listener: EventListener, event_context: EventContext, event: Event + ) -> None: + """Trigger a single listener with error handling.""" + try: + listener.on_event_processed(event_context, event) + except Exception: + logger.exception("Error in EventListener execution") Review Comment: This swallows and logs; `EventRouter#notifyEventProcessed` on the Java side lets listener exceptions propagate. Same interface, two failure semantics depending on the runner. What's the rationale for diverging here? If it's deliberate, worth documenting the asymmetry in `event_listener.md` so users of both runners aren't surprised. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
