This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d177ba1d12 [#9531] feat(python-client): add function DTOs,
request/response, and error handler (#9944)
d177ba1d12 is described below
commit d177ba1d1252a02c818fdabe8307987e4407eb5e
Author: mchades <[email protected]>
AuthorDate: Wed Feb 25 16:19:48 2026 +0800
[#9531] feat(python-client): add function DTOs, request/response, and error
handler (#9944)
### What changes were proposed in this pull request?
Add the data transfer layer for function support:
- FunctionDTO, FunctionColumnDTO, FunctionDefinitionDTO,
FunctionImplDTO,
FunctionParamDTO, FunctionResourcesDTO
- FunctionRegisterRequest, FunctionUpdateRequest, FunctionUpdatesRequest
- FunctionResponse, FunctionListResponse
- FunctionErrorHandler for REST error mapping
- Unit tests for DTO serialization and request validation
### Why are the changes needed?
Fix: #9531
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
tests added
---
.../gravitino/dto/function/__init__.py | 16 ++
.../gravitino/dto/function/function_column_dto.py | 77 ++++++
.../dto/function/function_definition_dto.py | 202 +++++++++++++++
.../gravitino/dto/function/function_dto.py | 112 +++++++++
.../gravitino/dto/function/function_impl_dto.py | 271 +++++++++++++++++++++
.../gravitino/dto/function/function_param_dto.py | 124 ++++++++++
.../dto/function/function_resources_dto.py | 82 +++++++
.../dto/requests/function_register_request.py | 94 +++++++
.../dto/requests/function_update_request.py | 232 ++++++++++++++++++
.../dto/requests/function_updates_request.py | 49 ++++
.../dto/responses/function_list_response.py | 47 ++++
.../gravitino/dto/responses/function_response.py | 52 ++++
.../exceptions/handlers/function_error_handler.py | 66 +++++
.../unittests/dto/responses/test_responses.py | 77 ++++++
.../tests/unittests/dto/test_function_dto.py | 197 +++++++++++++++
.../client-python/tests/unittests/test_requests.py | 146 +++++++++++
16 files changed, 1844 insertions(+)
diff --git a/clients/client-python/gravitino/dto/function/__init__.py
b/clients/client-python/gravitino/dto/function/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/clients/client-python/gravitino/dto/function/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/clients/client-python/gravitino/dto/function/function_column_dto.py
b/clients/client-python/gravitino/dto/function/function_column_dto.py
new file mode 100644
index 0000000000..6c6d132128
--- /dev/null
+++ b/clients/client-python/gravitino/dto/function/function_column_dto.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Optional
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.function.function_column import FunctionColumn
+from gravitino.api.rel.types.json_serdes.type_serdes import TypeSerdes
+from gravitino.api.rel.types.type import Type
+
+
+@dataclass
+class FunctionColumnDTO(DataClassJsonMixin):
+ """DTO for function column."""
+
+ _name: str = field(metadata=config(field_name="name"))
+ _data_type: Type = field(
+ metadata=config(
+ field_name="dataType",
+ encoder=TypeSerdes.serialize,
+ decoder=TypeSerdes.deserialize,
+ )
+ )
+ _comment: Optional[str] = field(default=None,
metadata=config(field_name="comment"))
+
+ def name(self) -> str:
+ """Returns the column name."""
+ return self._name
+
+ def data_type(self) -> Type:
+ """Returns the column type."""
+ return self._data_type
+
+ def comment(self) -> Optional[str]:
+ """Returns the optional column comment."""
+ return self._comment
+
+ def to_function_column(self) -> FunctionColumn:
+ """Convert this DTO to a FunctionColumn instance."""
+ return FunctionColumn.of(self._name, self._data_type, self._comment)
+
+ @classmethod
+ def from_function_column(cls, column: FunctionColumn) ->
"FunctionColumnDTO":
+ """Create a FunctionColumnDTO from a FunctionColumn instance."""
+ return cls(
+ _name=column.name(),
+ _data_type=column.data_type(),
+ _comment=column.comment(),
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionColumnDTO):
+ return False
+ return (
+ self._name == other._name
+ and self._data_type == other._data_type
+ and self._comment == other._comment
+ )
+
+ def __hash__(self) -> int:
+ return hash((self._name, self._data_type, self._comment))
diff --git
a/clients/client-python/gravitino/dto/function/function_definition_dto.py
b/clients/client-python/gravitino/dto/function/function_definition_dto.py
new file mode 100644
index 0000000000..84c90d67a8
--- /dev/null
+++ b/clients/client-python/gravitino/dto/function/function_definition_dto.py
@@ -0,0 +1,202 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.function.function_column import FunctionColumn
+from gravitino.api.function.function_definition import (
+ FunctionDefinition,
+ FunctionDefinitions,
+)
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_param import FunctionParam
+from gravitino.api.rel.types.json_serdes.type_serdes import TypeSerdes
+from gravitino.api.rel.types.type import Type
+from gravitino.dto.function.function_column_dto import FunctionColumnDTO
+from gravitino.dto.function.function_impl_dto import (
+ FunctionImplDTO,
+ JavaImplDTO,
+ PythonImplDTO,
+ SQLImplDTO,
+ function_impl_dto_from_function_impl,
+)
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
+
+
+def _decode_impl(impl_dict: Optional[Dict[str, Any]]) ->
Optional[FunctionImplDTO]:
+ """Decode a function implementation DTO from a dictionary."""
+ if impl_dict is None:
+ return None
+
+ language = impl_dict.get("language")
+ if language == "SQL":
+ return SQLImplDTO.from_dict(impl_dict)
+ if language == "JAVA":
+ return JavaImplDTO.from_dict(impl_dict)
+ if language == "PYTHON":
+ return PythonImplDTO.from_dict(impl_dict)
+
+ raise ValueError(f"Unsupported implementation language: {language}")
+
+
+def _decode_impls(impls_list: List[Dict[str, Any]]) -> List[FunctionImplDTO]:
+ """Decode a list of function implementation DTOs from a list of
dictionaries."""
+ if impls_list is None:
+ return []
+ return [_decode_impl(impl) for impl in impls_list]
+
+
+def _encode_impl(impl: Optional[FunctionImplDTO]) -> Optional[Dict[str, Any]]:
+ """Encode a function implementation DTO to a dictionary."""
+ if impl is None:
+ return None
+ result = impl.to_dict()
+ result["language"] = impl.language().name
+ return result
+
+
+def _encode_impls(impls: List[FunctionImplDTO]) -> List[Dict[str, Any]]:
+ """Encode a list of function implementation DTOs to a list of
dictionaries."""
+ if impls is None:
+ return []
+ return [_encode_impl(impl) for impl in impls]
+
+
+@dataclass
+class FunctionDefinitionDTO(FunctionDefinition, DataClassJsonMixin):
+ """DTO for function definition."""
+
+ _parameters: Optional[List[FunctionParamDTO]] = field(
+ default=None, metadata=config(field_name="parameters")
+ )
+ _return_type: Optional[Type] = field(
+ default=None,
+ metadata=config(
+ field_name="returnType",
+ encoder=TypeSerdes.serialize,
+ decoder=TypeSerdes.deserialize,
+ ),
+ )
+ _return_columns: Optional[List[FunctionColumnDTO]] = field(
+ default=None, metadata=config(field_name="returnColumns")
+ )
+ _impls: Optional[List[FunctionImplDTO]] = field(
+ default=None,
+ metadata=config(
+ field_name="impls",
+ encoder=_encode_impls,
+ decoder=_decode_impls,
+ ),
+ )
+
+ def parameters(self) -> List[FunctionParam]:
+ """Returns the parameters for this definition."""
+ return list(self._parameters) if self._parameters else []
+
+ def return_type(self) -> Optional[Type]:
+ """Returns the return type for scalar or aggregate functions."""
+ return self._return_type
+
+ def return_columns(self) -> List[FunctionColumn]:
+ """Returns the output columns for a table-valued function."""
+ if self._return_columns is None:
+ return FunctionDefinition.EMPTY_COLUMNS
+ return [col.to_function_column() for col in self._return_columns]
+
+ def impls(self) -> List[FunctionImpl]:
+ """Returns the implementations associated with this definition."""
+ if self._impls is None:
+ return []
+ return [impl.to_function_impl() for impl in self._impls]
+
+ def to_function_definition(self) -> FunctionDefinition:
+ """Convert this DTO to a FunctionDefinition instance."""
+ params = (
+ [p.to_function_param() for p in self._parameters]
+ if self._parameters
+ else []
+ )
+ impls = [impl.to_function_impl() for impl in self._impls] if
self._impls else []
+
+ if self._return_type is not None:
+ return FunctionDefinitions.of(params, self._return_type, impls)
+ if self._return_columns and len(self._return_columns) > 0:
+ cols = [col.to_function_column() for col in self._return_columns]
+ return FunctionDefinitions.of_table(params, cols, impls)
+ # Fallback for backward compatibility
+ return FunctionDefinitions.SimpleFunctionDefinition(params, None,
None, impls)
+
+ @classmethod
+ def from_function_definition(
+ cls, definition: FunctionDefinition
+ ) -> "FunctionDefinitionDTO":
+ """Create a FunctionDefinitionDTO from a FunctionDefinition
instance."""
+ param_dtos = (
+ [
+ (
+ p
+ if isinstance(p, FunctionParamDTO)
+ else FunctionParamDTO.from_function_param(p)
+ )
+ for p in definition.parameters()
+ ]
+ if definition.parameters()
+ else []
+ )
+
+ return_column_dtos = None
+ if definition.return_columns() and len(definition.return_columns()) >
0:
+ return_column_dtos = [
+ FunctionColumnDTO.from_function_column(col)
+ for col in definition.return_columns()
+ ]
+
+ impl_dtos = (
+ [function_impl_dto_from_function_impl(impl) for impl in
definition.impls()]
+ if definition.impls()
+ else []
+ )
+
+ return cls(
+ _parameters=param_dtos,
+ _return_type=definition.return_type(),
+ _return_columns=return_column_dtos,
+ _impls=impl_dtos,
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionDefinitionDTO):
+ return False
+ return (
+ self._parameters == other._parameters
+ and self._return_type == other._return_type
+ and self._return_columns == other._return_columns
+ and self._impls == other._impls
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ tuple(self._parameters) if self._parameters else None,
+ self._return_type,
+ tuple(self._return_columns) if self._return_columns else None,
+ tuple(self._impls) if self._impls else None,
+ )
+ )
diff --git a/clients/client-python/gravitino/dto/function/function_dto.py
b/clients/client-python/gravitino/dto/function/function_dto.py
new file mode 100644
index 0000000000..20c76ac8bb
--- /dev/null
+++ b/clients/client-python/gravitino/dto/function/function_dto.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.function.function_definition_dto import
FunctionDefinitionDTO
+
+
+def _encode_function_type(func_type: FunctionType) -> str:
+ """Encode FunctionType to string."""
+ if func_type is None:
+ return None
+ return func_type.value
+
+
+def _decode_function_type(func_type_str: str) -> FunctionType:
+ """Decode string to FunctionType."""
+ return FunctionType.from_string(func_type_str)
+
+
+@dataclass
+class FunctionDTO(Function, DataClassJsonMixin):
+ """Represents a Function DTO (Data Transfer Object)."""
+
+ _name: str = field(metadata=config(field_name="name"))
+ _function_type: FunctionType = field(
+ metadata=config(
+ field_name="functionType",
+ encoder=_encode_function_type,
+ decoder=_decode_function_type,
+ )
+ )
+ _deterministic: bool = field(metadata=config(field_name="deterministic"))
+ _comment: Optional[str] = field(default=None,
metadata=config(field_name="comment"))
+ _definitions: Optional[List[FunctionDefinitionDTO]] = field(
+ default=None, metadata=config(field_name="definitions")
+ )
+ _audit: Optional[AuditDTO] = field(
+ default=None, metadata=config(field_name="audit")
+ )
+
+ def __post_init__(self):
+ if self._function_type is None:
+ raise ValueError("Function type cannot be null or empty")
+
+ def name(self) -> str:
+ """Returns the function name."""
+ return self._name
+
+ def function_type(self) -> FunctionType:
+ """Returns the function type."""
+ return self._function_type
+
+ def deterministic(self) -> bool:
+ """Returns whether the function is deterministic."""
+ return self._deterministic
+
+ def comment(self) -> Optional[str]:
+ """Returns the optional comment of the function."""
+ return self._comment
+
+ def definitions(self) -> Optional[List[FunctionDefinition]]:
+ """Returns the definitions of the function."""
+ if self._definitions is None:
+ return None
+ return list(self._definitions)
+
+ def audit_info(self) -> Optional[AuditDTO]:
+ """Returns the audit information."""
+ return self._audit
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionDTO):
+ return False
+ return (
+ self._name == other._name
+ and self._function_type == other._function_type
+ and self._deterministic == other._deterministic
+ and self._comment == other._comment
+ and self._definitions == other._definitions
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self._name,
+ self._function_type,
+ self._deterministic,
+ self._comment,
+ )
+ )
diff --git a/clients/client-python/gravitino/dto/function/function_impl_dto.py
b/clients/client-python/gravitino/dto/function/function_impl_dto.py
new file mode 100644
index 0000000000..110e38cd8a
--- /dev/null
+++ b/clients/client-python/gravitino/dto/function/function_impl_dto.py
@@ -0,0 +1,271 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from typing import Dict, Optional
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.java_impl import JavaImpl
+from gravitino.api.function.python_impl import PythonImpl
+from gravitino.api.function.sql_impl import SQLImpl
+from gravitino.dto.function.function_resources_dto import FunctionResourcesDTO
+
+
+class FunctionImplDTO(DataClassJsonMixin, ABC):
+ """Abstract DTO for function implementation."""
+
+ @abstractmethod
+ def language(self) -> FunctionImpl.Language:
+ """Returns the language of this implementation."""
+ pass
+
+ @abstractmethod
+ def to_function_impl(self) -> FunctionImpl:
+ """Convert this DTO to a FunctionImpl instance."""
+ pass
+
+
+@dataclass
+class SQLImplDTO(FunctionImplDTO):
+ """DTO for SQL function implementation."""
+
+ _runtime: str = field(metadata=config(field_name="runtime"))
+ _sql: str = field(metadata=config(field_name="sql"))
+ _resources: Optional[FunctionResourcesDTO] = field(
+ default=None, metadata=config(field_name="resources")
+ )
+ _properties: Optional[Dict[str, str]] = field(
+ default=None, metadata=config(field_name="properties")
+ )
+
+ def language(self) -> FunctionImpl.Language:
+ return FunctionImpl.Language.SQL
+
+ def runtime(self) -> str:
+ return self._runtime
+
+ def sql(self) -> str:
+ return self._sql
+
+ def resources(self) -> Optional[FunctionResourcesDTO]:
+ return self._resources
+
+ def properties(self) -> Optional[Dict[str, str]]:
+ return self._properties
+
+ def to_function_impl(self) -> FunctionImpl:
+ return SQLImpl(
+ runtime=FunctionImpl.RuntimeType.from_string(self._runtime),
+ sql=self._sql,
+ resources=(
+ self._resources.to_function_resources() if self._resources
else None
+ ),
+ properties=self._properties,
+ )
+
+ @classmethod
+ def from_function_impl(cls, impl) -> "SQLImplDTO":
+ """Create a SQLImplDTO from a SQLImpl instance."""
+ return cls(
+ _runtime=impl.runtime().name,
+ _sql=impl.sql(),
+
_resources=FunctionResourcesDTO.from_function_resources(impl.resources()),
+ _properties=impl.properties() if impl.properties() else None,
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, SQLImplDTO):
+ return False
+ return (
+ self._runtime == other._runtime
+ and self._sql == other._sql
+ and self._resources == other._resources
+ and self._properties == other._properties
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self._runtime,
+ self._sql,
+ self._resources,
+ frozenset(self._properties.items()) if self._properties else
None,
+ )
+ )
+
+
+@dataclass
+class JavaImplDTO(FunctionImplDTO):
+ """DTO for Java function implementation."""
+
+ _runtime: str = field(metadata=config(field_name="runtime"))
+ _class_name: str = field(metadata=config(field_name="className"))
+ _resources: Optional[FunctionResourcesDTO] = field(
+ default=None, metadata=config(field_name="resources")
+ )
+ _properties: Optional[Dict[str, str]] = field(
+ default=None, metadata=config(field_name="properties")
+ )
+
+ def language(self) -> FunctionImpl.Language:
+ return FunctionImpl.Language.JAVA
+
+ def runtime(self) -> str:
+ return self._runtime
+
+ def class_name(self) -> str:
+ return self._class_name
+
+ def resources(self) -> Optional[FunctionResourcesDTO]:
+ return self._resources
+
+ def properties(self) -> Optional[Dict[str, str]]:
+ return self._properties
+
+ def to_function_impl(self) -> FunctionImpl:
+ return JavaImpl(
+ runtime=FunctionImpl.RuntimeType.from_string(self._runtime),
+ class_name=self._class_name,
+ resources=(
+ self._resources.to_function_resources() if self._resources
else None
+ ),
+ properties=self._properties,
+ )
+
+ @classmethod
+ def from_function_impl(cls, impl) -> "JavaImplDTO":
+ """Create a JavaImplDTO from a JavaImpl instance."""
+ return cls(
+ _runtime=impl.runtime().name,
+ _class_name=impl.class_name(),
+
_resources=FunctionResourcesDTO.from_function_resources(impl.resources()),
+ _properties=impl.properties() if impl.properties() else None,
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, JavaImplDTO):
+ return False
+ return (
+ self._runtime == other._runtime
+ and self._class_name == other._class_name
+ and self._resources == other._resources
+ and self._properties == other._properties
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self._runtime,
+ self._class_name,
+ self._resources,
+ frozenset(self._properties.items()) if self._properties else
None,
+ )
+ )
+
+
+@dataclass
+class PythonImplDTO(FunctionImplDTO):
+ """DTO for Python function implementation."""
+
+ _runtime: str = field(metadata=config(field_name="runtime"))
+ _handler: str = field(metadata=config(field_name="handler"))
+ _code_block: Optional[str] = field(
+ default=None, metadata=config(field_name="codeBlock")
+ )
+ _resources: Optional[FunctionResourcesDTO] = field(
+ default=None, metadata=config(field_name="resources")
+ )
+ _properties: Optional[Dict[str, str]] = field(
+ default=None, metadata=config(field_name="properties")
+ )
+
+ def language(self) -> FunctionImpl.Language:
+ return FunctionImpl.Language.PYTHON
+
+ def runtime(self) -> str:
+ return self._runtime
+
+ def handler(self) -> str:
+ return self._handler
+
+ def code_block(self) -> Optional[str]:
+ return self._code_block
+
+ def resources(self) -> Optional[FunctionResourcesDTO]:
+ return self._resources
+
+ def properties(self) -> Optional[Dict[str, str]]:
+ return self._properties
+
+ def to_function_impl(self) -> FunctionImpl:
+ return PythonImpl(
+ runtime=FunctionImpl.RuntimeType.from_string(self._runtime),
+ handler=self._handler,
+ code_block=self._code_block,
+ resources=(
+ self._resources.to_function_resources() if self._resources
else None
+ ),
+ properties=self._properties,
+ )
+
+ @classmethod
+ def from_function_impl(cls, impl) -> "PythonImplDTO":
+ """Create a PythonImplDTO from a PythonImpl instance."""
+ return cls(
+ _runtime=impl.runtime().name,
+ _handler=impl.handler(),
+ _code_block=impl.code_block(),
+
_resources=FunctionResourcesDTO.from_function_resources(impl.resources()),
+ _properties=impl.properties() if impl.properties() else None,
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, PythonImplDTO):
+ return False
+ return (
+ self._runtime == other._runtime
+ and self._handler == other._handler
+ and self._code_block == other._code_block
+ and self._resources == other._resources
+ and self._properties == other._properties
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self._runtime,
+ self._handler,
+ self._code_block,
+ self._resources,
+ frozenset(self._properties.items()) if self._properties else
None,
+ )
+ )
+
+
+def function_impl_dto_from_function_impl(impl: FunctionImpl) ->
FunctionImplDTO:
+ """Create a FunctionImplDTO from a FunctionImpl instance."""
+ if isinstance(impl, SQLImpl):
+ return SQLImplDTO.from_function_impl(impl)
+ if isinstance(impl, JavaImpl):
+ return JavaImplDTO.from_function_impl(impl)
+ if isinstance(impl, PythonImpl):
+ return PythonImplDTO.from_function_impl(impl)
+
+ raise ValueError(f"Unsupported implementation type: {type(impl)}")
diff --git a/clients/client-python/gravitino/dto/function/function_param_dto.py
b/clients/client-python/gravitino/dto/function/function_param_dto.py
new file mode 100644
index 0000000000..c56b902145
--- /dev/null
+++ b/clients/client-python/gravitino/dto/function/function_param_dto.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Any, Dict, Optional
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.function.function_param import FunctionParam, FunctionParams
+from gravitino.api.rel.expressions.expression import Expression
+from gravitino.api.rel.types.json_serdes.type_serdes import TypeSerdes
+from gravitino.api.rel.types.type import Type
+from gravitino.dto.rel.expressions.function_arg import FunctionArg
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import (
+ SerdesUtils as ExpressionSerdesUtils,
+)
+from gravitino.dto.util.dto_converters import DTOConverters
+
+
+def _encode_default_value(
+ default_value: Optional[Expression],
+) -> Optional[Dict[str, Any]]:
+ if default_value is None:
+ return None
+ return ExpressionSerdesUtils.write_function_arg(
+ DTOConverters.to_function_arg(default_value)
+ )
+
+
+def _decode_default_value(
+ default_value_dict: Optional[Dict[str, Any]]
+) -> Optional[FunctionArg]:
+ if default_value_dict is None:
+ return None
+ return ExpressionSerdesUtils.read_function_arg(default_value_dict)
+
+
+@dataclass
+class FunctionParamDTO(FunctionParam, DataClassJsonMixin):
+ """DTO for function parameter."""
+
+ _name: str = field(metadata=config(field_name="name"))
+ _data_type: Type = field(
+ metadata=config(
+ field_name="dataType",
+ encoder=TypeSerdes.serialize,
+ decoder=TypeSerdes.deserialize,
+ )
+ )
+ _comment: Optional[str] = field(default=None,
metadata=config(field_name="comment"))
+ _default_value: Optional[FunctionArg] = field(
+ default=None,
+ metadata=config(
+ field_name="defaultValue",
+ encoder=_encode_default_value,
+ decoder=_decode_default_value,
+ ),
+ )
+
+ def name(self) -> str:
+ """Returns the parameter name."""
+ return self._name
+
+ def data_type(self) -> Type:
+ """Returns the parameter data type."""
+ return self._data_type
+
+ def comment(self) -> Optional[str]:
+ """Returns the optional parameter comment."""
+ return self._comment
+
+ def default_value(self) -> Optional[Expression]:
+ """Returns the default value of the parameter if provided, otherwise
None."""
+ if self._default_value is None:
+ return None
+ return DTOConverters.from_function_arg(self._default_value)
+
+ def to_function_param(self) -> FunctionParam:
+ """Convert this DTO to a FunctionParam instance."""
+ return FunctionParams.of(
+ self._name, self._data_type, self._comment, self.default_value()
+ )
+
+ @classmethod
+ def from_function_param(cls, param: FunctionParam) -> "FunctionParamDTO":
+ """Create a FunctionParamDTO from a FunctionParam instance."""
+ default_value = param.default_value()
+ return cls(
+ _name=param.name(),
+ _data_type=param.data_type(),
+ _comment=param.comment(),
+ _default_value=(
+ None
+ if default_value is None
+ else DTOConverters.to_function_arg(default_value)
+ ),
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionParamDTO):
+ return False
+ return (
+ self._name == other._name
+ and self._data_type == other._data_type
+ and self._comment == other._comment
+ and self._default_value == other._default_value
+ )
+
+ def __hash__(self) -> int:
+ return hash((self._name, self._data_type, self._comment,
self._default_value))
diff --git
a/clients/client-python/gravitino/dto/function/function_resources_dto.py
b/clients/client-python/gravitino/dto/function/function_resources_dto.py
new file mode 100644
index 0000000000..b64268cb31
--- /dev/null
+++ b/clients/client-python/gravitino/dto/function/function_resources_dto.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+from dataclasses_json import DataClassJsonMixin, config
+from gravitino.api.function.function_resources import FunctionResources
+
+
+@dataclass
+class FunctionResourcesDTO(DataClassJsonMixin):
+ """DTO for function resources."""
+
+ _jars: Optional[List[str]] = field(default=None,
metadata=config(field_name="jars"))
+ _files: Optional[List[str]] = field(
+ default=None, metadata=config(field_name="files")
+ )
+ _archives: Optional[List[str]] = field(
+ default=None, metadata=config(field_name="archives")
+ )
+
+ def jars(self) -> List[str]:
+ """Returns the jar resources."""
+ return list(self._jars) if self._jars else []
+
+ def files(self) -> List[str]:
+ """Returns the file resources."""
+ return list(self._files) if self._files else []
+
+ def archives(self) -> List[str]:
+ """Returns the archive resources."""
+ return list(self._archives) if self._archives else []
+
+ def to_function_resources(self):
+ """Convert this DTO to a FunctionResources instance."""
+ return FunctionResources.of(self._jars, self._files, self._archives)
+
+ @classmethod
+ def from_function_resources(
+ cls, resources: Optional[FunctionResources]
+ ) -> Optional["FunctionResourcesDTO"]:
+ """Create a FunctionResourcesDTO from a FunctionResources instance."""
+ if resources is None:
+ return None
+ return cls(
+ _jars=resources.jars() if resources.jars() else None,
+ _files=resources.files() if resources.files() else None,
+ _archives=resources.archives() if resources.archives() else None,
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionResourcesDTO):
+ return False
+ return (
+ self._jars == other._jars
+ and self._files == other._files
+ and self._archives == other._archives
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ tuple(self._jars) if self._jars else None,
+ tuple(self._files) if self._files else None,
+ tuple(self._archives) if self._archives else None,
+ )
+ )
diff --git
a/clients/client-python/gravitino/dto/requests/function_register_request.py
b/clients/client-python/gravitino/dto/requests/function_register_request.py
new file mode 100644
index 0000000000..732f6bc949
--- /dev/null
+++ b/clients/client-python/gravitino/dto/requests/function_register_request.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+from dataclasses_json import config
+
+from gravitino.api.function.function_type import FunctionType
+from gravitino.dto.function.function_definition_dto import
FunctionDefinitionDTO
+from gravitino.dto.function.function_dto import (
+ _decode_function_type,
+ _encode_function_type,
+)
+from gravitino.exceptions.base import IllegalArgumentException
+from gravitino.rest.rest_message import RESTRequest
+
+
+@dataclass
+class FunctionRegisterRequest(RESTRequest):
+ """Represents a request to register a function."""
+
+ _name: str = field(metadata=config(field_name="name"))
+ _function_type: FunctionType = field(
+ metadata=config(
+ field_name="functionType",
+ encoder=_encode_function_type,
+ decoder=_decode_function_type,
+ )
+ )
+ _deterministic: bool = field(metadata=config(field_name="deterministic"))
+ _definitions: List[FunctionDefinitionDTO] = field(
+ metadata=config(field_name="definitions")
+ )
+ _comment: Optional[str] = field(default=None,
metadata=config(field_name="comment"))
+
+ def __init__(
+ self,
+ name: str,
+ function_type: FunctionType,
+ deterministic: bool,
+ definitions: List[FunctionDefinitionDTO],
+ comment: Optional[str] = None,
+ ):
+ self._name = name
+ self._function_type = function_type
+ self._deterministic = deterministic
+ self._definitions = definitions
+ self._comment = comment
+
+ def validate(self):
+ """Validates the request.
+
+ Raises:
+ IllegalArgumentException: If the request is invalid.
+ """
+ if not self._name:
+ raise IllegalArgumentException(
+ "'name' field is required and cannot be empty"
+ )
+ if self._function_type is None:
+ raise IllegalArgumentException("'functionType' field is required")
+ if not self._definitions:
+ raise IllegalArgumentException(
+ "'definitions' field is required and cannot be empty"
+ )
+
+ # Validate each definition has appropriate return type/columns based
on function type
+ for definition in self._definitions:
+ if self._function_type == FunctionType.TABLE:
+ return_columns = definition.return_columns()
+ if not return_columns:
+ raise IllegalArgumentException(
+ "'returnColumns' is required in each definition for
TABLE function type"
+ )
+ else:
+ if definition.return_type() is None:
+ raise IllegalArgumentException(
+ "'returnType' is required in each definition for
SCALAR or AGGREGATE function type"
+ )
diff --git
a/clients/client-python/gravitino/dto/requests/function_update_request.py
b/clients/client-python/gravitino/dto/requests/function_update_request.py
new file mode 100644
index 0000000000..7318262ab2
--- /dev/null
+++ b/clients/client-python/gravitino/dto/requests/function_update_request.py
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from typing import List
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.dto.function.function_definition_dto import
FunctionDefinitionDTO
+from gravitino.dto.function.function_impl_dto import FunctionImplDTO
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
+from gravitino.exceptions.base import IllegalArgumentException
+from gravitino.rest.rest_message import RESTRequest
+
+
+@dataclass
+class FunctionUpdateRequest(RESTRequest, ABC):
+ """Abstract base class for function update requests."""
+
+ _type: str = field(metadata=config(field_name="@type"))
+
+ def __init__(self, action_type: str):
+ self._type = action_type
+
+ @abstractmethod
+ def function_change(self) -> FunctionChange:
+ """Returns the function change."""
+ pass
+
+
+@dataclass
+class UpdateCommentRequest(FunctionUpdateRequest, DataClassJsonMixin):
+ """Request to update the comment of a function."""
+
+ _new_comment: str = field(metadata=config(field_name="newComment"))
+
+ def __init__(self, new_comment: str):
+ super().__init__("updateComment")
+ self._new_comment = new_comment
+
+ def function_change(self) -> FunctionChange:
+ return FunctionChange.update_comment(self._new_comment)
+
+ def validate(self):
+ # newComment can be null or empty to clear the comment
+ return
+
+
+@dataclass
+class AddDefinitionRequest(FunctionUpdateRequest, DataClassJsonMixin):
+ """Request to add a definition to a function."""
+
+ _definition: FunctionDefinitionDTO =
field(metadata=config(field_name="definition"))
+
+ def __init__(self, definition: FunctionDefinitionDTO):
+ super().__init__("addDefinition")
+ self._definition = definition
+
+ def function_change(self) -> FunctionChange:
+ return
FunctionChange.add_definition(self._definition.to_function_definition())
+
+ def validate(self):
+ if self._definition is None:
+ raise IllegalArgumentException(
+ "'definition' field is required and cannot be null"
+ )
+
+
+@dataclass
+class RemoveDefinitionRequest(FunctionUpdateRequest, DataClassJsonMixin):
+ """Request to remove a definition from a function."""
+
+ _parameters: List[FunctionParamDTO] = field(
+ metadata=config(field_name="parameters")
+ )
+
+ def __init__(self, parameters: List[FunctionParamDTO]):
+ super().__init__("removeDefinition")
+ self._parameters = parameters
+
+ def function_change(self) -> FunctionChange:
+ params = (
+ [p.to_function_param() for p in self._parameters]
+ if self._parameters
+ else []
+ )
+ return FunctionChange.remove_definition(params)
+
+ def validate(self):
+ if self._parameters is None:
+ raise IllegalArgumentException(
+ "'parameters' field is required and cannot be null"
+ )
+
+
+@dataclass
+class AddImplRequest(FunctionUpdateRequest, DataClassJsonMixin):
+ """Request to add an implementation to a definition."""
+
+ _parameters: List[FunctionParamDTO] = field(
+ metadata=config(field_name="parameters")
+ )
+ _implementation: FunctionImplDTO = field(
+ metadata=config(field_name="implementation")
+ )
+
+ def __init__(
+ self, parameters: List[FunctionParamDTO], implementation:
FunctionImplDTO
+ ):
+ super().__init__("addImpl")
+ self._parameters = parameters
+ self._implementation = implementation
+
+ def function_change(self) -> FunctionChange:
+ params = (
+ [p.to_function_param() for p in self._parameters]
+ if self._parameters
+ else []
+ )
+ return FunctionChange.add_impl(params,
self._implementation.to_function_impl())
+
+ def validate(self):
+ if self._parameters is None:
+ raise IllegalArgumentException(
+ "'parameters' field is required and cannot be null"
+ )
+ if self._implementation is None:
+ raise IllegalArgumentException(
+ "'implementation' field is required and cannot be null"
+ )
+
+
+@dataclass
+class UpdateImplRequest(FunctionUpdateRequest, DataClassJsonMixin):
+ """Request to update an implementation in a definition."""
+
+ _parameters: List[FunctionParamDTO] = field(
+ metadata=config(field_name="parameters")
+ )
+ _runtime: str = field(metadata=config(field_name="runtime"))
+ _implementation: FunctionImplDTO = field(
+ metadata=config(field_name="implementation")
+ )
+
+ def __init__(
+ self,
+ parameters: List[FunctionParamDTO],
+ runtime: str,
+ implementation: FunctionImplDTO,
+ ):
+ super().__init__("updateImpl")
+ self._parameters = parameters
+ self._runtime = runtime
+ self._implementation = implementation
+
+ def function_change(self) -> FunctionChange:
+
+ params = (
+ [p.to_function_param() for p in self._parameters]
+ if self._parameters
+ else []
+ )
+ runtime_type = FunctionImpl.RuntimeType.from_string(self._runtime)
+ return FunctionChange.update_impl(
+ params, runtime_type, self._implementation.to_function_impl()
+ )
+
+ def validate(self):
+ if self._parameters is None:
+ raise IllegalArgumentException(
+ "'parameters' field is required and cannot be null"
+ )
+ if self._runtime is None:
+ raise IllegalArgumentException(
+ "'runtime' field is required and cannot be null"
+ )
+ if self._implementation is None:
+ raise IllegalArgumentException(
+ "'implementation' field is required and cannot be null"
+ )
+
+
+@dataclass
+class RemoveImplRequest(FunctionUpdateRequest, DataClassJsonMixin):
+ """Request to remove an implementation from a definition."""
+
+ _parameters: List[FunctionParamDTO] = field(
+ metadata=config(field_name="parameters")
+ )
+ _runtime: str = field(metadata=config(field_name="runtime"))
+
+ def __init__(self, parameters: List[FunctionParamDTO], runtime: str):
+ super().__init__("removeImpl")
+ self._parameters = parameters
+ self._runtime = runtime
+
+ def function_change(self) -> FunctionChange:
+
+ params = (
+ [p.to_function_param() for p in self._parameters]
+ if self._parameters
+ else []
+ )
+ runtime_type = FunctionImpl.RuntimeType.from_string(self._runtime)
+ return FunctionChange.remove_impl(params, runtime_type)
+
+ def validate(self):
+ if self._parameters is None:
+ raise IllegalArgumentException(
+ "'parameters' field is required and cannot be null"
+ )
+ if self._runtime is None:
+ raise IllegalArgumentException(
+ "'runtime' field is required and cannot be null"
+ )
diff --git
a/clients/client-python/gravitino/dto/requests/function_updates_request.py
b/clients/client-python/gravitino/dto/requests/function_updates_request.py
new file mode 100644
index 0000000000..3d2032a341
--- /dev/null
+++ b/clients/client-python/gravitino/dto/requests/function_updates_request.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import List
+
+from dataclasses_json import config
+
+from gravitino.dto.requests.function_update_request import
FunctionUpdateRequest
+from gravitino.rest.rest_message import RESTRequest
+
+
+@dataclass
+class FunctionUpdatesRequest(RESTRequest):
+ """Represents a request with multiple function updates."""
+
+ _updates: List[FunctionUpdateRequest] =
field(metadata=config(field_name="updates"))
+
+ def __init__(self, updates: List[FunctionUpdateRequest]):
+ self._updates = updates
+
+ def validate(self):
+ """Validates the request.
+
+ Raises:
+ IllegalArgumentException: If the request is invalid.
+ """
+ if self._updates is None:
+ raise ValueError("Updates cannot be null")
+ for update in self._updates:
+ update.validate()
+
+ def updates(self) -> List[FunctionUpdateRequest]:
+ """Returns the list of updates."""
+ return self._updates
diff --git
a/clients/client-python/gravitino/dto/responses/function_list_response.py
b/clients/client-python/gravitino/dto/responses/function_list_response.py
new file mode 100644
index 0000000000..4ff9c8d68c
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/function_list_response.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import List
+
+from dataclasses_json import config
+
+from gravitino.dto.function.function_dto import FunctionDTO
+from gravitino.dto.responses.base_response import BaseResponse
+from gravitino.exceptions.base import IllegalArgumentException
+
+
+@dataclass
+class FunctionListResponse(BaseResponse):
+ """Response wrapper for multiple functions."""
+
+ _functions: List[FunctionDTO] =
field(metadata=config(field_name="functions"))
+
+ def functions(self) -> List[FunctionDTO]:
+ """Returns the list of functions."""
+ return self._functions
+
+ def validate(self):
+ """Validates the response data.
+
+ Raises:
+ IllegalArgumentException: If functions is null.
+ """
+ super().validate()
+
+ if self._functions is None:
+ raise IllegalArgumentException("functions must not be null")
diff --git a/clients/client-python/gravitino/dto/responses/function_response.py
b/clients/client-python/gravitino/dto/responses/function_response.py
new file mode 100644
index 0000000000..3a2ed2964c
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/function_response.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+
+from gravitino.dto.function.function_dto import FunctionDTO
+from gravitino.dto.responses.base_response import BaseResponse
+from gravitino.exceptions.base import IllegalArgumentException
+
+
+@dataclass
+class FunctionResponse(BaseResponse):
+ """Response object for function-related operations."""
+
+ _function: FunctionDTO = field(metadata=config(field_name="function"))
+
+ def function(self) -> FunctionDTO:
+ """Returns the function DTO object."""
+ return self._function
+
+ def validate(self):
+ """Validates the response data.
+
+ Raises:
+ IllegalArgumentException: If function identifiers are not set.
+ """
+ super().validate()
+
+ if self._function is None:
+ raise IllegalArgumentException("function must not be null")
+ if not self._function.name():
+ raise IllegalArgumentException("function 'name' must not be null
or empty")
+ if self._function.function_type() is None:
+ raise IllegalArgumentException("function 'functionType' must not
be null")
+ if self._function.definitions() is None:
+ raise IllegalArgumentException("function 'definitions' must not be
null")
diff --git
a/clients/client-python/gravitino/exceptions/handlers/function_error_handler.py
b/clients/client-python/gravitino/exceptions/handlers/function_error_handler.py
new file mode 100644
index 0000000000..8e69f8b862
--- /dev/null
+++
b/clients/client-python/gravitino/exceptions/handlers/function_error_handler.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from gravitino.constants.error import ErrorConstants
+from gravitino.dto.responses.error_response import ErrorResponse
+from gravitino.exceptions.base import (
+ AlreadyExistsException,
+ CatalogNotInUseException,
+ FunctionAlreadyExistsException,
+ MetalakeNotInUseException,
+ NoSuchFunctionException,
+ NoSuchSchemaException,
+ NotFoundException,
+ NotInUseException,
+)
+from gravitino.exceptions.handlers.rest_error_handler import RestErrorHandler
+
+
+class FunctionErrorHandler(RestErrorHandler):
+ """Error handler for function-related operations."""
+
+ def handle(self, error_response: ErrorResponse):
+ error_message = error_response.format_error_message()
+ code = error_response.code()
+ exception_type = error_response.type()
+
+ if code == ErrorConstants.NOT_FOUND_CODE:
+ if exception_type == NoSuchSchemaException.__name__:
+ raise NoSuchSchemaException(error_message)
+ if exception_type == NoSuchFunctionException.__name__:
+ raise NoSuchFunctionException(error_message)
+
+ raise NotFoundException(error_message)
+
+ if code == ErrorConstants.ALREADY_EXISTS_CODE:
+ if exception_type == FunctionAlreadyExistsException.__name__:
+ raise FunctionAlreadyExistsException(error_message)
+
+ raise AlreadyExistsException(error_message)
+
+ if code == ErrorConstants.NOT_IN_USE_CODE:
+ if exception_type == CatalogNotInUseException.__name__:
+ raise CatalogNotInUseException(error_message)
+ if exception_type == MetalakeNotInUseException.__name__:
+ raise MetalakeNotInUseException(error_message)
+
+ raise NotInUseException(error_message)
+
+ super().handle(error_response)
+
+
+FUNCTION_ERROR_HANDLER = FunctionErrorHandler()
diff --git
a/clients/client-python/tests/unittests/dto/responses/test_responses.py
b/clients/client-python/tests/unittests/dto/responses/test_responses.py
index 9b807db384..4b6930b69c 100644
--- a/clients/client-python/tests/unittests/dto/responses/test_responses.py
+++ b/clients/client-python/tests/unittests/dto/responses/test_responses.py
@@ -29,6 +29,8 @@ from gravitino.dto.responses.model_response import
ModelResponse
from gravitino.dto.responses.model_version_list_response import
ModelVersionListResponse
from gravitino.dto.responses.model_version_response import ModelVersionResponse
from gravitino.dto.responses.model_version_uri_response import
ModelVersionUriResponse
+from gravitino.dto.responses.function_list_response import FunctionListResponse
+from gravitino.dto.responses.function_response import FunctionResponse
from gravitino.dto.responses.partition_list_response import
PartitionListResponse
from gravitino.dto.responses.partition_name_list_response import (
PartitionNameListResponse,
@@ -502,3 +504,78 @@ class TestResponses(unittest.TestCase):
"""
resp: TableResponse = TableResponse.from_json(json_string)
resp.validate()
+
+ def test_function_response(self):
+ """Test FunctionResponse."""
+ json_str = """
+ {
+ "code": 0,
+ "function": {
+ "name": "func1",
+ "functionType": "SCALAR",
+ "deterministic": true,
+ "definitions": [
+ {
+ "parameters": [],
+ "returnType": "integer",
+ "impls": [
+ {
+ "language": "SQL",
+ "runtime": "SPARK",
+ "sql": "SELECT 1"
+ }
+ ]
+ }
+ ],
+ "comment": "comment",
+ "audit": {
+ "creator": "anonymous",
+ "createTime": "2024-04-05T10:10:35.218Z"
+ }
+ }
+ }
+ """
+ resp = FunctionResponse.from_json(json_str)
+ resp.validate()
+ self.assertEqual("func1", resp.function().name())
+ self.assertEqual("SCALAR", resp.function().function_type().name)
+
+ with self.assertRaises(IllegalArgumentException):
+ FunctionResponse.from_json('{"code": 0, "function":
null}').validate()
+
+ def test_function_list_response(self):
+ """Test FunctionListResponse."""
+ json_str = """
+ {
+ "code": 0,
+ "functions": [
+ {
+ "name": "func1",
+ "functionType": "SCALAR",
+ "deterministic": true,
+ "definitions": [
+ {
+ "parameters": [],
+ "returnType": "integer",
+ "impls": [
+ {
+ "language": "SQL",
+ "runtime": "SPARK",
+ "sql": "SELECT 1"
+ }
+ ]
+ }
+ ],
+ "comment": "comment",
+ "audit": {
+ "creator": "anonymous",
+ "createTime": "2024-04-05T10:10:35.218Z"
+ }
+ }
+ ]
+ }
+ """
+ resp = FunctionListResponse.from_json(json_str)
+ resp.validate()
+ self.assertEqual(1, len(resp.functions()))
+ self.assertEqual("func1", resp.functions()[0].name())
diff --git a/clients/client-python/tests/unittests/dto/test_function_dto.py
b/clients/client-python/tests/unittests/dto/test_function_dto.py
new file mode 100644
index 0000000000..7df4086096
--- /dev/null
+++ b/clients/client-python/tests/unittests/dto/test_function_dto.py
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License a
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Tests for Function DTOs."""
+
+import unittest
+
+from gravitino.api.function.function_type import FunctionType
+from gravitino.api.rel.types.types import Types
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.function.function_column_dto import FunctionColumnDTO
+from gravitino.dto.function.function_definition_dto import
FunctionDefinitionDTO
+from gravitino.dto.function.function_dto import FunctionDTO
+from gravitino.dto.function.function_impl_dto import (
+ SQLImplDTO,
+ PythonImplDTO,
+ JavaImplDTO,
+)
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
+from gravitino.dto.function.function_resources_dto import FunctionResourcesDTO
+from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
+
+
+class TestFunctionDTO(unittest.TestCase):
+ """Tests for Function DTOs."""
+
+ def test_function_dto(self):
+ """Test FunctionDTO serialization and deserialization."""
+ audit = AuditDTO(
+ "creator", "2022-01-01T00:00:00Z", "modifier",
"2022-01-01T00:00:00Z"
+ )
+ params = [FunctionParamDTO(_name="param1",
_data_type=Types.IntegerType.get())]
+ impl = SQLImplDTO(
+ _runtime="SPARK",
+ _sql="SELECT param1 + 1",
+ _resources=None,
+ _properties={},
+ )
+ definition = FunctionDefinitionDTO(
+ _parameters=params, _return_type=Types.IntegerType.get(),
_impls=[impl]
+ )
+
+ function_dto = FunctionDTO(
+ _name="func1",
+ _definitions=[definition],
+ _function_type=FunctionType.SCALAR,
+ _deterministic=True,
+ _comment="comment",
+ _audit=audit,
+ )
+
+ json_str = function_dto.to_json()
+ deserialized = FunctionDTO.from_json(json_str)
+ self.assertEqual(function_dto.name(), deserialized.name())
+ self.assertEqual(function_dto.comment(), deserialized.comment())
+ self.assertEqual(
+ function_dto.audit_info().creator(),
deserialized.audit_info().creator()
+ )
+ self.assertEqual(
+ function_dto.definitions()[0].impls()[0].sql(),
+ deserialized.definitions()[0].impls()[0].sql(),
+ )
+ self.assertEqual(
+ function_dto.definitions()[0].return_type(),
+ deserialized.definitions()[0].return_type(),
+ )
+
+ def test_function_definition_dto(self):
+ """Test FunctionDefinitionDTO serialization and deserialization."""
+ params = [FunctionParamDTO(_name="param1",
_data_type=Types.IntegerType.get())]
+ impl = SQLImplDTO(
+ _runtime="SPARK",
+ _sql="SELECT param1 + 1",
+ _resources=None,
+ _properties={},
+ )
+ definition = FunctionDefinitionDTO(
+ _parameters=params, _return_type=Types.IntegerType.get(),
_impls=[impl]
+ )
+
+ json_str = definition.to_json()
+ deserialized = FunctionDefinitionDTO.from_json(json_str)
+ self.assertEqual(
+ definition.parameters()[0].name(),
deserialized.parameters()[0].name()
+ )
+ self.assertEqual(definition.impls()[0].sql(),
deserialized.impls()[0].sql())
+ self.assertEqual(definition.return_type(), deserialized.return_type())
+
+ def test_function_param_dto_default_value(self):
+ """Test FunctionParamDTO supports defaultValue serdes."""
+ default_value = (
+ LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value("0")
+ .build()
+ )
+ param = FunctionParamDTO(
+ _name="x",
+ _data_type=Types.IntegerType.get(),
+ _comment="comment",
+ _default_value=default_value,
+ )
+
+ json_str = param.to_json()
+ deserialized = FunctionParamDTO.from_json(json_str)
+ self.assertEqual(param, deserialized)
+ self.assertIsNotNone(deserialized.default_value())
+
+ def test_function_definition_hash_with_impls(self):
+ """Test FunctionDefinitionDTO hash works when impl list is not
empty."""
+ definition = FunctionDefinitionDTO(
+ _parameters=[
+ FunctionParamDTO(_name="param1",
_data_type=Types.IntegerType.get())
+ ],
+ _return_type=Types.IntegerType.get(),
+ _impls=[
+ SQLImplDTO(
+ _runtime="SPARK",
+ _sql="SELECT param1 + 1",
+ _resources=None,
+ _properties={},
+ )
+ ],
+ )
+
+ self.assertIsInstance(hash(definition), int)
+
+ def test_function_impl_dto(self):
+ """Test FunctionImplDTO serialization and deserialization."""
+ sql_impl = SQLImplDTO(
+ _runtime="SPARK",
+ _sql="SELECT 1",
+ _resources=None,
+ _properties={},
+ )
+ json_str = sql_impl.to_json()
+ deserialized = SQLImplDTO.from_json(json_str)
+ self.assertEqual(sql_impl.sql(), deserialized.sql())
+ self.assertEqual(sql_impl.runtime(), deserialized.runtime())
+
+ python_impl = PythonImplDTO(
+ _runtime="PYTHON",
+ _handler="test_module.test_func",
+ _code_block="def test_func(): pass",
+ _resources=None,
+ _properties={},
+ )
+ json_str = python_impl.to_json()
+ deserialized = PythonImplDTO.from_json(json_str)
+ self.assertEqual(python_impl.handler(), deserialized.handler())
+ self.assertEqual(python_impl.code_block(), deserialized.code_block())
+
+ java_impl = JavaImplDTO(
+ _runtime="JAVA",
+ _class_name="com.test.TestClass",
+ _resources=None,
+ _properties={},
+ )
+ json_str = java_impl.to_json()
+ deserialized = JavaImplDTO.from_json(json_str)
+ self.assertEqual(java_impl.class_name(), deserialized.class_name())
+
+ def test_function_resources_dto(self):
+ """Test FunctionResourcesDTO serialization and deserialization."""
+ resources = FunctionResourcesDTO(_jars=["v1"], _files=None,
_archives=None)
+ json_str = resources.to_json()
+ deserialized = FunctionResourcesDTO.from_json(json_str)
+ self.assertEqual(resources.jars(), deserialized.jars())
+
+ def test_function_column_dto(self):
+ """Test FunctionColumnDTO serialization and deserialization."""
+ column = FunctionColumnDTO(_name="col1",
_data_type=Types.IntegerType.get())
+ json_str = column.to_json()
+ deserialized = FunctionColumnDTO.from_json(json_str)
+ self.assertEqual(column.name(), deserialized.name())
+ self.assertEqual(column.data_type(), deserialized.data_type())
+
+ def test_function_type_cannot_be_null(self):
+ """Test FunctionDTO rejects null functionType during
deserialization."""
+ json_str = (
+
'{"name":"func1","functionType":null,"deterministic":true,"definitions":[]}'
+ )
+ with self.assertRaises(ValueError):
+ FunctionDTO.from_json(json_str)
diff --git a/clients/client-python/tests/unittests/test_requests.py
b/clients/client-python/tests/unittests/test_requests.py
index f3c2b19d1f..4d1f047e3b 100644
--- a/clients/client-python/tests/unittests/test_requests.py
+++ b/clients/client-python/tests/unittests/test_requests.py
@@ -19,7 +19,23 @@ import json
import unittest
from typing import cast
+from gravitino.api.function.function_type import FunctionType
+from gravitino.api.rel.types.types import Types
+from gravitino.dto.function.function_column_dto import FunctionColumnDTO
+from gravitino.dto.function.function_definition_dto import
FunctionDefinitionDTO
+from gravitino.dto.function.function_impl_dto import SQLImplDTO
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
from gravitino.dto.requests.add_partitions_request import AddPartitionsRequest
+from gravitino.dto.requests.function_register_request import
FunctionRegisterRequest
+from gravitino.dto.requests.function_update_request import (
+ AddDefinitionRequest,
+ AddImplRequest,
+ RemoveDefinitionRequest,
+ RemoveImplRequest,
+ UpdateCommentRequest,
+ UpdateImplRequest,
+)
+from gravitino.dto.requests.function_updates_request import
FunctionUpdatesRequest
from gravitino.dto.requests.table_create_request import TableCreateRequest
from gravitino.exceptions.base import IllegalArgumentException
@@ -266,3 +282,133 @@ class TestRequests(unittest.TestCase):
with self.assertRaisesRegex(IllegalArgumentException,
exception_str):
req = TableCreateRequest.from_json(json_str)
req.validate()
+
+ def test_function_register_request(self):
+ """Test FunctionRegisterRequest."""
+ scalar_definition = FunctionDefinitionDTO(
+ _parameters=[],
+ _return_type=Types.IntegerType.get(),
+ _impls=[],
+ )
+ table_definition = FunctionDefinitionDTO(
+ _parameters=[],
+ _return_columns=[
+ FunctionColumnDTO(_name="out_col",
_data_type=Types.IntegerType.get())
+ ],
+ _impls=[],
+ )
+
+ req = FunctionRegisterRequest(
+ name="func1",
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[scalar_definition],
+ comment="comment",
+ )
+ req.validate()
+ json_data = json.loads(req.to_json())
+ self.assertEqual("func1", json_data["name"])
+ self.assertEqual("scalar", json_data["functionType"])
+ self.assertTrue(json_data["deterministic"])
+ self.assertEqual("comment", json_data["comment"])
+
+ table_req = FunctionRegisterRequest(
+ name="func_table",
+ function_type=FunctionType.TABLE,
+ deterministic=False,
+ definitions=[table_definition],
+ )
+ table_req.validate()
+
+ # Test request without required fields and type-specific validation
+ with self.assertRaises(IllegalArgumentException):
+ FunctionRegisterRequest(
+ name="func1",
+ function_type=None,
+ deterministic=True,
+ definitions=[],
+ ).validate()
+ with self.assertRaises(IllegalArgumentException):
+ FunctionRegisterRequest(
+ name="func1",
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[],
+ ).validate()
+ with self.assertRaises(IllegalArgumentException):
+ FunctionRegisterRequest(
+ name="func1",
+ function_type=FunctionType.TABLE,
+ deterministic=True,
+ definitions=[scalar_definition],
+ ).validate()
+ with self.assertRaises(IllegalArgumentException):
+ FunctionRegisterRequest(
+ name="func1",
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[table_definition],
+ ).validate()
+ with self.assertRaises(IllegalArgumentException):
+ FunctionRegisterRequest(
+ name="func1",
+ function_type=FunctionType.AGGREGATE,
+ deterministic=True,
+ definitions=[table_definition],
+ ).validate()
+
+ def test_function_update_request(self):
+ """Test FunctionUpdateRequest."""
+ parameters = [
+ FunctionParamDTO(_name="p1", _data_type=Types.IntegerType.get()),
+ ]
+ definition = FunctionDefinitionDTO(
+ _parameters=parameters,
+ _return_type=Types.IntegerType.get(),
+ _impls=[],
+ )
+ impl = SQLImplDTO(
+ _runtime="SPARK",
+ _sql="SELECT 1",
+ _resources=None,
+ _properties={},
+ )
+
+ comment_req = UpdateCommentRequest("new comment")
+ json_data = json.loads(comment_req.to_json())
+ self.assertEqual("updateComment", json_data["@type"])
+ self.assertEqual("new comment", json_data["newComment"])
+ comment_req.validate()
+ UpdateCommentRequest(None).validate()
+ UpdateCommentRequest(" ").validate()
+
+ AddDefinitionRequest(definition).validate()
+ with self.assertRaises(IllegalArgumentException):
+ AddDefinitionRequest(None).validate()
+
+ RemoveDefinitionRequest(parameters).validate()
+ with self.assertRaises(IllegalArgumentException):
+ RemoveDefinitionRequest(None).validate()
+
+ AddImplRequest(parameters, impl).validate()
+ with self.assertRaises(IllegalArgumentException):
+ AddImplRequest(parameters, None).validate()
+
+ UpdateImplRequest(parameters, "SPARK", impl).validate()
+ with self.assertRaises(IllegalArgumentException):
+ UpdateImplRequest(parameters, None, impl).validate()
+
+ RemoveImplRequest(parameters, "SPARK").validate()
+ with self.assertRaises(IllegalArgumentException):
+ RemoveImplRequest(parameters, None).validate()
+
+ def test_function_updates_request(self):
+ """Test FunctionUpdatesRequest."""
+ updates = [UpdateCommentRequest("new comment")]
+ req = FunctionUpdatesRequest(updates)
+ req.validate()
+ self.assertEqual(1, len(req.updates()))
+ self.assertIsInstance(req.updates()[0], UpdateCommentRequest)
+ with self.assertRaises(ValueError):
+ FunctionUpdatesRequest(None).validate()
+ FunctionUpdatesRequest([]).validate()