This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new c0c3e4e175 [#9531] feat(python-client): add function client operations
and catalog integration (#9945)
c0c3e4e175 is described below
commit c0c3e4e175d69543790c845836c5405005cc53ce
Author: mchades <[email protected]>
AuthorDate: Thu Feb 26 20:28:01 2026 +0800
[#9531] feat(python-client): add function client operations and catalog
integration (#9945)
### What changes were proposed in this pull request?
Add the client layer for function CRUD operations:
- GenericFunction wrapper and FunctionCatalogOperations (the function
catalog operations helper)
- Integrate FunctionCatalog into BaseSchemaCatalog with delegation
- Update FilesetCatalog and RelationalCatalog for the new inheritance hierarchy (suppress pylint too-many-ancestors)
- Unit tests for function catalog client operations
- Integration tests for end-to-end function lifecycle
### Why are the changes needed?
Fix: #9531
### Does this PR introduce _any_ user-facing change?
Yes. Function management is now available in the Python client.
### How was this patch tested?
Added unit tests and integration tests for the function catalog.
---------
Co-authored-by: Copilot <[email protected]>
---
.../gravitino/client/base_schema_catalog.py | 52 ++-
.../gravitino/client/fileset_catalog.py | 4 +-
.../client/function_catalog_operations.py | 382 +++++++++++++++++++++
.../gravitino/client/generic_function.py | 60 ++++
.../gravitino/client/relational_catalog.py | 4 +-
.../gravitino/dto/function/function_impl_dto.py | 3 +
.../tests/integration/test_function_catalog.py | 295 ++++++++++++++++
.../tests/unittests/test_function_catalog.py | 252 ++++++++++++++
8 files changed, 1048 insertions(+), 4 deletions(-)
diff --git a/clients/client-python/gravitino/client/base_schema_catalog.py
b/clients/client-python/gravitino/client/base_schema_catalog.py
index 74dc5e8911..3160f5bd6e 100644
--- a/clients/client-python/gravitino/client/base_schema_catalog.py
+++ b/clients/client-python/gravitino/client/base_schema_catalog.py
@@ -16,14 +16,20 @@
# under the License.
import logging
-from typing import Dict, List
+from typing import Dict, List, Optional
from gravitino.api.catalog import Catalog
from gravitino.api.metadata_object import MetadataObject
from gravitino.api.metadata_objects import MetadataObjects
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_catalog import FunctionCatalog
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
from gravitino.api.schema import Schema
from gravitino.api.schema_change import SchemaChange
from gravitino.api.supports_schemas import SupportsSchemas
+from gravitino.client.function_catalog_operations import
FunctionCatalogOperations
from gravitino.client.metadata_object_credential_operations import (
MetadataObjectCredentialOperations,
)
@@ -37,6 +43,7 @@ from gravitino.dto.responses.entity_list_response import
EntityListResponse
from gravitino.dto.responses.schema_response import SchemaResponse
from gravitino.exceptions.base import IllegalArgumentException
from gravitino.exceptions.handlers.schema_error_handler import
SCHEMA_ERROR_HANDLER
+from gravitino.name_identifier import NameIdentifier
from gravitino.namespace import Namespace
from gravitino.rest.rest_utils import encode_string
from gravitino.utils import HTTPClient
@@ -44,7 +51,7 @@ from gravitino.utils import HTTPClient
logger = logging.getLogger(__name__)
-class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
+class BaseSchemaCatalog(CatalogDTO, SupportsSchemas, FunctionCatalog):
"""
BaseSchemaCatalog is the base abstract class for all the catalog with
schema. It provides the
common methods for managing schemas in a catalog. With BaseSchemaCatalog,
users can list,
@@ -60,6 +67,8 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
# The metadata object credential operations
_object_credential_operations: MetadataObjectCredentialOperations
+ _function_operations: FunctionCatalogOperations
+
def __init__(
self,
catalog_namespace: Namespace,
@@ -86,12 +95,18 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
self._object_credential_operations =
MetadataObjectCredentialOperations(
catalog_namespace.level(0), metadata_object, rest_client
)
+ self._function_operations = FunctionCatalogOperations(
+ rest_client, catalog_namespace, self.name()
+ )
self.validate()
     def as_schemas(self):
         return self
+    def as_function_catalog(self):
+        """Return this catalog as a FunctionCatalog.
+
+        BaseSchemaCatalog itself implements FunctionCatalog (every function
+        method delegates to its FunctionCatalogOperations helper), so the
+        catalog object is returned as-is.
+        """
+        return self
+
def list_schemas(self) -> List[str]:
"""List all the schemas under the given catalog namespace.
@@ -222,6 +237,39 @@ class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
drop_resp.validate()
return drop_resp.dropped()
+ def list_functions(self, namespace: Namespace) -> List[NameIdentifier]:
+ return self._function_operations.list_functions(namespace)
+
+ def list_function_infos(self, namespace: Namespace) -> List[Function]:
+ return self._function_operations.list_function_infos(namespace)
+
+ def get_function(self, ident: NameIdentifier) -> Function:
+ return self._function_operations.get_function(ident)
+
+ def register_function(
+ self,
+ ident: NameIdentifier,
+ comment: Optional[str],
+ function_type: FunctionType,
+ deterministic: bool,
+ definitions: List[FunctionDefinition],
+ ) -> Function:
+ return self._function_operations.register_function(
+ ident,
+ comment,
+ function_type,
+ deterministic,
+ definitions,
+ )
+
+ def alter_function(
+ self, ident: NameIdentifier, *changes: FunctionChange
+ ) -> Function:
+ return self._function_operations.alter_function(ident, *changes)
+
+ def drop_function(self, ident: NameIdentifier) -> bool:
+ return self._function_operations.drop_function(ident)
+
def _schema_namespace(self) -> Namespace:
return Namespace.of(self._catalog_namespace.level(0), self.name())
diff --git a/clients/client-python/gravitino/client/fileset_catalog.py
b/clients/client-python/gravitino/client/fileset_catalog.py
index 4ef8e0ecca..40ee47ae6b 100644
--- a/clients/client-python/gravitino/client/fileset_catalog.py
+++ b/clients/client-python/gravitino/client/fileset_catalog.py
@@ -44,7 +44,9 @@ from gravitino.utils import HTTPClient
logger = logging.getLogger(__name__)
-class FilesetCatalog(BaseSchemaCatalog, SupportsCredentials):
+class FilesetCatalog(
+ BaseSchemaCatalog, SupportsCredentials
+): # pylint: disable=too-many-ancestors
"""
Fileset catalog is a catalog implementation that supports fileset like
metadata operations, for
example, schemas and filesets list, creation, update and deletion. A
Fileset catalog is under the metalake.
diff --git
a/clients/client-python/gravitino/client/function_catalog_operations.py
b/clients/client-python/gravitino/client/function_catalog_operations.py
new file mode 100644
index 0000000000..46050c7812
--- /dev/null
+++ b/clients/client-python/gravitino/client/function_catalog_operations.py
@@ -0,0 +1,382 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, Optional
+
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_catalog import FunctionCatalog
+from gravitino.api.function.function_change import (
+ AddDefinition,
+ AddImpl,
+ FunctionChange,
+ RemoveDefinition,
+ RemoveImpl,
+ UpdateComment,
+ UpdateImpl,
+)
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
+from gravitino.client.generic_function import GenericFunction
+from gravitino.dto.function.function_definition_dto import
FunctionDefinitionDTO
+from gravitino.dto.function.function_impl_dto import (
+ function_impl_dto_from_function_impl,
+)
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
+from gravitino.dto.requests.function_register_request import
FunctionRegisterRequest
+from gravitino.dto.requests.function_update_request import (
+ AddDefinitionRequest,
+ AddImplRequest,
+ FunctionUpdateRequest,
+ RemoveDefinitionRequest,
+ RemoveImplRequest,
+ UpdateCommentRequest,
+ UpdateImplRequest,
+)
+from gravitino.dto.requests.function_updates_request import
FunctionUpdatesRequest
+from gravitino.dto.responses.drop_response import DropResponse
+from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.function_list_response import FunctionListResponse
+from gravitino.dto.responses.function_response import FunctionResponse
+from gravitino.exceptions.base import IllegalArgumentException
+from gravitino.exceptions.handlers.function_error_handler import (
+ FUNCTION_ERROR_HANDLER,
+)
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+from gravitino.rest import rest_utils
+
+
+class FunctionCatalogOperations(FunctionCatalog):
+ """Function catalog operations helper class that provides implementations
for function management.
+
+ This class is used by catalogs that support function operations.
+ """
+
+ def __init__(self, rest_client, catalog_namespace: Namespace,
catalog_name: str):
+ """Create a FunctionCatalogOperations instance.
+
+ Args:
+ rest_client: The REST client for making API calls.
+ catalog_namespace: The namespace of the catalog.
+ catalog_name: The name of the catalog.
+ """
+ self._rest_client = rest_client
+ self._catalog_namespace = catalog_namespace
+ self._catalog_name = catalog_name
+
+ def list_functions(self, namespace: Namespace) -> List[NameIdentifier]:
+ """List the functions in a schema namespace from the catalog.
+
+ Args:
+ namespace: A schema namespace. This namespace should have 1 level,
+ which is the schema name.
+
+ Returns:
+ A list of function identifiers in the namespace.
+
+ Raises:
+ NoSuchSchemaException: If the schema does not exist.
+ """
+ self._check_function_namespace(namespace)
+
+ full_namespace = self._get_function_full_namespace(namespace)
+ resp = self._rest_client.get(
+ self._format_function_request_path(full_namespace),
+ error_handler=FUNCTION_ERROR_HANDLER,
+ )
+
+ entity_list_response = EntityListResponse.from_json(resp.body)
+ entity_list_response.validate()
+
+ return [
+ NameIdentifier.of(ident.namespace().level(2), ident.name())
+ for ident in entity_list_response.identifiers()
+ ]
+
+ def list_function_infos(self, namespace: Namespace) -> List[Function]:
+ """List the functions with details in a schema namespace from the
catalog.
+
+ Args:
+ namespace: A namespace.
+
+ Returns:
+ A list of functions in the namespace.
+
+ Raises:
+ NoSuchSchemaException: If the schema does not exist.
+ """
+ self._check_function_namespace(namespace)
+
+ full_namespace = self._get_function_full_namespace(namespace)
+ params = {"details": "true"}
+ resp = self._rest_client.get(
+ self._format_function_request_path(full_namespace),
+ params=params,
+ error_handler=FUNCTION_ERROR_HANDLER,
+ )
+
+ function_list_response = FunctionListResponse.from_json(resp.body)
+ function_list_response.validate()
+
+ return [GenericFunction(func) for func in
function_list_response.functions()]
+
+ def get_function(self, ident: NameIdentifier) -> Function:
+ """Get a function by NameIdentifier from the catalog.
+
+ Args:
+ ident: A function identifier, which should be "schema.function"
format.
+
+ Returns:
+ The function metadata.
+
+ Raises:
+ NoSuchFunctionException: If the function does not exist.
+ """
+ self._check_function_name_identifier(ident)
+
+ full_namespace = self._get_function_full_namespace(ident.namespace())
+ resp = self._rest_client.get(
+ f"{self._format_function_request_path(full_namespace)}/"
+ f"{rest_utils.encode_string(ident.name())}",
+ error_handler=FUNCTION_ERROR_HANDLER,
+ )
+
+ function_response = FunctionResponse.from_json(resp.body)
+ function_response.validate()
+
+ return GenericFunction(function_response.function())
+
+ def register_function(
+ self,
+ ident: NameIdentifier,
+ comment: Optional[str],
+ function_type: FunctionType,
+ deterministic: bool,
+ definitions: List[FunctionDefinition],
+ ) -> Function:
+ """Register a function with one or more definitions (overloads).
+
+ Args:
+ ident: The function identifier.
+ comment: The optional function comment.
+ function_type: The function type.
+ deterministic: Whether the function is deterministic.
+ definitions: The function definitions.
+
+ Returns:
+ The registered function.
+
+ Raises:
+ NoSuchSchemaException: If the schema does not exist.
+ FunctionAlreadyExistsException: If the function already exists.
+ """
+ self._check_function_name_identifier(ident)
+
+ full_namespace = self._get_function_full_namespace(ident.namespace())
+
+ # Convert definitions to DTOs
+ definition_dtos = [
+ FunctionDefinitionDTO.from_function_definition(d) for d in
definitions
+ ]
+
+ req = FunctionRegisterRequest(
+ name=ident.name(),
+ function_type=function_type,
+ deterministic=deterministic,
+ definitions=definition_dtos,
+ comment=comment,
+ )
+ req.validate()
+
+ resp = self._rest_client.post(
+ self._format_function_request_path(full_namespace),
+ req,
+ error_handler=FUNCTION_ERROR_HANDLER,
+ )
+
+ function_response = FunctionResponse.from_json(resp.body)
+ function_response.validate()
+
+ return GenericFunction(function_response.function())
+
+ def alter_function(
+ self, ident: NameIdentifier, *changes: FunctionChange
+ ) -> Function:
+ """Applies FunctionChange changes to a function in the catalog.
+
+ Args:
+ ident: The NameIdentifier instance of the function to alter.
+ changes: The FunctionChange instances to apply to the function.
+
+ Returns:
+ The updated Function instance.
+
+ Raises:
+ NoSuchFunctionException: If the function does not exist.
+ IllegalArgumentException: If the change is rejected by the
implementation.
+ """
+ self._check_function_name_identifier(ident)
+ if not changes:
+ raise IllegalArgumentException(
+ "At least one FunctionChange must be provided to
alter_function"
+ )
+
+ full_namespace = self._get_function_full_namespace(ident.namespace())
+ updates = [self._to_function_update_request(change) for change in
changes]
+ req = FunctionUpdatesRequest(updates)
+ req.validate()
+
+ resp = self._rest_client.put(
+ f"{self._format_function_request_path(full_namespace)}/"
+ f"{rest_utils.encode_string(ident.name())}",
+ req,
+ error_handler=FUNCTION_ERROR_HANDLER,
+ )
+
+ function_response = FunctionResponse.from_json(resp.body)
+ function_response.validate()
+
+ return GenericFunction(function_response.function())
+
+ def drop_function(self, ident: NameIdentifier) -> bool:
+ """Drop a function from the catalog.
+
+ Args:
+ ident: A function identifier, which should be "schema.function"
format.
+
+ Returns:
+ True if the function is dropped, False if the function did not
exist.
+ """
+ self._check_function_name_identifier(ident)
+
+ full_namespace = self._get_function_full_namespace(ident.namespace())
+ resp = self._rest_client.delete(
+ f"{self._format_function_request_path(full_namespace)}/"
+ f"{rest_utils.encode_string(ident.name())}",
+ error_handler=FUNCTION_ERROR_HANDLER,
+ )
+
+ drop_response = DropResponse.from_json(resp.body)
+ drop_response.validate()
+
+ return drop_response.dropped()
+
+ def _format_function_request_path(self, ns: Namespace) -> str:
+ """Format the request path for function operations.
+
+ Args:
+ ns: The full namespace (metalake.catalog.schema).
+
+ Returns:
+ The request path.
+ """
+ return (
+ f"api/metalakes/{rest_utils.encode_string(ns.level(0))}"
+ f"/catalogs/{rest_utils.encode_string(ns.level(1))}"
+ f"/schemas/{rest_utils.encode_string(ns.level(2))}"
+ f"/functions"
+ )
+
+ def _check_function_namespace(self, namespace: Namespace):
+ """Check whether the namespace of a function is valid.
+
+ Args:
+ namespace: The namespace to check.
+
+ Raises:
+ IllegalArgumentException: If the namespace is invalid.
+ """
+ if namespace is None or namespace.length() != 1:
+ raise IllegalArgumentException(
+ f"Function namespace must be non-null and have 1 level, "
+ f"the input namespace is {namespace}"
+ )
+
+ def _check_function_name_identifier(self, ident: NameIdentifier):
+ """Check whether the NameIdentifier of a function is valid.
+
+ Args:
+ ident: The NameIdentifier to check.
+
+ Raises:
+ IllegalArgumentException: If the identifier is invalid.
+ """
+ if ident is None:
+ raise IllegalArgumentException("NameIdentifier must not be null")
+ if not ident.name():
+ raise IllegalArgumentException("NameIdentifier name must not be
empty")
+ self._check_function_namespace(ident.namespace())
+
+ def _get_function_full_namespace(self, function_namespace: Namespace) ->
Namespace:
+ """Get the full namespace of the function with the given function's
short namespace.
+
+ Args:
+ function_namespace: The function's short namespace (schema name).
+
+ Returns:
+ The full namespace (metalake.catalog.schema).
+ """
+ return Namespace.of(
+ self._catalog_namespace.level(0),
+ self._catalog_name,
+ function_namespace.level(0),
+ )
+
+ def _to_function_update_request(
+ self, change: FunctionChange
+ ) -> FunctionUpdateRequest:
+ """Convert a FunctionChange to a FunctionUpdateRequest.
+
+ Args:
+ change: The function change.
+
+ Returns:
+ The corresponding update request.
+
+ Raises:
+ IllegalArgumentException: If the change type is not supported.
+ """
+ if isinstance(change, UpdateComment):
+ return UpdateCommentRequest(change.new_comment())
+ if isinstance(change, AddDefinition):
+ definition_dto = FunctionDefinitionDTO.from_function_definition(
+ change.definition()
+ )
+ return AddDefinitionRequest(definition_dto)
+ if isinstance(change, RemoveDefinition):
+ param_dtos = [
+ FunctionParamDTO.from_function_param(p) for p in
change.parameters()
+ ]
+ return RemoveDefinitionRequest(param_dtos)
+ if isinstance(change, AddImpl):
+ param_dtos = [
+ FunctionParamDTO.from_function_param(p) for p in
change.parameters()
+ ]
+ impl_dto =
function_impl_dto_from_function_impl(change.implementation())
+ return AddImplRequest(param_dtos, impl_dto)
+ if isinstance(change, UpdateImpl):
+ param_dtos = [
+ FunctionParamDTO.from_function_param(p) for p in
change.parameters()
+ ]
+ impl_dto =
function_impl_dto_from_function_impl(change.implementation())
+ return UpdateImplRequest(param_dtos, change.runtime().name,
impl_dto)
+ if isinstance(change, RemoveImpl):
+ param_dtos = [
+ FunctionParamDTO.from_function_param(p) for p in
change.parameters()
+ ]
+ return RemoveImplRequest(param_dtos, change.runtime().name)
+ raise IllegalArgumentException(f"Unknown function change type:
{type(change)}")
diff --git a/clients/client-python/gravitino/client/generic_function.py
b/clients/client-python/gravitino/client/generic_function.py
new file mode 100644
index 0000000000..634084b291
--- /dev/null
+++ b/clients/client-python/gravitino/client/generic_function.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, Optional
+
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.function.function_dto import FunctionDTO
+
+
+class GenericFunction(Function):
+ """A generic implementation of the Function interface."""
+
+ def __init__(self, function_dto: FunctionDTO):
+ """Create a GenericFunction from a FunctionDTO.
+
+ Args:
+ function_dto: The function DTO.
+ """
+ self._function_dto = function_dto
+
+ def name(self) -> str:
+ """Returns the function name."""
+ return self._function_dto.name()
+
+ def function_type(self) -> FunctionType:
+ """Returns the function type."""
+ return self._function_dto.function_type()
+
+ def deterministic(self) -> bool:
+ """Returns whether the function is deterministic."""
+ return self._function_dto.deterministic()
+
+ def comment(self) -> Optional[str]:
+ """Returns the optional comment of the function."""
+ return self._function_dto.comment()
+
+ def definitions(self) -> List[FunctionDefinition]:
+ """Returns the definitions of the function."""
+ return self._function_dto.definitions()
+
+ def audit_info(self) -> Optional[AuditDTO]:
+ """Returns the audit information."""
+ return self._function_dto.audit_info()
diff --git a/clients/client-python/gravitino/client/relational_catalog.py
b/clients/client-python/gravitino/client/relational_catalog.py
index 54d0265591..2330334589 100644
--- a/clients/client-python/gravitino/client/relational_catalog.py
+++ b/clients/client-python/gravitino/client/relational_catalog.py
@@ -40,7 +40,9 @@ from gravitino.rest.rest_utils import encode_string
from gravitino.utils import HTTPClient
-class RelationalCatalog(BaseSchemaCatalog, TableCatalog):
+class RelationalCatalog(
+ BaseSchemaCatalog, TableCatalog
+): # pylint: disable=too-many-ancestors
"""Relational catalog is a catalog implementation
The `RelationalCatalog` supports relational database like metadata
operations,
diff --git a/clients/client-python/gravitino/dto/function/function_impl_dto.py
b/clients/client-python/gravitino/dto/function/function_impl_dto.py
index 110e38cd8a..4834923dcd 100644
--- a/clients/client-python/gravitino/dto/function/function_impl_dto.py
+++ b/clients/client-python/gravitino/dto/function/function_impl_dto.py
@@ -48,6 +48,7 @@ class SQLImplDTO(FunctionImplDTO):
_runtime: str = field(metadata=config(field_name="runtime"))
_sql: str = field(metadata=config(field_name="sql"))
+ _language: str = field(default="SQL",
metadata=config(field_name="language"))
_resources: Optional[FunctionResourcesDTO] = field(
default=None, metadata=config(field_name="resources")
)
@@ -117,6 +118,7 @@ class JavaImplDTO(FunctionImplDTO):
_runtime: str = field(metadata=config(field_name="runtime"))
_class_name: str = field(metadata=config(field_name="className"))
+ _language: str = field(default="JAVA",
metadata=config(field_name="language"))
_resources: Optional[FunctionResourcesDTO] = field(
default=None, metadata=config(field_name="resources")
)
@@ -186,6 +188,7 @@ class PythonImplDTO(FunctionImplDTO):
_runtime: str = field(metadata=config(field_name="runtime"))
_handler: str = field(metadata=config(field_name="handler"))
+ _language: str = field(default="PYTHON",
metadata=config(field_name="language"))
_code_block: Optional[str] = field(
default=None, metadata=config(field_name="codeBlock")
)
diff --git a/clients/client-python/tests/integration/test_function_catalog.py
b/clients/client-python/tests/integration/test_function_catalog.py
new file mode 100644
index 0000000000..ec515785ea
--- /dev/null
+++ b/clients/client-python/tests/integration/test_function_catalog.py
@@ -0,0 +1,295 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from random import randint
+
+from gravitino import Catalog, GravitinoAdminClient, GravitinoClient,
NameIdentifier
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_definition import FunctionDefinitions
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_param import FunctionParams
+from gravitino.api.function.function_type import FunctionType
+from gravitino.api.function.sql_impl import SQLImpl
+from gravitino.api.rel.types.types import Types
+from gravitino.exceptions.base import (
+ FunctionAlreadyExistsException,
+ NoSuchFunctionException,
+)
+from gravitino.namespace import Namespace
+from tests.integration.integration_test_env import IntegrationTestEnv
+
+
+class TestFunctionCatalog(IntegrationTestEnv):
+ """Integration tests for Function catalog operations."""
+
+ _metalake_name: str = "function_it_metalake" + str(randint(0, 1000))
+ _catalog_name: str = "function_it_catalog" + str(randint(0, 1000))
+ _schema_name: str = "function_it_schema" + str(randint(0, 1000))
+
+ _gravitino_admin_client: GravitinoAdminClient = None
+ _gravitino_client: GravitinoClient = None
+ _catalog: Catalog = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Set up the integration test environment."""
+ super().setUpClass()
+
+ cls._gravitino_admin_client =
GravitinoAdminClient(uri="http://localhost:8090")
+ cls._gravitino_admin_client.create_metalake(
+ cls._metalake_name, comment="comment", properties={}
+ )
+
+ cls._gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls._metalake_name
+ )
+ cls._catalog = cls._gravitino_client.create_catalog(
+ name=cls._catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider="hadoop",
+ comment="comment",
+ properties={},
+ )
+
+ @classmethod
+ def tearDownClass(cls):
+ """Tear down the integration test environment."""
+ cls._gravitino_client.drop_catalog(name=cls._catalog_name, force=True)
+ cls._gravitino_admin_client.drop_metalake(name=cls._metalake_name,
force=True)
+
+ super().tearDownClass()
+
+ def setUp(self):
+ """Set up schema before each test."""
+ self._catalog.as_schemas().create_schema(self._schema_name, "comment",
{})
+
+ def tearDown(self):
+ """Drop schema after each test."""
+ self._catalog.as_schemas().drop_schema(self._schema_name, True)
+
+ def _create_spark_impl(self, sql: str = "SELECT 1") -> SQLImpl:
+ """Create a Spark SQL implementation for testing."""
+ return (
+ SQLImpl.builder()
+ .with_runtime_type(SQLImpl.RuntimeType.SPARK)
+ .with_sql(sql)
+ .build()
+ )
+
+ def _create_trino_impl(self, sql: str = "SELECT 1") -> SQLImpl:
+ """Create a Trino SQL implementation for testing."""
+ return (
+ SQLImpl.builder()
+ .with_runtime_type(SQLImpl.RuntimeType.TRINO)
+ .with_sql(sql)
+ .build()
+ )
+
+ def _create_scalar_definition(self):
+ """Create a scalar function definition with no parameters for
testing."""
+ return FunctionDefinitions.of(
+ [], Types.IntegerType.get(), [self._create_spark_impl()]
+ )
+
+ def test_create_get_function(self):
+ """Test creating and retrieving a function."""
+ function_name = "function_it_function" + str(randint(0, 1000))
+ function_ident = NameIdentifier.of(self._schema_name, function_name)
+ comment = "comment"
+ definition = self._create_scalar_definition()
+
+ # Test create function
+ function = self._catalog.as_function_catalog().register_function(
+ ident=function_ident,
+ comment=comment,
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[definition],
+ )
+ self.assertEqual(function_name, function.name())
+ self.assertEqual(comment, function.comment())
+ self.assertEqual(FunctionType.SCALAR, function.function_type())
+ self.assertTrue(function.deterministic())
+
+ # Test get function
+ function =
self._catalog.as_function_catalog().get_function(function_ident)
+ self.assertEqual(function_name, function.name())
+ self.assertEqual(comment, function.comment())
+ self.assertEqual(FunctionType.SCALAR, function.function_type())
+ self.assertEqual(
+ Types.IntegerType.get(), function.definitions()[0].return_type()
+ )
+
+ # Test create exists function
+ with self.assertRaises(FunctionAlreadyExistsException):
+ self._catalog.as_function_catalog().register_function(
+ ident=function_ident,
+ comment=comment,
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[definition],
+ )
+
+ def test_list_functions(self):
+ """Test listing functions in a schema."""
+ function_name1 = "function_it_function1" + str(randint(0, 1000))
+ function_name2 = "function_it_function2" + str(randint(0, 1000))
+ function_ident1 = NameIdentifier.of(self._schema_name, function_name1)
+ function_ident2 = NameIdentifier.of(self._schema_name, function_name2)
+ definition = self._create_scalar_definition()
+
+ self._catalog.as_function_catalog().register_function(
+ ident=function_ident1,
+ comment="comment",
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[definition],
+ )
+ self._catalog.as_function_catalog().register_function(
+ ident=function_ident2,
+ comment="comment",
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[definition],
+ )
+
+ idents = self._catalog.as_function_catalog().list_functions(
+ Namespace.of(self._schema_name)
+ )
+ self.assertEqual(2, len(idents))
+ self.assertTrue(function_ident1 in idents)
+ self.assertTrue(function_ident2 in idents)
+
+ functions = self._catalog.as_function_catalog().list_function_infos(
+ Namespace.of(self._schema_name)
+ )
+ self.assertEqual(2, len(functions))
+ self.assertEqual(function_name1, functions[0].name())
+ self.assertEqual(function_name2, functions[1].name())
+
+ def test_alter_function(self):
+ """Test altering a function with all supported FunctionChange types."""
+ function_name = "function_it_function" + str(randint(0, 1000))
+ function_ident = NameIdentifier.of(self._schema_name, function_name)
+ definition = self._create_scalar_definition()
+
+ self._catalog.as_function_catalog().register_function(
+ ident=function_ident,
+ comment="comment",
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[definition],
+ )
+
+ # Test update comment
+ function = self._catalog.as_function_catalog().alter_function(
+ function_ident, FunctionChange.update_comment("new comment")
+ )
+ self.assertEqual("new comment", function.comment())
+ function =
self._catalog.as_function_catalog().get_function(function_ident)
+ self.assertEqual("new comment", function.comment())
+
+ # Test add_definition: add an overload with one parameter
+ x_param = FunctionParams.of("x", Types.IntegerType.get())
+ new_definition = FunctionDefinitions.of(
+ [x_param], Types.IntegerType.get(),
[self._create_spark_impl("SELECT x")]
+ )
+ function = self._catalog.as_function_catalog().alter_function(
+ function_ident, FunctionChange.add_definition(new_definition)
+ )
+ self.assertEqual(2, len(function.definitions()))
+
+ function =
self._catalog.as_function_catalog().get_function(function_ident)
+ self.assertEqual(2, len(function.definitions()))
+
+ # Test add_impl: add a Trino implementation to the no-arg definition
+ trino_impl = self._create_trino_impl()
+ function = self._catalog.as_function_catalog().alter_function(
+ function_ident,
+ FunctionChange.add_impl([], trino_impl),
+ )
+ no_arg_def = next(d for d in function.definitions() if not
d.parameters())
+ impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+ self.assertIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+ function =
self._catalog.as_function_catalog().get_function(function_ident)
+ no_arg_def = next(d for d in function.definitions() if not
d.parameters())
+ impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+ self.assertIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+ # Test update_impl: replace the Trino implementation with new SQL
+ updated_trino_impl = self._create_trino_impl("SELECT 2")
+ function = self._catalog.as_function_catalog().alter_function(
+ function_ident,
+ FunctionChange.update_impl([], FunctionImpl.RuntimeType.TRINO,
updated_trino_impl),
+ )
+ no_arg_def = next(d for d in function.definitions() if not
d.parameters())
+ trino_impls = [i for i in no_arg_def.impls() if i.runtime() ==
FunctionImpl.RuntimeType.TRINO]
+ self.assertEqual(1, len(trino_impls))
+ self.assertEqual("SELECT 2", trino_impls[0].sql())
+
+ function =
self._catalog.as_function_catalog().get_function(function_ident)
+ no_arg_def = next(d for d in function.definitions() if not
d.parameters())
+ trino_impls = [i for i in no_arg_def.impls() if i.runtime() ==
FunctionImpl.RuntimeType.TRINO]
+ self.assertEqual("SELECT 2", trino_impls[0].sql())
+
+ # Test remove_impl: remove the Trino implementation from the no-arg
definition
+ function = self._catalog.as_function_catalog().alter_function(
+ function_ident,
+ FunctionChange.remove_impl([], FunctionImpl.RuntimeType.TRINO),
+ )
+ no_arg_def = next(d for d in function.definitions() if not
d.parameters())
+ impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+ self.assertNotIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+ function =
self._catalog.as_function_catalog().get_function(function_ident)
+ no_arg_def = next(d for d in function.definitions() if not
d.parameters())
+ impl_runtimes = [i.runtime() for i in no_arg_def.impls()]
+ self.assertNotIn(FunctionImpl.RuntimeType.TRINO, impl_runtimes)
+
+ # Test remove_definition: remove the parameterized overload
+ function = self._catalog.as_function_catalog().alter_function(
+ function_ident, FunctionChange.remove_definition([x_param])
+ )
+ self.assertEqual(1, len(function.definitions()))
+
+ function =
self._catalog.as_function_catalog().get_function(function_ident)
+ self.assertEqual(1, len(function.definitions()))
+
+ def test_drop_function(self):
+ """Test dropping a function."""
+ function_name = "function_it_function" + str(randint(0, 1000))
+ function_ident = NameIdentifier.of(self._schema_name, function_name)
+ definition = self._create_scalar_definition()
+
+ self._catalog.as_function_catalog().register_function(
+ ident=function_ident,
+ comment="comment",
+ function_type=FunctionType.SCALAR,
+ deterministic=True,
+ definitions=[definition],
+ )
+
+ self.assertTrue(
+ self._catalog.as_function_catalog().drop_function(function_ident)
+ )
+ self.assertFalse(
+ self._catalog.as_function_catalog().drop_function(function_ident)
+ )
+
+ with self.assertRaises(NoSuchFunctionException):
+ self._catalog.as_function_catalog().get_function(function_ident)
diff --git a/clients/client-python/tests/unittests/test_function_catalog.py
b/clients/client-python/tests/unittests/test_function_catalog.py
new file mode 100644
index 0000000000..a29da86c9c
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_function_catalog.py
@@ -0,0 +1,252 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest.mock import patch, Mock
+
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_type import FunctionType
+from gravitino.api.rel.types.types import Types
+from gravitino.client.relational_catalog import RelationalCatalog
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.function.function_definition_dto import
FunctionDefinitionDTO
+from gravitino.dto.function.function_dto import FunctionDTO
+from gravitino.dto.function.function_impl_dto import (
+ SQLImplDTO,
+)
+from gravitino.dto.function.function_param_dto import FunctionParamDTO
+from gravitino.dto.responses.drop_response import DropResponse
+from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.function_list_response import FunctionListResponse
+from gravitino.dto.responses.function_response import FunctionResponse
+from gravitino.exceptions.base import (
+ NoSuchSchemaException,
+ NoSuchFunctionException,
+ FunctionAlreadyExistsException,
+)
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+from gravitino.utils import HTTPClient, Response
+
+
+class TestFunctionCatalog(unittest.TestCase):
+ metalake_name = "test_metalake"
+ catalog_name = "test_catalog"
+ schema_name = "test_schema"
+ catalog_namespace = Namespace.of(metalake_name)
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.rest_client = HTTPClient("http://localhost:8090")
+ cls.catalog = RelationalCatalog(
+ catalog_namespace=cls.catalog_namespace,
+ name=cls.catalog_name,
+ catalog_type=RelationalCatalog.Type.RELATIONAL,
+ provider="test_provider",
+ audit=AuditDTO("anonymous"),
+ rest_client=cls.rest_client,
+ )
+
+ def _get_mock_http_resp(self, json_str: str, return_code: int = 200):
+ mock_http_resp = Mock()
+ mock_http_resp.getcode.return_value = return_code
+ mock_http_resp.read.return_value = json_str.encode("utf-8")
+ mock_http_resp.info.return_value = None
+ mock_http_resp.url = None
+ mock_resp = Response(mock_http_resp)
+ return mock_resp
+
+ def _mock_function_dto(
+ self, name, function_type, comment, deterministic
+ ) -> FunctionDTO:
+ params = [FunctionParamDTO(_name="param1",
_data_type=Types.IntegerType.get())]
+ impl = SQLImplDTO(
+ _runtime="SPARK",
+ _sql="SELECT param1 + 1",
+ _resources=None,
+ _properties={},
+ )
+ definition = FunctionDefinitionDTO(
+ _parameters=params, _return_type=Types.IntegerType.get(),
_impls=[impl]
+ )
+ return FunctionDTO(
+ _name=name,
+ _definitions=[definition],
+ _function_type=function_type,
+ _deterministic=deterministic,
+ _comment=comment,
+ _audit=AuditDTO(
+ "creator", "2022-01-01T00:00:00Z", "modifier",
"2022-01-01T00:00:00Z"
+ ),
+ )
+
+ def test_list_functions(self):
+ func1 = NameIdentifier.of(
+ self.metalake_name, self.catalog_name, self.schema_name, "func1"
+ )
+ func2 = NameIdentifier.of(
+ self.metalake_name, self.catalog_name, self.schema_name, "func2"
+ )
+
+ resp_body = EntityListResponse(_code=0, _idents=[func1, func2])
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ functions = self.catalog.as_function_catalog().list_functions(
+ Namespace.of(self.schema_name)
+ )
+ self.assertEqual(2, len(functions))
+ self.assertEqual("func1", functions[0].name())
+ self.assertEqual("func2", functions[1].name())
+
+ # Test NoSuchSchemaException
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ side_effect=NoSuchSchemaException("schema not found"),
+ ):
+ with self.assertRaises(NoSuchSchemaException):
+ self.catalog.as_function_catalog().list_functions(
+ Namespace.of(self.schema_name)
+ )
+
+ def test_list_function_infos(self):
+ func1 = self._mock_function_dto("func1", FunctionType.SCALAR,
"comment1", True)
+ func2 = self._mock_function_dto("func2", FunctionType.SCALAR,
"comment2", False)
+
+ resp_body = FunctionListResponse(_code=0, _functions=[func1, func2])
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ functions = self.catalog.as_function_catalog().list_function_infos(
+ Namespace.of(self.schema_name)
+ )
+ self.assertEqual(2, len(functions))
+ self.assertEqual("func1", functions[0].name())
+ self.assertEqual("func2", functions[1].name())
+ self.assertEqual("comment1", functions[0].comment())
+ self.assertEqual("comment2", functions[1].comment())
+
+ def test_get_function(self):
+ ident = NameIdentifier.of(self.schema_name, "func1")
+ mock_function = self._mock_function_dto(
+ "func1", FunctionType.SCALAR, "mock comment", True
+ )
+ resp_body = FunctionResponse(_code=0, _function=mock_function)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ func = self.catalog.as_function_catalog().get_function(ident)
+ self.assertIsNotNone(func)
+ self.assertEqual("func1", func.name())
+ self.assertEqual("mock comment", func.comment())
+
+ # Test NoSuchFunctionException
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ side_effect=NoSuchFunctionException("function not found"),
+ ):
+ with self.assertRaises(NoSuchFunctionException):
+ self.catalog.as_function_catalog().get_function(ident)
+
+ def test_register_function(self):
+ ident = NameIdentifier.of(self.schema_name, "func1")
+ mock_function = self._mock_function_dto(
+ "func1", FunctionType.SCALAR, "mock comment", True
+ )
+ resp_body = FunctionResponse(_code=0, _function=mock_function)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.post",
return_value=mock_resp
+ ):
+ func = self.catalog.as_function_catalog().register_function(
+ ident,
+ "mock comment",
+ FunctionType.SCALAR,
+ True,
+ definitions=mock_function.definitions(),
+ )
+ self.assertIsNotNone(func)
+ self.assertEqual("func1", func.name())
+ self.assertEqual("mock comment", func.comment())
+
+ # Test FunctionAlreadyExistsException
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.post",
+ side_effect=FunctionAlreadyExistsException("function already
exists"),
+ ):
+ with self.assertRaises(FunctionAlreadyExistsException):
+ self.catalog.as_function_catalog().register_function(
+ ident,
+ "mock comment",
+ FunctionType.SCALAR,
+ True,
+ definitions=mock_function.definitions(),
+ )
+
+ def test_alter_function(self):
+ ident = NameIdentifier.of(self.schema_name, "func1")
+ mock_function = self._mock_function_dto(
+ "func1", FunctionType.SCALAR, "updated comment", True
+ )
+ resp_body = FunctionResponse(_code=0, _function=mock_function)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
return_value=mock_resp
+ ):
+ func = self.catalog.as_function_catalog().alter_function(
+ ident, FunctionChange.update_comment("updated comment")
+ )
+ self.assertIsNotNone(func)
+ self.assertEqual("updated comment", func.comment())
+
+ # Test NoSuchFunctionException
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ side_effect=NoSuchFunctionException("function not found"),
+ ):
+ with self.assertRaises(NoSuchFunctionException):
+ self.catalog.as_function_catalog().alter_function(
+ ident, FunctionChange.update_comment("updated comment")
+ )
+
+ def test_drop_function(self):
+ ident = NameIdentifier.of(self.schema_name, "func1")
+ resp_body = DropResponse(_code=0, _dropped=True)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.delete",
return_value=mock_resp
+ ):
+ dropped = self.catalog.as_function_catalog().drop_function(ident)
+ self.assertTrue(dropped)
+
+ # Test function not exists
+ resp_body = DropResponse(_code=0, _dropped=False)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.delete",
return_value=mock_resp
+ ):
+ dropped = self.catalog.as_function_catalog().drop_function(ident)
+ self.assertFalse(dropped)