wenjin272 commented on code in PR #373:
URL: https://github.com/apache/flink-agents/pull/373#discussion_r2621494563
##########
python/flink_agents/plan/resource_provider.py:
##########
@@ -89,34 +90,29 @@ class PythonResourceProvider(ResourceProvider):
The initialization arguments of the resource.
"""
- module: str
- clazz: str
- kwargs: Dict[str, Any]
+ descriptor: ResourceDescriptor
@staticmethod
def get(name: str, descriptor: ResourceDescriptor) ->
"PythonResourceProvider":
"""Create PythonResourceProvider instance."""
- clazz = descriptor.clazz
+ clazz = descriptor.clazz # 获取实际的类对象
Review Comment:
Chinese note
##########
python/flink_agents/api/resource.py:
##########
@@ -73,23 +74,177 @@ def validate_serializable(self) -> "SerializableResource":
return self
-class ResourceDescriptor:
- """Descriptor of resource, includes the class and the initialize
arguments."""
+class ResourceDescriptor(BaseModel):
+ """Descriptor for Resource instances, storing metadata for serialization
and
+ instantiation.
- _clazz: Type[Resource]
- _arguments: Dict[str, Any]
-
- def __init__(self, *, clazz: Type[Resource], **arguments: Any) -> None:
- """Init method."""
- self._clazz = clazz
- self._arguments = arguments
+ Attributes:
+ python_clazz: The Python class name (e.g., 'YourResourceClass').
+ python_module: The Python module path (e.g., 'your.module.path').
+ java_clazz: The Java class full path (e.g.,
'com.example.YourJavaClass').
+ Empty string for Python-only resources.
+ arguments: Dictionary containing resource initialization parameters.
+ """
+ python_clazz: str
+ python_module: str
+ java_clazz: str
+ arguments: Dict[str, Any]
+ _clazz: Type[Resource] | None = PrivateAttr(default=None)
+
+ def __init__(self, /,
+ python_clazz: str | None = None,
+ python_module: str | None = None,
+ java_clazz: str | None = "",
+ arguments: Dict[str, Any] | None = None,
+ *,
+ clazz: Type[Resource] | None = None,
+ **kwargs: Dict[str, Any]) -> None:
+ """Initialize ResourceDescriptor.
+
+ Args:
+ python_clazz: The Python class name (not the full path).
+ Usually auto-generated from clazz, users typically
don't need
+ to set this manually.
+ python_module: The Python module path (not including the class
name).
+ Usually auto-generated from clazz, users typically
don't need
+ to set this manually.
+ java_clazz: The Java class full path for cross-platform
compatibility.
+ Defaults to empty string for Python-only resources.
+ Users typically don't need to set this manually.
+ arguments: Dictionary containing resource initialization
parameters.
Review Comment:
The doc string is not explicit. If users typically don't need to set this
manually, the arguments should not set in the init function.
Actually, when user want to declare a java resource in python, they must set
`java_clazz`.
##########
python/flink_agents/api/resource.py:
##########
@@ -73,23 +74,177 @@ def validate_serializable(self) -> "SerializableResource":
return self
-class ResourceDescriptor:
- """Descriptor of resource, includes the class and the initialize
arguments."""
+class ResourceDescriptor(BaseModel):
+ """Descriptor for Resource instances, storing metadata for serialization
and
+ instantiation.
- _clazz: Type[Resource]
- _arguments: Dict[str, Any]
-
- def __init__(self, *, clazz: Type[Resource], **arguments: Any) -> None:
- """Init method."""
- self._clazz = clazz
- self._arguments = arguments
+ Attributes:
+ python_clazz: The Python class name (e.g., 'YourResourceClass').
+ python_module: The Python module path (e.g., 'your.module.path').
+ java_clazz: The Java class full path (e.g.,
'com.example.YourJavaClass').
+ Empty string for Python-only resources.
+ arguments: Dictionary containing resource initialization parameters.
+ """
+ python_clazz: str
+ python_module: str
+ java_clazz: str
+ arguments: Dict[str, Any]
+ _clazz: Type[Resource] | None = PrivateAttr(default=None)
+
+ def __init__(self, /,
+ python_clazz: str | None = None,
+ python_module: str | None = None,
+ java_clazz: str | None = "",
+ arguments: Dict[str, Any] | None = None,
+ *,
+ clazz: Type[Resource] | None = None,
+ **kwargs: Dict[str, Any]) -> None:
Review Comment:
The keyed arguments should be `**kwargs: Any`
Besides, if we need both `python_clazz` and `java_clazz`? For it won't be
set in the same time, and the type of the `ResourceProvider` can help
distinguish the `clazz` is python or java class. Maybe just `target_module` and
`target_clazz` is enough.
##########
python/flink_agents/plan/resource_provider.py:
##########
@@ -158,21 +154,60 @@ def provide(self, get_resource: Callable, config:
AgentConfiguration) -> Resourc
self.resource = clazz.model_validate(self.serialized)
return self.resource
+JAVA_RESOURCE_MAPPING: dict[ResourceType, str] = {
+ ResourceType.CHAT_MODEL:
"flink_agents.runtime.java.java_chat_model.JavaChatModelSetupImpl",
+ ResourceType.CHAT_MODEL_CONNECTION:
"flink_agents.runtime.java.java_chat_model.JavaChatModelConnectionImpl",
+}
-# TODO: implementation
class JavaResourceProvider(ResourceProvider):
"""Represent Resource Provider declared by Java.
Currently, this class only used for deserializing Java agent plan json
"""
+ descriptor: ResourceDescriptor
+ _j_resource_adapter: Any = None
+
+ @staticmethod
+ def get(name: str, descriptor: ResourceDescriptor) ->
"JavaResourceProvider":
+ """Create JavaResourceProvider instance."""
+ wrapper_clazz = descriptor.clazz
+ kwargs = {}
+ kwargs.update(descriptor.arguments)
+
+ clazz = descriptor.java_clazz
+ if len(clazz) <1:
+ err_msg = f"java_class are not set for {wrapper_clazz.__name__}"
+ raise KeyError(err_msg)
+
+ return JavaResourceProvider(
+ name=name,
+ type=wrapper_clazz.resource_type(),
+ descriptor=descriptor,
+ )
+
def provide(self, get_resource: Callable, config: AgentConfiguration) ->
Resource:
"""Create resource in runtime."""
- err_msg = (
- "Currently, flink-agents doesn't support create resource "
- "by JavaResourceProvider in python."
- )
- raise NotImplementedError(err_msg)
+ if not self._j_resource_adapter:
+ err_msg = "java resource adapter is not set"
+ raise RuntimeError(err_msg)
+
+ j_resource = self._j_resource_adapter.getResource(self.name,
self.type.value)
+
+ class_path = JAVA_RESOURCE_MAPPING.get(self.type)
+ if not class_path:
+ err_msg = f"No Java resource mapping found for {self.type.value}"
+ raise ValueError(err_msg)
+ module_path, class_name = class_path.rsplit(".", 1)
+ cls = get_resource_class(module_path, class_name)
+ kwargs = self.descriptor.arguments
+ print(kwargs)
Review Comment:
test print
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]