This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new c0c3e4e175 [#9531] feat(python-client): add function client operations 
and catalog integration (#9945)
c0c3e4e175 is described below

commit c0c3e4e175d69543790c845836c5405005cc53ce
Author: mchades <[email protected]>
AuthorDate: Thu Feb 26 20:28:01 2026 +0800

    [#9531] feat(python-client): add function client operations and catalog 
integration (#9945)
    
    ### What changes were proposed in this pull request?
    
    Add the client layer for function CRUD operations:
    - GenericFunction wrapper and the FunctionCatalogOperations helper
    (client-side implementation of FunctionCatalog)
    - Integrate FunctionCatalog into BaseSchemaCatalog with delegation
    - Update FilesetCatalog and RelationalCatalog for new inheritance
    - Unit tests for function catalog client operations
    - Integration tests for end-to-end function lifecycle
    
    ### Why are the changes needed?
    
    Fix: #9531
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, function management is now available in the Python client.
    
    ### How was this patch tested?
    
    Unit tests and integration tests were added.
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../gravitino/client/base_schema_catalog.py        |  52 ++-
 .../gravitino/client/fileset_catalog.py            |   4 +-
 .../client/function_catalog_operations.py          | 382 +++++++++++++++++++++
 .../gravitino/client/generic_function.py           |  60 ++++
 .../gravitino/client/relational_catalog.py         |   4 +-
 .../gravitino/dto/function/function_impl_dto.py    |   3 +
 .../tests/integration/test_function_catalog.py     | 295 ++++++++++++++++
 .../tests/unittests/test_function_catalog.py       | 252 ++++++++++++++
 8 files changed, 1048 insertions(+), 4 deletions(-)

diff --git a/clients/client-python/gravitino/client/base_schema_catalog.py 
b/clients/client-python/gravitino/client/base_schema_catalog.py
index 74dc5e8911..3160f5bd6e 100644
--- a/clients/client-python/gravitino/client/base_schema_catalog.py
+++ b/clients/client-python/gravitino/client/base_schema_catalog.py
@@ -16,14 +16,20 @@
 # under the License.
 
 import logging
-from typing import Dict, List
+from typing import Dict, List, Optional
 
 from gravitino.api.catalog import Catalog
 from gravitino.api.metadata_object import MetadataObject
 from gravitino.api.metadata_objects import MetadataObjects
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_catalog import FunctionCatalog
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
 from gravitino.api.schema import Schema
 from gravitino.api.schema_change import SchemaChange
 from gravitino.api.supports_schemas import SupportsSchemas
+from gravitino.client.function_catalog_operations import 
FunctionCatalogOperations
 from gravitino.client.metadata_object_credential_operations import (
     MetadataObjectCredentialOperations,
 )
@@ -37,6 +43,7 @@ from gravitino.dto.responses.entity_list_response import 
EntityListResponse
 from gravitino.dto.responses.schema_response import SchemaResponse
 from gravitino.exceptions.base import IllegalArgumentException
 from gravitino.exceptions.handlers.schema_error_handler import 
SCHEMA_ERROR_HANDLER
+from gravitino.name_identifier import NameIdentifier
 from gravitino.namespace import Namespace
 from gravitino.rest.rest_utils import encode_string
 from gravitino.utils import HTTPClient
@@ -44,7 +51,7 @@ from gravitino.utils import HTTPClient
 logger = logging.getLogger(__name__)
 
 
-class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
+class BaseSchemaCatalog(CatalogDTO, SupportsSchemas, FunctionCatalog):
     """
     BaseSchemaCatalog is the base abstract class for all the catalog with 
schema. It provides the
     common methods for managing schemas in a catalog. With BaseSchemaCatalog, 
users can list,
@@ -60,6 +67,8 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
     # The metadata object credential operations
     _object_credential_operations: MetadataObjectCredentialOperations
 
+    _function_operations: FunctionCatalogOperations
+
     def __init__(
         self,
         catalog_namespace: Namespace,
@@ -86,12 +95,18 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
         self._object_credential_operations = 
MetadataObjectCredentialOperations(
             catalog_namespace.level(0), metadata_object, rest_client
         )
+        self._function_operations = FunctionCatalogOperations(
+            rest_client, catalog_namespace, self.name()
+        )
 
         self.validate()
 
     def as_schemas(self):
         return self
 
+    def as_function_catalog(self):
+        return self
+
     def list_schemas(self) -> List[str]:
         """List all the schemas under the given catalog namespace.
 
@@ -222,6 +237,39 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
         drop_resp.validate()
         return drop_resp.dropped()
 
+    def list_functions(self, namespace: Namespace) -> List[NameIdentifier]:
+        return self._function_operations.list_functions(namespace)
+
+    def list_function_infos(self, namespace: Namespace) -> List[Function]:
+        return self._function_operations.list_function_infos(namespace)
+
+    def get_function(self, ident: NameIdentifier) -> Function:
+        return self._function_operations.get_function(ident)
+
+    def register_function(
+        self,
+        ident: NameIdentifier,
+        comment: Optional[str],
+        function_type: FunctionType,
+        deterministic: bool,
+        definitions: List[FunctionDefinition],
+    ) -> Function:
+        return self._function_operations.register_function(
+            ident,
+            comment,
+            function_type,
+            deterministic,
+            definitions,
+        )
+
+    def alter_function(
+        self, ident: NameIdentifier, *changes: FunctionChange
+    ) -> Function:
+        return self._function_operations.alter_function(ident, *changes)
+
+    def drop_function(self, ident: NameIdentifier) -> bool:
+        return self._function_operations.drop_function(ident)
+
     def _schema_namespace(self) -> Namespace:
         return Namespace.of(self._catalog_namespace.level(0), self.name())
 
diff --git a/clients/client-python/gravitino/client/fileset_catalog.py 
b/clients/client-python/gravitino/client/fileset_catalog.py
index 4ef8e0ecca..40ee47ae6b 100644
--- a/clients/client-python/gravitino/client/fileset_catalog.py
+++ b/clients/client-python/gravitino/client/fileset_catalog.py
@@ -44,7 +44,9 @@ from gravitino.utils import HTTPClient
 logger = logging.getLogger(__name__)
 
 
-class FilesetCatalog(BaseSchemaCatalog, SupportsCredentials):
+class FilesetCatalog(
+    BaseSchemaCatalog, SupportsCredentials
+):  # pylint: disable=too-many-ancestors
     """
     Fileset catalog is a catalog implementation that supports fileset like 
metadata operations, for
     example, schemas and filesets list, creation, update and deletion. A 
Fileset catalog is under the metalake.
diff --git 
a/clients/client-python/gravitino/client/function_catalog_operations.py 
b/clients/client-python/gravitino/client/function_catalog_operations.py
new file mode 100644
index 0000000000..46050c7812
--- /dev/null
+++ b/clients/client-python/gravitino/client/function_catalog_operations.py
@@ -0,0 +1,382 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, Optional
+
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_catalog import FunctionCatalog
+from gravitino.api.function.function_change import (
+    AddDefinition,
+    AddImpl,
+    FunctionChange,
+    RemoveDefinition,
+    RemoveImpl,
+    UpdateComment,
+    UpdateImpl,
+)
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
+from gravitino.client.generic_function import GenericFunction
+from gravitino.dto.function.function_definition_dto import 
FunctionDefinitionDTO
+from gravitino.dto.function.function_impl_dto import (
+    function_impl_dto_from_function_impl,
+)
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
+from gravitino.dto.requests.function_register_request import 
FunctionRegisterRequest
+from gravitino.dto.requests.function_update_request import (
+    AddDefinitionRequest,
+    AddImplRequest,
+    FunctionUpdateRequest,
+    RemoveDefinitionRequest,
+    RemoveImplRequest,
+    UpdateCommentRequest,
+    UpdateImplRequest,
+)
+from gravitino.dto.requests.function_updates_request import 
FunctionUpdatesRequest
+from gravitino.dto.responses.drop_response import DropResponse
+from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.function_list_response import FunctionListResponse
+from gravitino.dto.responses.function_response import FunctionResponse
+from gravitino.exceptions.base import IllegalArgumentException
+from gravitino.exceptions.handlers.function_error_handler import (
+    FUNCTION_ERROR_HANDLER,
+)
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+from gravitino.rest import rest_utils
+
+
+class FunctionCatalogOperations(FunctionCatalog):
+    """Function catalog operations helper class that provides implementations 
for function management.
+
+    This class is used by catalogs that support function operations.
+    """
+
+    def __init__(self, rest_client, catalog_namespace: Namespace, 
catalog_name: str):
+        """Create a FunctionCatalogOperations instance.
+
+        Args:
+            rest_client: The REST client for making API calls.
+            catalog_namespace: The namespace of the catalog.
+            catalog_name: The name of the catalog.
+        """
+        self._rest_client = rest_client
+        self._catalog_namespace = catalog_namespace
+        self._catalog_name = catalog_name
+
+    def list_functions(self, namespace: Namespace) -> List[NameIdentifier]:
+        """List the functions in a schema namespace from the catalog.
+
+        Args:
+            namespace: A schema namespace. This namespace should have 1 level,
+                       which is the schema name.
+
+        Returns:
+            A list of function identifiers in the namespace.
+
+        Raises:
+            NoSuchSchemaException: If the schema does not exist.
+        """
+        self._check_function_namespace(namespace)
+
+        full_namespace = self._get_function_full_namespace(namespace)
+        resp = self._rest_client.get(
+            self._format_function_request_path(full_namespace),
+            error_handler=FUNCTION_ERROR_HANDLER,
+        )
+
+        entity_list_response = EntityListResponse.from_json(resp.body)
+        entity_list_response.validate()
+
+        return [
+            NameIdentifier.of(ident.namespace().level(2), ident.name())
+            for ident in entity_list_response.identifiers()
+        ]
+
+    def list_function_infos(self, namespace: Namespace) -> List[Function]:
+        """List the functions with details in a schema namespace from the 
catalog.
+
+        Args:
+            namespace: A namespace.
+
+        Returns:
+            A list of functions in the namespace.
+
+        Raises:
+            NoSuchSchemaException: If the schema does not exist.
+        """
+        self._check_function_namespace(namespace)
+
+        full_namespace = self._get_function_full_namespace(namespace)
+        params = {"details": "true"}
+        resp = self._rest_client.get(
+            self._format_function_request_path(full_namespace),
+            params=params,
+            error_handler=FUNCTION_ERROR_HANDLER,
+        )
+
+        function_list_response = FunctionListResponse.from_json(resp.body)
+        function_list_response.validate()
+
+        return [GenericFunction(func) for func in 
function_list_response.functions()]
+
+    def get_function(self, ident: NameIdentifier) -> Function:
+        """Get a function by NameIdentifier from the catalog.
+
+        Args:
+            ident: A function identifier, which should be "schema.function" 
format.
+
+        Returns:
+            The function metadata.
+
+        Raises:
+            NoSuchFunctionException: If the function does not exist.
+        """
+        self._check_function_name_identifier(ident)
+
+        full_namespace = self._get_function_full_namespace(ident.namespace())
+        resp = self._rest_client.get(
+            f"{self._format_function_request_path(full_namespace)}/"
+            f"{rest_utils.encode_string(ident.name())}",
+            error_handler=FUNCTION_ERROR_HANDLER,
+        )
+
+        function_response = FunctionResponse.from_json(resp.body)
+        function_response.validate()
+
+        return GenericFunction(function_response.function())
+
+    def register_function(
+        self,
+        ident: NameIdentifier,
+        comment: Optional[str],
+        function_type: FunctionType,
+        deterministic: bool,
+        definitions: List[FunctionDefinition],
+    ) -> Function:
+        """Register a function with one or more definitions (overloads).
+
+        Args:
+            ident: The function identifier.
+            comment: The optional function comment.
+            function_type: The function type.
+            deterministic: Whether the function is deterministic.
+            definitions: The function definitions.
+
+        Returns:
+            The registered function.
+
+        Raises:
+            NoSuchSchemaException: If the schema does not exist.
+            FunctionAlreadyExistsException: If the function already exists.
+        """
+        self._check_function_name_identifier(ident)
+
+        full_namespace = self._get_function_full_namespace(ident.namespace())
+
+        # Convert definitions to DTOs
+        definition_dtos = [
+            FunctionDefinitionDTO.from_function_definition(d) for d in 
definitions
+        ]
+
+        req = FunctionRegisterRequest(
+            name=ident.name(),
+            function_type=function_type,
+            deterministic=deterministic,
+            definitions=definition_dtos,
+            comment=comment,
+        )
+        req.validate()
+
+        resp = self._rest_client.post(
+            self._format_function_request_path(full_namespace),
+            req,
+            error_handler=FUNCTION_ERROR_HANDLER,
+        )
+
+        function_response = FunctionResponse.from_json(resp.body)
+        function_response.validate()
+
+        return GenericFunction(function_response.function())
+
+    def alter_function(
+        self, ident: NameIdentifier, *changes: FunctionChange
+    ) -> Function:
+        """Applies FunctionChange changes to a function in the catalog.
+
+        Args:
+            ident: The NameIdentifier instance of the function to alter.
+            changes: The FunctionChange instances to apply to the function.
+
+        Returns:
+            The updated Function instance.
+
+        Raises:
+            NoSuchFunctionException: If the function does not exist.
+            IllegalArgumentException: If the change is rejected by the 
implementation.
+        """
+        self._check_function_name_identifier(ident)
+        if not changes:
+            raise IllegalArgumentException(
+                "At least one FunctionChange must be provided to 
alter_function"
+            )
+
+        full_namespace = self._get_function_full_namespace(ident.namespace())
+        updates = [self._to_function_update_request(change) for change in 
changes]
+        req = FunctionUpdatesRequest(updates)
+        req.validate()
+
+        resp = self._rest_client.put(
+            f"{self._format_function_request_path(full_namespace)}/"
+            f"{rest_utils.encode_string(ident.name())}",
+            req,
+            error_handler=FUNCTION_ERROR_HANDLER,
+        )
+
+        function_response = FunctionResponse.from_json(resp.body)
+        function_response.validate()
+
+        return GenericFunction(function_response.function())
+
+    def drop_function(self, ident: NameIdentifier) -> bool:
+        """Drop a function from the catalog.
+
+        Args:
+            ident: A function identifier, which should be "schema.function" 
format.
+
+        Returns:
+            True if the function is dropped, False if the function did not 
exist.
+        """
+        self._check_function_name_identifier(ident)
+
+        full_namespace = self._get_function_full_namespace(ident.namespace())
+        resp = self._rest_client.delete(
+            f"{self._format_function_request_path(full_namespace)}/"
+            f"{rest_utils.encode_string(ident.name())}",
+            error_handler=FUNCTION_ERROR_HANDLER,
+        )
+
+        drop_response = DropResponse.from_json(resp.body)
+        drop_response.validate()
+
+        return drop_response.dropped()
+
+    def _format_function_request_path(self, ns: Namespace) -> str:
+        """Format the request path for function operations.
+
+        Args:
+            ns: The full namespace (metalake.catalog.schema).
+
+        Returns:
+            The request path.
+        """
+        return (
+            f"api/metalakes/{rest_utils.encode_string(ns.level(0))}"
+            f"/catalogs/{rest_utils.encode_string(ns.level(1))}"
+            f"/schemas/{rest_utils.encode_string(ns.level(2))}"
+            f"/functions"
+        )
+
+    def _check_function_namespace(self, namespace: Namespace):
+        """Check whether the namespace of a function is valid.
+
+        Args:
+            namespace: The namespace to check.
+
+        Raises:
+            IllegalArgumentException: If the namespace is invalid.
+        """
+        if namespace is None or namespace.length() != 1:
+            raise IllegalArgumentException(
+                f"Function namespace must be non-null and have 1 level, "
+                f"the input namespace is {namespace}"
+            )
+
+    def _check_function_name_identifier(self, ident: NameIdentifier):
+        """Check whether the NameIdentifier of a function is valid.
+
+        Args:
+            ident: The NameIdentifier to check.
+
+        Raises:
+            IllegalArgumentException: If the identifier is invalid.
+        """
+        if ident is None:
+            raise IllegalArgumentException("NameIdentifier must not be null")
+        if not ident.name():
+            raise IllegalArgumentException("NameIdentifier name must not be 
empty")
+        self._check_function_namespace(ident.namespace())
+
+    def _get_function_full_namespace(self, function_namespace: Namespace) -> 
Namespace:
+        """Get the full namespace of the function with the given function's 
short namespace.
+
+        Args:
+            function_namespace: The function's short namespace (schema name).
+
+        Returns:
+            The full namespace (metalake.catalog.schema).
+        """
+        return Namespace.of(
+            self._catalog_namespace.level(0),
+            self._catalog_name,
+            function_namespace.level(0),
+        )
+
+    def _to_function_update_request(
+        self, change: FunctionChange
+    ) -> FunctionUpdateRequest:
+        """Convert a FunctionChange to a FunctionUpdateRequest.
+
+        Args:
+            change: The function change.
+
+        Returns:
+            The corresponding update request.
+
+        Raises:
+            IllegalArgumentException: If the change type is not supported.
+        """
+        if isinstance(change, UpdateComment):
+            return UpdateCommentRequest(change.new_comment())
+        if isinstance(change, AddDefinition):
+            definition_dto = FunctionDefinitionDTO.from_function_definition(
+                change.definition()
+            )
+            return AddDefinitionRequest(definition_dto)
+        if isinstance(change, RemoveDefinition):
+            param_dtos = [
+                FunctionParamDTO.from_function_param(p) for p in 
change.parameters()
+            ]
+            return RemoveDefinitionRequest(param_dtos)
+        if isinstance(change, AddImpl):
+            param_dtos = [
+                FunctionParamDTO.from_function_param(p) for p in 
change.parameters()
+            ]
+            impl_dto = 
function_impl_dto_from_function_impl(change.implementation())
+            return AddImplRequest(param_dtos, impl_dto)
+        if isinstance(change, UpdateImpl):
+            param_dtos = [
+                FunctionParamDTO.from_function_param(p) for p in 
change.parameters()
+            ]
+            impl_dto = 
function_impl_dto_from_function_impl(change.implementation())
+            return UpdateImplRequest(param_dtos, change.runtime().name, 
impl_dto)
+        if isinstance(change, RemoveImpl):
+            param_dtos = [
+                FunctionParamDTO.from_function_param(p) for p in 
change.parameters()
+            ]
+            return RemoveImplRequest(param_dtos, change.runtime().name)
+        raise IllegalArgumentException(f"Unknown function change type: 
{type(change)}")
diff --git a/clients/client-python/gravitino/client/generic_function.py 
b/clients/client-python/gravitino/client/generic_function.py
new file mode 100644
index 0000000000..634084b291
--- /dev/null
+++ b/clients/client-python/gravitino/client/generic_function.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, Optional
+
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.function.function_dto import FunctionDTO
+
+
+class GenericFunction(Function):
+    """A generic implementation of the Function interface."""
+
+    def __init__(self, function_dto: FunctionDTO):
+        """Create a GenericFunction from a FunctionDTO.
+
+        Args:
+            function_dto: The function DTO.
+        """
+        self._function_dto = function_dto
+
+    def name(self) -> str:
+        """Returns the function name."""
+        return self._function_dto.name()
+
+    def function_type(self) -> FunctionType:
+        """Returns the function type."""
+        return self._function_dto.function_type()
+
+    def deterministic(self) -> bool:
+        """Returns whether the function is deterministic."""
+        return self._function_dto.deterministic()
+
+    def comment(self) -> Optional[str]:
+        """Returns the optional comment of the function."""
+        return self._function_dto.comment()
+
+    def definitions(self) -> List[FunctionDefinition]:
+        """Returns the definitions of the function."""
+        return self._function_dto.definitions()
+
+    def audit_info(self) -> Optional[AuditDTO]:
+        """Returns the audit information."""
+        return self._function_dto.audit_info()
diff --git a/clients/client-python/gravitino/client/relational_catalog.py 
b/clients/client-python/gravitino/client/relational_catalog.py
index 54d0265591..2330334589 100644
--- a/clients/client-python/gravitino/client/relational_catalog.py
+++ b/clients/client-python/gravitino/client/relational_catalog.py
@@ -40,7 +40,9 @@ from gravitino.rest.rest_utils import encode_string
 from gravitino.utils import HTTPClient
 
 
-class RelationalCatalog(BaseSchemaCatalog, TableCatalog):
+class RelationalCatalog(
+    BaseSchemaCatalog, TableCatalog
+):  # pylint: disable=too-many-ancestors
     """Relational catalog is a catalog implementation
 
     The `RelationalCatalog` supports relational database like metadata 
operations,
diff --git a/clients/client-python/gravitino/dto/function/function_impl_dto.py 
b/clients/client-python/gravitino/dto/function/function_impl_dto.py
index 110e38cd8a..4834923dcd 100644
--- a/clients/client-python/gravitino/dto/function/function_impl_dto.py
+++ b/clients/client-python/gravitino/dto/function/function_impl_dto.py
@@ -48,6 +48,7 @@ class SQLImplDTO(FunctionImplDTO):
 
     _runtime: str = field(metadata=config(field_name="runtime"))
     _sql: str = field(metadata=config(field_name="sql"))
+    _language: str = field(default="SQL", 
metadata=config(field_name="language"))
     _resources: Optional[FunctionResourcesDTO] = field(
         default=None, metadata=config(field_name="resources")
     )
@@ -117,6 +118,7 @@ class JavaImplDTO(FunctionImplDTO):
 
     _runtime: str = field(metadata=config(field_name="runtime"))
     _class_name: str = field(metadata=config(field_name="className"))
+    _language: str = field(default="JAVA", 
metadata=config(field_name="language"))
     _resources: Optional[FunctionResourcesDTO] = field(
         default=None, metadata=config(field_name="resources")
     )
@@ -186,6 +188,7 @@ class PythonImplDTO(FunctionImplDTO):
 
     _runtime: str = field(metadata=config(field_name="runtime"))
     _handler: str = field(metadata=config(field_name="handler"))
+    _language: str = field(default="PYTHON", 
metadata=config(field_name="language"))
     _code_block: Optional[str] = field(
         default=None, metadata=config(field_name="codeBlock")
     )
diff --git a/clients/client-python/tests/integration/test_function_catalog.py 
b/clients/client-python/tests/integration/test_function_catalog.py
new file mode 100644
index 0000000000..ec515785ea
--- /dev/null
+++ b/clients/client-python/tests/integration/test_function_catalog.py
@@ -0,0 +1,295 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from random import randint
+
+from gravitino import Catalog, GravitinoAdminClient, GravitinoClient, NameIdentifier
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_definition import FunctionDefinitions
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_param import FunctionParams
+from gravitino.api.function.function_type import FunctionType
+from gravitino.api.function.sql_impl import SQLImpl
+from gravitino.api.rel.types.types import Types
+from gravitino.exceptions.base import (
+    FunctionAlreadyExistsException,
+    NoSuchFunctionException,
+)
+from gravitino.namespace import Namespace
+from tests.integration.integration_test_env import IntegrationTestEnv
+
+
+class TestFunctionCatalog(IntegrationTestEnv):
+    """Integration tests for Function catalog operations.
+
+    The tests talk to a Gravitino server at ``http://localhost:8090``. A
+    metalake and a fileset catalog are created once for the whole class; a
+    schema is created before and dropped after every test.
+    """
+
+    # Metalake, catalog and schema names each carry a random numeric suffix.
+    _metalake_name: str = "function_it_metalake" + str(randint(0, 1000))
+    _catalog_name: str = "function_it_catalog" + str(randint(0, 1000))
+    _schema_name: str = "function_it_schema" + str(randint(0, 1000))
+
+    # Populated in setUpClass.
+    _gravitino_admin_client: GravitinoAdminClient = None
+    _gravitino_client: GravitinoClient = None
+    _catalog: Catalog = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Create the metalake and the fileset catalog shared by all tests."""
+        super().setUpClass()
+
+        cls._gravitino_admin_client = GravitinoAdminClient(uri="http://localhost:8090")
+        cls._gravitino_admin_client.create_metalake(
+            cls._metalake_name, comment="comment", properties={}
+        )
+
+        cls._gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls._metalake_name
+        )
+        cls._catalog = cls._gravitino_client.create_catalog(
+            name=cls._catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider="hadoop",
+            comment="comment",
+            properties={},
+        )
+
+    @classmethod
+    def tearDownClass(cls):
+        """Drop the catalog and the metalake, then tear down the base env."""
+        try:
+            cls._gravitino_client.drop_catalog(name=cls._catalog_name, force=True)
+            cls._gravitino_admin_client.drop_metalake(
+                name=cls._metalake_name, force=True
+            )
+        finally:
+            # Run the base-class teardown even when one of the drops fails.
+            super().tearDownClass()
+
+    def setUp(self):
+        """Set up schema before each test."""
+        self._catalog.as_schemas().create_schema(self._schema_name, "comment", {})
+
+    def tearDown(self):
+        """Drop schema after each test."""
+        self._catalog.as_schemas().drop_schema(self._schema_name, True)
+
+    def _create_spark_impl(self, sql: str = "SELECT 1") -> SQLImpl:
+        """Create a Spark SQL implementation for testing.
+
+        Args:
+            sql: SQL text of the implementation.
+        """
+        return (
+            SQLImpl.builder()
+            .with_runtime_type(SQLImpl.RuntimeType.SPARK)
+            .with_sql(sql)
+            .build()
+        )
+
+    def _create_trino_impl(self, sql: str = "SELECT 1") -> SQLImpl:
+        """Create a Trino SQL implementation for testing.
+
+        Args:
+            sql: SQL text of the implementation.
+        """
+        return (
+            SQLImpl.builder()
+            .with_runtime_type(SQLImpl.RuntimeType.TRINO)
+            .with_sql(sql)
+            .build()
+        )
+
+    def _create_scalar_definition(self):
+        """Create a scalar function definition with no parameters for testing.
+
+        The definition returns an integer and has a single Spark
+        implementation.
+        """
+        return FunctionDefinitions.of(
+            [], Types.IntegerType.get(), [self._create_spark_impl()]
+        )
+
+    def _register_scalar_function(self, function_ident, comment="comment"):
+        """Register a deterministic scalar function with one no-arg definition.
+
+        Args:
+            function_ident: Identifier of the function to register.
+            comment: Comment stored with the function.
+
+        Returns:
+            The function returned by ``register_function``.
+        """
+        return self._catalog.as_function_catalog().register_function(
+            ident=function_ident,
+            comment=comment,
+            function_type=FunctionType.SCALAR,
+            deterministic=True,
+            definitions=[self._create_scalar_definition()],
+        )
+
+    @staticmethod
+    def _no_arg_definition(function):
+        """Return the first definition of ``function`` without parameters."""
+        return next(d for d in function.definitions() if not d.parameters())
+
+    def test_create_get_function(self):
+        """Test creating and retrieving a function."""
+        function_name = "function_it_function" + str(randint(0, 1000))
+        function_ident = NameIdentifier.of(self._schema_name, function_name)
+        comment = "comment"
+
+        # Test create function
+        function = self._register_scalar_function(function_ident, comment)
+        self.assertEqual(function_name, function.name())
+        self.assertEqual(comment, function.comment())
+        self.assertEqual(FunctionType.SCALAR, function.function_type())
+        self.assertTrue(function.deterministic())
+
+        # Test get function
+        function = self._catalog.as_function_catalog().get_function(function_ident)
+        self.assertEqual(function_name, function.name())
+        self.assertEqual(comment, function.comment())
+        self.assertEqual(FunctionType.SCALAR, function.function_type())
+        self.assertEqual(
+            Types.IntegerType.get(), function.definitions()[0].return_type()
+        )
+
+        # Test create exists function
+        with self.assertRaises(FunctionAlreadyExistsException):
+            self._register_scalar_function(function_ident, comment)
+
+    def test_list_functions(self):
+        """Test listing functions in a schema."""
+        function_name1 = "function_it_function1" + str(randint(0, 1000))
+        function_name2 = "function_it_function2" + str(randint(0, 1000))
+        function_ident1 = NameIdentifier.of(self._schema_name, function_name1)
+        function_ident2 = NameIdentifier.of(self._schema_name, function_name2)
+
+        self._register_scalar_function(function_ident1)
+        self._register_scalar_function(function_ident2)
+
+        idents = self._catalog.as_function_catalog().list_functions(
+            Namespace.of(self._schema_name)
+        )
+        self.assertEqual(2, len(idents))
+        self.assertIn(function_ident1, idents)
+        self.assertIn(function_ident2, idents)
+
+        functions = self._catalog.as_function_catalog().list_function_infos(
+            Namespace.of(self._schema_name)
+        )
+        self.assertEqual(2, len(functions))
+        # Compare as a set: nothing in this test guarantees that the server
+        # lists the functions in registration order.
+        self.assertEqual(
+            {function_name1, function_name2}, {f.name() for f in functions}
+        )
+
+    def test_alter_function(self):
+        """Test altering a function with all supported FunctionChange types.
+
+        After every change both the value returned by ``alter_function`` and
+        a fresh ``get_function`` result are checked.
+        """
+        function_name = "function_it_function" + str(randint(0, 1000))
+        function_ident = NameIdentifier.of(self._schema_name, function_name)
+
+        self._register_scalar_function(function_ident)
+
+        # Test update comment
+        function = self._catalog.as_function_catalog().alter_function(
+            function_ident, FunctionChange.update_comment("new comment")
+        )
+        self.assertEqual("new comment", function.comment())
+        function = self._catalog.as_function_catalog().get_function(function_ident)
+        self.assertEqual("new comment", function.comment())
+
+        # Test add_definition: add an overload with one parameter
+        x_param = FunctionParams.of("x", Types.IntegerType.get())
+        new_definition = FunctionDefinitions.of(
+            [x_param], Types.IntegerType.get(), [self._create_spark_impl("SELECT x")]
+        )
+        function = self._catalog.as_function_catalog().alter_function(
+            function_ident, FunctionChange.add_definition(new_definition)
+        )
+        self.assertEqual(2, len(function.definitions()))
+
+        function = self._catalog.as_function_catalog().get_function(function_ident)
+        self.assertEqual(2, len(function.definitions()))
+
+        # Test add_impl: add a Trino implementation to the no-arg definition
+        trino_impl = self._create_trino_impl()
+        function = self._catalog.as_function_catalog().alter_function(
+            function_ident,
+            FunctionChange.add_impl([], trino_impl),
+        )
+        no_arg_def = self._no_arg_definition(function)
+        impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+        self.assertIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+        function = self._catalog.as_function_catalog().get_function(function_ident)
+        no_arg_def = self._no_arg_definition(function)
+        impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+        self.assertIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+        # Test update_impl: replace the Trino implementation with new SQL
+        updated_trino_impl = self._create_trino_impl("SELECT 2")
+        function = self._catalog.as_function_catalog().alter_function(
+            function_ident,
+            FunctionChange.update_impl(
+                [], FunctionImpl.RuntimeType.TRINO, updated_trino_impl
+            ),
+        )
+        no_arg_def = self._no_arg_definition(function)
+        trino_impls = [
+            i
+            for i in no_arg_def.impls()
+            if i.runtime() == FunctionImpl.RuntimeType.TRINO
+        ]
+        self.assertEqual(1, len(trino_impls))
+        self.assertEqual("SELECT 2", trino_impls[0].sql())
+
+        function = self._catalog.as_function_catalog().get_function(function_ident)
+        no_arg_def = self._no_arg_definition(function)
+        trino_impls = [
+            i
+            for i in no_arg_def.impls()
+            if i.runtime() == FunctionImpl.RuntimeType.TRINO
+        ]
+        self.assertEqual("SELECT 2", trino_impls[0].sql())
+
+        # Test remove_impl: remove the Trino implementation from the no-arg
+        # definition
+        function = self._catalog.as_function_catalog().alter_function(
+            function_ident,
+            FunctionChange.remove_impl([], FunctionImpl.RuntimeType.TRINO),
+        )
+        no_arg_def = self._no_arg_definition(function)
+        impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+        self.assertNotIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+        function = self._catalog.as_function_catalog().get_function(function_ident)
+        no_arg_def = self._no_arg_definition(function)
+        impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+        self.assertNotIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+        # Test remove_definition: remove the parameterized overload
+        function = self._catalog.as_function_catalog().alter_function(
+            function_ident, FunctionChange.remove_definition([x_param])
+        )
+        self.assertEqual(1, len(function.definitions()))
+
+        function = self._catalog.as_function_catalog().get_function(function_ident)
+        self.assertEqual(1, len(function.definitions()))
+
+    def test_drop_function(self):
+        """Test dropping a function.
+
+        The first drop must return True, a second drop of the same function
+        False, and a later lookup must raise ``NoSuchFunctionException``.
+        """
+        function_name = "function_it_function" + str(randint(0, 1000))
+        function_ident = NameIdentifier.of(self._schema_name, function_name)
+
+        self._register_scalar_function(function_ident)
+
+        self.assertTrue(
+            self._catalog.as_function_catalog().drop_function(function_ident)
+        )
+        self.assertFalse(
+            self._catalog.as_function_catalog().drop_function(function_ident)
+        )
+
+        with self.assertRaises(NoSuchFunctionException):
+            self._catalog.as_function_catalog().get_function(function_ident)
diff --git a/clients/client-python/tests/unittests/test_function_catalog.py b/clients/client-python/tests/unittests/test_function_catalog.py
new file mode 100644
index 0000000000..a29da86c9c
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_function_catalog.py
@@ -0,0 +1,252 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest.mock import patch, Mock
+
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_type import FunctionType
+from gravitino.api.rel.types.types import Types
+from gravitino.client.relational_catalog import RelationalCatalog
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.function.function_definition_dto import FunctionDefinitionDTO
+from gravitino.dto.function.function_dto import FunctionDTO
+from gravitino.dto.function.function_impl_dto import (
+    SQLImplDTO,
+)
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
+from gravitino.dto.responses.drop_response import DropResponse
+from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.function_list_response import FunctionListResponse
+from gravitino.dto.responses.function_response import FunctionResponse
+from gravitino.exceptions.base import (
+    NoSuchSchemaException,
+    NoSuchFunctionException,
+    FunctionAlreadyExistsException,
+)
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+from gravitino.utils import HTTPClient, Response
+
+
+class TestFunctionCatalog(unittest.TestCase):
+    """Unit tests for the function operations of a catalog.
+
+    Every test reaches the operations through
+    ``RelationalCatalog.as_function_catalog()`` and patches the matching
+    ``HTTPClient`` method, so the response (or exception) is supplied by the
+    test itself.
+    """
+
+    metalake_name = "test_metalake"
+    catalog_name = "test_catalog"
+    schema_name = "test_schema"
+    catalog_namespace = Namespace.of(metalake_name)
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        """Build the REST client and the catalog shared by all tests."""
+        cls.rest_client = HTTPClient("http://localhost:8090")
+        cls.catalog = RelationalCatalog(
+            catalog_namespace=cls.catalog_namespace,
+            name=cls.catalog_name,
+            catalog_type=RelationalCatalog.Type.RELATIONAL,
+            provider="test_provider",
+            audit=AuditDTO("anonymous"),
+            rest_client=cls.rest_client,
+        )
+
+    def _get_mock_http_resp(self, json_str: str, return_code: int = 200):
+        """Wrap a JSON string in a ``Response`` built on a mocked HTTP response.
+
+        Args:
+            json_str: Body of the response; it is UTF-8 encoded before being
+                returned by the mocked ``read()``.
+            return_code: Value returned by the mocked ``getcode()``.
+
+        Returns:
+            A ``Response`` constructed from the mock.
+        """
+        mock_http_resp = Mock()
+        mock_http_resp.getcode.return_value = return_code
+        mock_http_resp.read.return_value = json_str.encode("utf-8")
+        mock_http_resp.info.return_value = None
+        mock_http_resp.url = None
+        mock_resp = Response(mock_http_resp)
+        return mock_resp
+
+    def _mock_function_dto(
+        self, name: str, function_type: FunctionType, comment: str, deterministic: bool
+    ) -> FunctionDTO:
+        """Build a ``FunctionDTO`` with a single one-parameter definition.
+
+        The definition takes an integer ``param1``, returns an integer and has
+        one Spark SQL implementation (``SELECT param1 + 1``).
+
+        Args:
+            name: Name of the function.
+            function_type: Type of the function.
+            comment: Comment of the function.
+            deterministic: Deterministic flag of the function.
+        """
+        params = [FunctionParamDTO(_name="param1", _data_type=Types.IntegerType.get())]
+        impl = SQLImplDTO(
+            _runtime="SPARK",
+            _sql="SELECT param1 + 1",
+            _resources=None,
+            _properties={},
+        )
+        definition = FunctionDefinitionDTO(
+            _parameters=params, _return_type=Types.IntegerType.get(), _impls=[impl]
+        )
+        return FunctionDTO(
+            _name=name,
+            _definitions=[definition],
+            _function_type=function_type,
+            _deterministic=deterministic,
+            _comment=comment,
+            _audit=AuditDTO(
+                "creator", "2022-01-01T00:00:00Z", "modifier", "2022-01-01T00:00:00Z"
+            ),
+        )
+
+    def test_list_functions(self):
+        """list_functions returns the identifiers from the response body."""
+        func1 = NameIdentifier.of(
+            self.metalake_name, self.catalog_name, self.schema_name, "func1"
+        )
+        func2 = NameIdentifier.of(
+            self.metalake_name, self.catalog_name, self.schema_name, "func2"
+        )
+
+        resp_body = EntityListResponse(_code=0, _idents=[func1, func2])
+        mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            functions = self.catalog.as_function_catalog().list_functions(
+                Namespace.of(self.schema_name)
+            )
+            self.assertEqual(2, len(functions))
+            self.assertEqual("func1", functions[0].name())
+            self.assertEqual("func2", functions[1].name())
+
+        # Test NoSuchSchemaException
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get",
+            side_effect=NoSuchSchemaException("schema not found"),
+        ):
+            with self.assertRaises(NoSuchSchemaException):
+                self.catalog.as_function_catalog().list_functions(
+                    Namespace.of(self.schema_name)
+                )
+
+    def test_list_function_infos(self):
+        """list_function_infos returns the functions from the response body."""
+        func1 = self._mock_function_dto("func1", FunctionType.SCALAR, "comment1", True)
+        func2 = self._mock_function_dto("func2", FunctionType.SCALAR, "comment2", False)
+
+        resp_body = FunctionListResponse(_code=0, _functions=[func1, func2])
+        mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            functions = self.catalog.as_function_catalog().list_function_infos(
+                Namespace.of(self.schema_name)
+            )
+            self.assertEqual(2, len(functions))
+            self.assertEqual("func1", functions[0].name())
+            self.assertEqual("func2", functions[1].name())
+            self.assertEqual("comment1", functions[0].comment())
+            self.assertEqual("comment2", functions[1].comment())
+
+    def test_get_function(self):
+        """get_function returns the function, or propagates a lookup error."""
+        ident = NameIdentifier.of(self.schema_name, "func1")
+        mock_function = self._mock_function_dto(
+            "func1", FunctionType.SCALAR, "mock comment", True
+        )
+        resp_body = FunctionResponse(_code=0, _function=mock_function)
+        mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            func = self.catalog.as_function_catalog().get_function(ident)
+            self.assertIsNotNone(func)
+            self.assertEqual("func1", func.name())
+            self.assertEqual("mock comment", func.comment())
+
+        # Test NoSuchFunctionException
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get",
+            side_effect=NoSuchFunctionException("function not found"),
+        ):
+            with self.assertRaises(NoSuchFunctionException):
+                self.catalog.as_function_catalog().get_function(ident)
+
+    def test_register_function(self):
+        """register_function returns the function, or propagates a conflict."""
+        ident = NameIdentifier.of(self.schema_name, "func1")
+        mock_function = self._mock_function_dto(
+            "func1", FunctionType.SCALAR, "mock comment", True
+        )
+        resp_body = FunctionResponse(_code=0, _function=mock_function)
+        mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.post", return_value=mock_resp
+        ):
+            func = self.catalog.as_function_catalog().register_function(
+                ident,
+                "mock comment",
+                FunctionType.SCALAR,
+                True,
+                definitions=mock_function.definitions(),
+            )
+            self.assertIsNotNone(func)
+            self.assertEqual("func1", func.name())
+            self.assertEqual("mock comment", func.comment())
+
+        # Test FunctionAlreadyExistsException
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.post",
+            side_effect=FunctionAlreadyExistsException("function already exists"),
+        ):
+            with self.assertRaises(FunctionAlreadyExistsException):
+                self.catalog.as_function_catalog().register_function(
+                    ident,
+                    "mock comment",
+                    FunctionType.SCALAR,
+                    True,
+                    definitions=mock_function.definitions(),
+                )
+
+    def test_alter_function(self):
+        """alter_function returns the function, or propagates a lookup error."""
+        ident = NameIdentifier.of(self.schema_name, "func1")
+        mock_function = self._mock_function_dto(
+            "func1", FunctionType.SCALAR, "updated comment", True
+        )
+        resp_body = FunctionResponse(_code=0, _function=mock_function)
+        mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.put", return_value=mock_resp
+        ):
+            func = self.catalog.as_function_catalog().alter_function(
+                ident, FunctionChange.update_comment("updated comment")
+            )
+            self.assertIsNotNone(func)
+            self.assertEqual("updated comment", func.comment())
+
+        # Test NoSuchFunctionException
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.put",
+            side_effect=NoSuchFunctionException("function not found"),
+        ):
+            with self.assertRaises(NoSuchFunctionException):
+                self.catalog.as_function_catalog().alter_function(
+                    ident, FunctionChange.update_comment("updated comment")
+                )
+
+    def test_drop_function(self):
+        """drop_function returns the ``dropped`` flag of the response."""
+        ident = NameIdentifier.of(self.schema_name, "func1")
+        resp_body = DropResponse(_code=0, _dropped=True)
+        mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.delete", return_value=mock_resp
+        ):
+            dropped = self.catalog.as_function_catalog().drop_function(ident)
+            self.assertTrue(dropped)
+
+        # Test function not exists
+        resp_body = DropResponse(_code=0, _dropped=False)
+        mock_resp = self._get_mock_http_resp(resp_body.to_json())
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.delete", return_value=mock_resp
+        ):
+            dropped = self.catalog.as_function_catalog().drop_function(ident)
+            self.assertFalse(dropped)

Reply via email to