This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5427a958c7 [python] Support REST function API in pypaimon (#7559)
5427a958c7 is described below
commit 5427a958c73fdbcbb1a30e61b65e9d9c3d0f1b95
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Apr 2 20:03:35 2026 +0800
[python] Support REST function API in pypaimon (#7559)
---
paimon-python/pypaimon/api/api_request.py | 62 ++++
paimon-python/pypaimon/api/api_response.py | 226 ++++++++++++++
paimon-python/pypaimon/api/resource_paths.py | 16 +
paimon-python/pypaimon/api/rest_api.py | 154 +++++++++-
.../pypaimon/catalog/catalog_exception.py | 5 +
.../pypaimon/catalog/rest/rest_catalog.py | 107 ++++++-
paimon-python/pypaimon/common/identifier.py | 3 +
paimon-python/pypaimon/function/__init__.py | 23 ++
paimon-python/pypaimon/function/function.py | 122 ++++++++
paimon-python/pypaimon/function/function_change.py | 146 +++++++++
.../pypaimon/function/function_definition.py | 192 ++++++++++++
.../pypaimon/tests/rest/rest_function_test.py | 327 +++++++++++++++++++++
paimon-python/pypaimon/tests/rest/rest_server.py | 223 +++++++++++++-
13 files changed, 1600 insertions(+), 6 deletions(-)
diff --git a/paimon-python/pypaimon/api/api_request.py
b/paimon-python/pypaimon/api/api_request.py
index 6d1af78894..b7c7c1966b 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -22,6 +22,9 @@ from typing import Dict, List, Optional
from pypaimon.common.identifier import Identifier
from pypaimon.common.json_util import json_field
+from pypaimon.function.function_change import FunctionChange
+from pypaimon.function.function_definition import FunctionDefinition
+from pypaimon.schema.data_types import DataField
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.snapshot.snapshot import Snapshot
@@ -94,3 +97,62 @@ class RollbackTableRequest(RESTRequest):
instant: Instant = json_field(FIELD_INSTANT)
from_snapshot: Optional[int] = json_field(FIELD_FROM_SNAPSHOT)
+
+
+@dataclass
+class CreateFunctionRequest(RESTRequest):
+    """Request body for creating a function through the REST catalog."""
+    FIELD_NAME = "name"
+    FIELD_INPUT_PARAMS = "inputParams"
+    FIELD_RETURN_PARAMS = "returnParams"
+    FIELD_DETERMINISTIC = "deterministic"
+    FIELD_DEFINITIONS = "definitions"
+    FIELD_COMMENT = "comment"
+    FIELD_OPTIONS = "options"
+
+    # Declaration order matters: the dataclass __init__ is generated from it.
+    name: str = json_field(FIELD_NAME)
+    input_params: Optional[List[DataField]] = json_field(
+        FIELD_INPUT_PARAMS, default=None)
+    return_params: Optional[List[DataField]] = json_field(
+        FIELD_RETURN_PARAMS, default=None)
+    deterministic: bool = json_field(FIELD_DETERMINISTIC, default=False)
+    definitions: Optional[Dict[str, FunctionDefinition]] = json_field(
+        FIELD_DEFINITIONS, default=None)
+    comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
+    options: Optional[Dict[str, str]] = json_field(FIELD_OPTIONS, default=None)
+
+    def to_dict(self) -> Dict:
+        """Serialize to the JSON payload; unset optional values are emitted as None."""
+        def as_json(value):
+            # Model objects serialize themselves; plain values pass through.
+            return value.to_dict() if hasattr(value, 'to_dict') else value
+
+        inputs = (None if self.input_params is None
+                  else [as_json(field) for field in self.input_params])
+        returns = (None if self.return_params is None
+                   else [as_json(field) for field in self.return_params])
+        by_dialect = (None if self.definitions is None
+                      else {dialect: as_json(body)
+                            for dialect, body in self.definitions.items()})
+        return {
+            self.FIELD_NAME: self.name,
+            self.FIELD_DETERMINISTIC: self.deterministic,
+            self.FIELD_INPUT_PARAMS: inputs,
+            self.FIELD_RETURN_PARAMS: returns,
+            self.FIELD_DEFINITIONS: by_dialect,
+            self.FIELD_COMMENT: self.comment,
+            self.FIELD_OPTIONS: self.options,
+        }
+
+
+@dataclass
+class AlterFunctionRequest(RESTRequest):
+    """Request body carrying the ordered list of changes to apply to a function."""
+    FIELD_CHANGES = "changes"
+
+    changes: List[FunctionChange] = json_field(FIELD_CHANGES)
+
+    def to_dict(self) -> Dict:
+        """Serialize each change through its own to_dict(), keeping order."""
+        serialized_changes = [change.to_dict() for change in self.changes]
+        return {self.FIELD_CHANGES: serialized_changes}
diff --git a/paimon-python/pypaimon/api/api_response.py
b/paimon-python/pypaimon/api/api_response.py
index 64afc50b9b..819b2c45f3 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -20,8 +20,10 @@ from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Generic, List, Optional
+from pypaimon.common.identifier import Identifier
from pypaimon.common.json_util import T, json_field
from pypaimon.common.options import Options
+from pypaimon.schema.data_types import DataField
from pypaimon.schema.schema import Schema
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
from pypaimon.snapshot.table_snapshot import TableSnapshot
@@ -327,3 +329,227 @@ class GetTableSnapshotResponse(RESTResponse):
def get_snapshot(self) -> Optional[TableSnapshot]:
return self.snapshot
+
+
+@dataclass
+class GetFunctionResponse(AuditRESTResponse):
+    """Response for getting a function.
+
+    Holds the function metadata plus the audit fields inherited from
+    AuditRESTResponse. from_dict/to_dict translate to and from the
+    camelCase JSON keys declared in the FIELD_* constants below.
+    """
+    FIELD_UUID = "uuid"
+    FIELD_NAME = "name"
+    FIELD_INPUT_PARAMS = "inputParams"
+    FIELD_RETURN_PARAMS = "returnParams"
+    FIELD_DETERMINISTIC = "deterministic"
+    FIELD_DEFINITIONS = "definitions"
+    FIELD_COMMENT = "comment"
+    FIELD_OPTIONS = "options"
+
+    uuid: Optional[str] = json_field(FIELD_UUID, default=None)
+    name: Optional[str] = json_field(FIELD_NAME, default=None)
+    input_params: Optional[List[DataField]] = json_field(
+        FIELD_INPUT_PARAMS, default=None)
+    return_params: Optional[List[DataField]] = json_field(
+        FIELD_RETURN_PARAMS, default=None)
+    deterministic: bool = json_field(FIELD_DETERMINISTIC, default=False)
+    definitions: Optional[Dict[str, 'FunctionDefinition']] = json_field(
+        FIELD_DEFINITIONS, default=None)
+    comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
+    options: Optional[Dict[str, str]] = json_field(FIELD_OPTIONS, default=None)
+
+    def __init__(
+        self,
+        uuid: Optional[str] = None,
+        name: Optional[str] = None,
+        input_params: Optional[List[DataField]] = None,
+        return_params: Optional[List[DataField]] = None,
+        deterministic: bool = False,
+        definitions: Optional[Dict[str, 'FunctionDefinition']] = None,
+        comment: Optional[str] = None,
+        options: Optional[Dict[str, str]] = None,
+        owner: Optional[str] = None,
+        created_at: Optional[int] = None,
+        created_by: Optional[str] = None,
+        updated_at: Optional[int] = None,
+        updated_by: Optional[str] = None,
+    ):
+        # An explicitly defined __init__ takes precedence over the
+        # dataclass-generated one, so audit fields are forwarded by hand.
+        super().__init__(owner, created_at, created_by, updated_at, updated_by)
+        self.uuid = uuid
+        self.name = name
+        self.input_params = input_params
+        self.return_params = return_params
+        self.deterministic = deterministic
+        self.definitions = definitions
+        self.comment = comment
+        self.options = options
+
+    def to_function(self, identifier):
+        """Build a FunctionImpl named by ``identifier`` from this response.
+
+        Missing definitions/options become empty dicts.
+        """
+        # Local import; presumably avoids an import cycle with
+        # pypaimon.function — TODO confirm.
+        from pypaimon.function.function import FunctionImpl
+        return FunctionImpl(
+            identifier=identifier,
+            input_params=self.input_params,
+            return_params=self.return_params,
+            deterministic=self.deterministic,
+            definitions=self.definitions or {},
+            comment=self.comment,
+            options=self.options or {},
+        )
+
+    @staticmethod
+    def _parse_data_fields(raw: Optional[list]) -> Optional[List[DataField]]:
+        """Decode a JSON list of fields; entries that are not dicts pass through."""
+        if raw is None:
+            return None
+        return [DataField.from_dict(f) if isinstance(f, dict) else f for f in raw]
+
+    @staticmethod
+    def _parse_definitions(raw) -> Optional[Dict[str, 'FunctionDefinition']]:
+        """Decode the dialect -> definition mapping; non-dict values pass through."""
+        from pypaimon.function.function_definition import FunctionDefinition
+        if raw is None:
+            return None
+        return {
+            k: FunctionDefinition.from_dict(v) if isinstance(v, dict) else v
+            for k, v in raw.items()
+        }
+
+    @staticmethod
+    def _to_json(value):
+        """Return ``value.to_dict()`` when available, otherwise ``value`` itself."""
+        return value.to_dict() if hasattr(value, 'to_dict') else value
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "GetFunctionResponse":
+        """Build a response from its JSON mapping."""
+        return cls(
+            uuid=data.get(cls.FIELD_UUID),
+            name=data.get(cls.FIELD_NAME),
+            input_params=cls._parse_data_fields(data.get(cls.FIELD_INPUT_PARAMS)),
+            return_params=cls._parse_data_fields(data.get(cls.FIELD_RETURN_PARAMS)),
+            deterministic=data.get(cls.FIELD_DETERMINISTIC, False),
+            definitions=cls._parse_definitions(data.get(cls.FIELD_DEFINITIONS)),
+            comment=data.get(cls.FIELD_COMMENT),
+            options=data.get(cls.FIELD_OPTIONS),
+            owner=data.get("owner"),
+            created_at=data.get("createdAt"),
+            created_by=data.get("createdBy"),
+            updated_at=data.get("updatedAt"),
+            updated_by=data.get("updatedBy"),
+        )
+
+    def to_dict(self) -> Dict:
+        """Serialize to a JSON-ready dict.
+
+        ``uuid`` and the audit fields are emitted only when set; every other
+        key is always present (None when unset).
+        """
+        result = {}
+        if self.uuid is not None:
+            result[self.FIELD_UUID] = self.uuid
+        result[self.FIELD_NAME] = self.name
+        result[self.FIELD_INPUT_PARAMS] = (
+            None if self.input_params is None
+            else [self._to_json(p) for p in self.input_params])
+        result[self.FIELD_RETURN_PARAMS] = (
+            None if self.return_params is None
+            else [self._to_json(p) for p in self.return_params])
+        result[self.FIELD_DETERMINISTIC] = self.deterministic
+        result[self.FIELD_DEFINITIONS] = (
+            None if self.definitions is None
+            else {k: self._to_json(v) for k, v in self.definitions.items()})
+        result[self.FIELD_COMMENT] = self.comment
+        result[self.FIELD_OPTIONS] = self.options
+        audit_fields = (
+            ("owner", self.owner),
+            ("createdAt", self.created_at),
+            ("createdBy", self.created_by),
+            ("updatedAt", self.updated_at),
+            ("updatedBy", self.updated_by),
+        )
+        for key, value in audit_fields:
+            if value is not None:
+                result[key] = value
+        return result
+
+
+@dataclass
+class ListFunctionsResponse(PagedResponse[str]):
+ """Paged response holding one page of function names."""
+ FIELD_FUNCTIONS = "functions"
+
+ # Field order matters: the dataclass __init__ is generated from it.
+ # Both fields default to None when absent from the payload.
+ functions: Optional[List[str]] = json_field(FIELD_FUNCTIONS, default=None)
+ next_page_token: Optional[str] = json_field(
+ PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
+
+ # PagedResponse accessor: the names on this page (may be None).
+ def data(self) -> Optional[List[str]]:
+ return self.functions
+
+ # PagedResponse accessor: token for the following page, if any.
+ def get_next_page_token(self) -> Optional[str]:
+ return self.next_page_token
+
+
+@dataclass
+class ListFunctionDetailsResponse(PagedResponse[GetFunctionResponse]):
+    """Paged response holding full function metadata for one database."""
+    FIELD_FUNCTION_DETAILS = "functionDetails"
+
+    function_details: Optional[List[GetFunctionResponse]] = json_field(
+        FIELD_FUNCTION_DETAILS, default=None)
+    next_page_token: Optional[str] = json_field(
+        PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
+
+    def data(self) -> Optional[List[GetFunctionResponse]]:
+        return self.function_details
+
+    def get_next_page_token(self) -> Optional[str]:
+        return self.next_page_token
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "ListFunctionDetailsResponse":
+        """Decode the page, turning each entry into a GetFunctionResponse."""
+        details = data.get(cls.FIELD_FUNCTION_DETAILS)
+        if details is not None:
+            details = [GetFunctionResponse.from_dict(d) for d in details]
+        return cls(
+            function_details=details,
+            next_page_token=data.get(PagedResponse.FIELD_NEXT_PAGE_TOKEN),
+        )
+
+    def to_dict(self) -> Dict:
+        """Serialize the page; both keys are always present (None when unset)."""
+        details = (None if self.function_details is None
+                   else [d.to_dict() for d in self.function_details])
+        return {
+            self.FIELD_FUNCTION_DETAILS: details,
+            PagedResponse.FIELD_NEXT_PAGE_TOKEN: self.next_page_token,
+        }
+
+
+@dataclass
+class ListFunctionsGloballyResponse(PagedResponse[Identifier]):
+    """Paged response holding function identifiers listed across databases."""
+    FIELD_FUNCTIONS = "functions"
+
+    functions: Optional[List[Identifier]] = json_field(
+        FIELD_FUNCTIONS, default=None)
+    next_page_token: Optional[str] = json_field(
+        PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
+
+    def data(self) -> Optional[List[Identifier]]:
+        return self.functions
+
+    def get_next_page_token(self) -> Optional[str]:
+        return self.next_page_token
+
+    @staticmethod
+    def _parse_identifier(raw):
+        """Decode one ``functions`` entry into an Identifier.
+
+        Accepts a string understood by Identifier.from_string or a mapping
+        with "database"/"object" keys; any other value is returned unchanged.
+        """
+        if isinstance(raw, str):
+            return Identifier.from_string(raw)
+        if isinstance(raw, dict):
+            return Identifier.create(raw.get("database"), raw.get("object"))
+        return raw
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "ListFunctionsGloballyResponse":
+        """Decode the page from its JSON mapping."""
+        functions = data.get(cls.FIELD_FUNCTIONS)
+        if functions is not None:
+            functions = [cls._parse_identifier(f) for f in functions]
+        return cls(
+            functions=functions,
+            next_page_token=data.get(PagedResponse.FIELD_NEXT_PAGE_TOKEN),
+        )
+
+    def to_dict(self) -> Dict:
+        """Serialize identifiers as {"database", "object"} mappings."""
+        functions = None
+        if self.functions is not None:
+            functions = [
+                {"database": f.get_database_name(), "object": f.get_object_name()}
+                for f in self.functions
+            ]
+        return {
+            self.FIELD_FUNCTIONS: functions,
+            PagedResponse.FIELD_NEXT_PAGE_TOKEN: self.next_page_token,
+        }
diff --git a/paimon-python/pypaimon/api/resource_paths.py
b/paimon-python/pypaimon/api/resource_paths.py
index 9b2351c7e3..aba64018e6 100644
--- a/paimon-python/pypaimon/api/resource_paths.py
+++ b/paimon-python/pypaimon/api/resource_paths.py
@@ -28,6 +28,8 @@ class ResourcePaths:
TABLES = "tables"
TABLE_DETAILS = "table-details"
PARTITIONS = "partitions"
+ FUNCTIONS = "functions"
+ FUNCTION_DETAILS = "function-details"
def __init__(self, prefix: str):
self.base_path = "/{}/{}".format(self.V1, prefix).rstrip("/")
@@ -83,3 +85,17 @@ class ResourcePaths:
def partitions(self, database_name: str, table_name: str) -> str:
return ("{}/{}/{}/{}/{}/{}".format(self.base_path, self.DATABASES,
RESTUtil.encode_string(database_name),
self.TABLES,
RESTUtil.encode_string(table_name), self.PARTITIONS))
+
+    def functions(self, database_name: Optional[str] = None) -> str:
+        """Path of the functions collection.
+
+        A falsy ``database_name`` (None or "") selects the catalog-wide
+        endpoint; otherwise the path is scoped to that database.
+        """
+        if not database_name:
+            return f"{self.base_path}/{self.FUNCTIONS}"
+        encoded_db = RESTUtil.encode_string(database_name)
+        return f"{self.base_path}/{self.DATABASES}/{encoded_db}/{self.FUNCTIONS}"
+
+    def function_details(self, database_name: str) -> str:
+        """Path listing full function metadata for one database."""
+        encoded_db = RESTUtil.encode_string(database_name)
+        return f"{self.base_path}/{self.DATABASES}/{encoded_db}/{self.FUNCTION_DETAILS}"
+
+    def function(self, database_name: str, function_name: str) -> str:
+        """Path of a single function; both name segments are URL-encoded."""
+        encoded_db = RESTUtil.encode_string(database_name)
+        encoded_fn = RESTUtil.encode_string(function_name)
+        return (f"{self.base_path}/{self.DATABASES}/{encoded_db}/"
+                f"{self.FUNCTIONS}/{encoded_fn}")
diff --git a/paimon-python/pypaimon/api/rest_api.py
b/paimon-python/pypaimon/api/rest_api.py
index 8db56fad0f..e224421131 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -18,14 +18,21 @@
import logging
from typing import Callable, Dict, List, Optional, Union
-from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest,
CommitTableRequest,
- CreateDatabaseRequest,
+import re
+
+from pypaimon.api.api_request import (AlterDatabaseRequest,
AlterFunctionRequest,
+ AlterTableRequest, CommitTableRequest,
+ CreateDatabaseRequest,
CreateFunctionRequest,
CreateTableRequest, RenameTableRequest,
RollbackTableRequest)
from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
- GetDatabaseResponse, GetTableResponse,
+ GetDatabaseResponse,
GetFunctionResponse,
+ GetTableResponse,
GetTableTokenResponse,
ListDatabasesResponse,
+ ListFunctionDetailsResponse,
+ ListFunctionsGloballyResponse,
+ ListFunctionsResponse,
ListPartitionsResponse,
ListTablesResponse, PagedList,
PagedResponse, GetTableSnapshotResponse,
@@ -50,9 +57,13 @@ class RESTApi:
DATABASE_NAME_PATTERN = "databaseNamePattern"
TABLE_NAME_PATTERN = "tableNamePattern"
TABLE_TYPE = "tableType"
+ FUNCTION_NAME_PATTERN = "functionNamePattern"
PARTITION_NAME_PATTERN = "partitionNamePattern"
TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000
+ # Function name validation pattern
+ _FUNCTION_NAME_PATTERN = re.compile(r'^(?=.*[A-Za-z])[A-Za-z0-9._-]+$')
+
def __init__(self, options: Union[Options, Dict[str, str]],
config_required: bool = True):
if isinstance(options, dict):
options = Options(options)
@@ -425,6 +436,140 @@ class RESTApi:
partitions = response.data() or []
return PagedList(partitions, response.get_next_page_token())
+    @staticmethod
+    def is_valid_function_name(name: str) -> bool:
+        """Return True if ``name`` is an acceptable function name.
+
+        A valid name is non-empty, consists only of ASCII letters, digits,
+        '.', '_' and '-', and contains at least one letter.
+        """
+        if not name:
+            return False
+        # fullmatch, not match: with re.match the "$" anchor also matches just
+        # before a trailing newline, so "abc\n" would wrongly be accepted.
+        return RESTApi._FUNCTION_NAME_PATTERN.fullmatch(name) is not None
+
+    @staticmethod
+    def check_function_name(name: str) -> None:
+        """Raise IllegalArgumentError unless ``name`` is a valid function name."""
+        if RESTApi.is_valid_function_name(name):
+            return
+        raise IllegalArgumentError("Invalid function name: " + str(name))
+
+    def list_functions(self, database_name: str) -> List[str]:
+        """Return every function name in ``database_name``, across all pages."""
+        path = self.resource_paths.functions(database_name)
+
+        def fetch_page(query_params):
+            return self.client.get_with_params(
+                path, query_params, ListFunctionsResponse, self.rest_auth_function)
+
+        return self.__list_data_from_page_api(fetch_page)
+
+    def list_functions_paged(
+        self,
+        database_name: str,
+        max_results: Optional[int] = None,
+        page_token: Optional[str] = None,
+        function_name_pattern: Optional[str] = None,
+    ) -> PagedList[str]:
+        """Fetch one page of function names from ``database_name``."""
+        query_params = self.__build_paged_query_params(
+            max_results, page_token,
+            {self.FUNCTION_NAME_PATTERN: function_name_pattern})
+        response = self.client.get_with_params(
+            self.resource_paths.functions(database_name),
+            query_params,
+            ListFunctionsResponse,
+            self.rest_auth_function,
+        )
+        return PagedList(response.functions or [], response.get_next_page_token())
+
+    def list_function_details_paged(
+        self,
+        database_name: str,
+        max_results: Optional[int] = None,
+        page_token: Optional[str] = None,
+        function_name_pattern: Optional[str] = None,
+    ) -> PagedList[GetFunctionResponse]:
+        """Fetch one page of full function metadata from ``database_name``."""
+        query_params = self.__build_paged_query_params(
+            max_results, page_token,
+            {self.FUNCTION_NAME_PATTERN: function_name_pattern})
+        response = self.client.get_with_params(
+            self.resource_paths.function_details(database_name),
+            query_params,
+            ListFunctionDetailsResponse,
+            self.rest_auth_function,
+        )
+        return PagedList(response.data() or [], response.get_next_page_token())
+
+    def list_functions_paged_globally(
+        self,
+        database_name_pattern: Optional[str] = None,
+        function_name_pattern: Optional[str] = None,
+        max_results: Optional[int] = None,
+        page_token: Optional[str] = None,
+    ) -> PagedList:
+        """Fetch one page of function identifiers across all databases."""
+        patterns = {
+            self.DATABASE_NAME_PATTERN: database_name_pattern,
+            self.FUNCTION_NAME_PATTERN: function_name_pattern,
+        }
+        query_params = self.__build_paged_query_params(
+            max_results, page_token, patterns)
+        response = self.client.get_with_params(
+            self.resource_paths.functions(),
+            query_params,
+            ListFunctionsGloballyResponse,
+            self.rest_auth_function,
+        )
+        return PagedList(response.data() or [], response.get_next_page_token())
+
+    def get_function(self, identifier: Identifier) -> GetFunctionResponse:
+        """Load one function's metadata.
+
+        An invalid function name is reported as NoSuchResourceException
+        without contacting the server.
+        """
+        from pypaimon.api.rest_exception import NoSuchResourceException
+        function_name = identifier.get_object_name()
+        if not self.is_valid_function_name(function_name):
+            # Surfaced as "not found" so RESTCatalog.get_function maps it to
+            # FunctionNotExistException like any other missing function.
+            raise NoSuchResourceException(
+                "FUNCTION",
+                function_name,
+                "Invalid function name: " + function_name,
+            )
+        path = self.resource_paths.function(
+            identifier.get_database_name(), function_name)
+        return self.client.get(path, GetFunctionResponse, self.rest_auth_function)
+
+    def create_function(self, identifier: Identifier, function) -> None:
+        """POST ``function`` to the functions collection of its database.
+
+        Raises IllegalArgumentError for an invalid name before any request.
+        The request's name is taken from ``function.name()``.
+        """
+        self.check_function_name(identifier.get_object_name())
+        path = self.resource_paths.functions(identifier.get_database_name())
+        request = CreateFunctionRequest(
+            name=function.name(),
+            input_params=function.input_params(),
+            return_params=function.return_params(),
+            deterministic=function.is_deterministic(),
+            definitions=function.definitions(),
+            comment=function.comment(),
+            options=function.options(),
+        )
+        self.client.post(path, request, self.rest_auth_function)
+
+    def drop_function(self, identifier: Identifier) -> None:
+        """DELETE the function; invalid names raise IllegalArgumentError first."""
+        function_name = identifier.get_object_name()
+        self.check_function_name(function_name)
+        path = self.resource_paths.function(
+            identifier.get_database_name(), function_name)
+        self.client.delete(path, self.rest_auth_function)
+
+    def alter_function(self, identifier: Identifier, changes: List) -> None:
+        """POST a list of FunctionChange objects to an existing function."""
+        function_name = identifier.get_object_name()
+        self.check_function_name(function_name)
+        path = self.resource_paths.function(
+            identifier.get_database_name(), function_name)
+        self.client.post(
+            path,
+            AlterFunctionRequest(changes=changes),
+            self.rest_auth_function,
+        )
+
@staticmethod
def __validate_identifier(identifier: Identifier):
if not identifier:
@@ -439,3 +584,6 @@ class RESTApi:
raise ValueError("Table name cannot be None")
return database_name.strip(), table_name.strip()
+
+
+from pypaimon.catalog.catalog_exception import IllegalArgumentError
diff --git a/paimon-python/pypaimon/catalog/catalog_exception.py
b/paimon-python/pypaimon/catalog/catalog_exception.py
index 29785167a7..af7636bd64 100644
--- a/paimon-python/pypaimon/catalog/catalog_exception.py
+++ b/paimon-python/pypaimon/catalog/catalog_exception.py
@@ -178,3 +178,8 @@ class TagNotExistException(CatalogException):
def __init__(self, tag: str):
self.tag = tag
super().__init__(f"Tag {tag} does not exist")
+
+
+class IllegalArgumentError(CatalogException):
+    """Raised when a caller supplies an invalid argument.
+
+    Used, for example, for malformed function names and for function
+    changes the server rejects as a bad request.
+    """
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index fc2f516a8b..810fed963b 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -19,14 +19,18 @@ import logging
from typing import Any, Callable, Dict, List, Optional, Union
from pypaimon.api.api_response import GetTableResponse, PagedList,
ErrorResponse
from pypaimon.api.rest_api import RESTApi
-from pypaimon.api.rest_exception import NoSuchResourceException,
AlreadyExistsException, ForbiddenException
+from pypaimon.catalog.catalog_exception import IllegalArgumentError
+from pypaimon.api.rest_exception import (NoSuchResourceException,
AlreadyExistsException,
+ ForbiddenException,
BadRequestException)
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.catalog_environment import CatalogEnvironment
from pypaimon.catalog.catalog_exception import (
TableNotExistException, DatabaseAlreadyExistException,
TableAlreadyExistException, DatabaseNotExistException,
- TableNoPermissionException, DatabaseNoPermissionException
+ TableNoPermissionException, DatabaseNoPermissionException,
+ FunctionNotExistException, FunctionAlreadyExistException,
+ DefinitionAlreadyExistException, DefinitionNotExistException,
)
from pypaimon.catalog.database import Database
from pypaimon.catalog.rest.property_change import PropertyChange
@@ -365,6 +369,105 @@ class RESTCatalog(Catalog):
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e
+    def list_functions(self, database_name: str) -> List[str]:
+        """Return all function names in ``database_name``.
+
+        Raises DatabaseNotExistException when the server reports the
+        database as missing.
+        """
+        try:
+            names = self.rest_api.list_functions(database_name)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(database_name) from e
+        return names
+
+    def get_function(self, identifier: Union[str, Identifier]) -> 'Function':
+        """Load a function by Identifier or by its string form.
+
+        Raises FunctionNotExistException when the server reports it missing.
+        """
+        ident = (identifier if isinstance(identifier, Identifier)
+                 else Identifier.from_string(identifier))
+        try:
+            response = self.rest_api.get_function(ident)
+        except NoSuchResourceException as e:
+            raise FunctionNotExistException(ident) from e
+        return response.to_function(ident)
+
+    def create_function(self, identifier: Union[str, Identifier],
+                        function: 'Function', ignore_if_exists: bool = False) -> None:
+        """Create ``function`` under ``identifier``.
+
+        Raises DatabaseNotExistException if the database is missing, and
+        FunctionAlreadyExistException if the function already exists unless
+        ``ignore_if_exists`` is set.
+        """
+        ident = (identifier if isinstance(identifier, Identifier)
+                 else Identifier.from_string(identifier))
+        try:
+            self.rest_api.create_function(ident, function)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(ident.get_database_name()) from e
+        except AlreadyExistsException as e:
+            if not ignore_if_exists:
+                raise FunctionAlreadyExistException(ident) from e
+
+    def drop_function(self, identifier: Union[str, Identifier],
+                      ignore_if_not_exists: bool = False) -> None:
+        """Drop a function.
+
+        Raises FunctionNotExistException if it is missing, unless
+        ``ignore_if_not_exists`` is set.
+        """
+        ident = (identifier if isinstance(identifier, Identifier)
+                 else Identifier.from_string(identifier))
+        try:
+            self.rest_api.drop_function(ident)
+        except NoSuchResourceException as e:
+            if not ignore_if_not_exists:
+                raise FunctionNotExistException(ident) from e
+
+    def alter_function(self, identifier: Union[str, Identifier],
+                       changes: List['FunctionChange'],
+                       ignore_if_not_exists: bool = False) -> None:
+        """Apply ``changes`` to a function, translating REST errors.
+
+        - AlreadyExistsException -> DefinitionAlreadyExistException
+        - NoSuchResourceException on a definition -> DefinitionNotExistException
+        - NoSuchResourceException otherwise -> FunctionNotExistException,
+          unless ``ignore_if_not_exists`` is set
+        - BadRequestException -> IllegalArgumentError
+        """
+        ident = (identifier if isinstance(identifier, Identifier)
+                 else Identifier.from_string(identifier))
+        try:
+            self.rest_api.alter_function(ident, changes)
+        except AlreadyExistsException as e:
+            raise DefinitionAlreadyExistException(ident, e.resource_name) from e
+        except NoSuchResourceException as e:
+            # A missing *definition* is always an error, even when a missing
+            # function may be ignored.
+            if e.resource_type == ErrorResponse.RESOURCE_TYPE_DEFINITION:
+                raise DefinitionNotExistException(ident, e.resource_name) from e
+            if ignore_if_not_exists:
+                return
+            raise FunctionNotExistException(ident) from e
+        except BadRequestException as e:
+            raise IllegalArgumentError(str(e)) from e
+
+    def list_functions_paged(
+        self,
+        database_name: str,
+        max_results: Optional[int] = None,
+        page_token: Optional[str] = None,
+        function_name_pattern: Optional[str] = None,
+    ) -> PagedList[str]:
+        """Return one page of function names in ``database_name``.
+
+        Raises DatabaseNotExistException when the database is missing.
+        """
+        try:
+            page = self.rest_api.list_functions_paged(
+                database_name, max_results, page_token, function_name_pattern)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(database_name) from e
+        return page
+
+    def list_functions_paged_globally(
+        self,
+        database_name_pattern: Optional[str] = None,
+        function_name_pattern: Optional[str] = None,
+        max_results: Optional[int] = None,
+        page_token: Optional[str] = None,
+    ) -> PagedList[Identifier]:
+        """Return one page of function identifiers across databases."""
+        page = self.rest_api.list_functions_paged_globally(
+            database_name_pattern, function_name_pattern, max_results, page_token)
+        # Normalise a missing element list to an empty one.
+        return PagedList(page.elements or [], page.next_page_token)
+
+    def list_function_details_paged(
+        self,
+        database_name: str,
+        max_results: Optional[int] = None,
+        page_token: Optional[str] = None,
+        function_name_pattern: Optional[str] = None,
+    ) -> PagedList['Function']:
+        """Return one page of Function objects from ``database_name``.
+
+        Each Function's identifier is ``database_name`` plus the name reported
+        by the server. Raises DatabaseNotExistException when the database is
+        missing.
+        """
+        try:
+            page = self.rest_api.list_function_details_paged(
+                database_name, max_results, page_token, function_name_pattern)
+            loaded = [
+                detail.to_function(Identifier.create(database_name, detail.name))
+                for detail in page.elements
+            ]
+            return PagedList(loaded, page.next_page_token)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(database_name) from e
+
def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
try:
response = self.rest_api.get_table(identifier)
diff --git a/paimon-python/pypaimon/common/identifier.py
b/paimon-python/pypaimon/common/identifier.py
index e6ee68e37a..45615233d1 100755
--- a/paimon-python/pypaimon/common/identifier.py
+++ b/paimon-python/pypaimon/common/identifier.py
@@ -103,5 +103,8 @@ class Identifier:
"""Get branch name or return default 'main' if branch is None."""
return self.branch if self.branch else "main"
+ def __hash__(self):
+ # Hash the (database, object, branch) triple.
+ # NOTE(review): must stay consistent with __eq__, which is defined outside
+ # this hunk — confirm it compares exactly these three fields.
+ return hash((self.database, self.object, self.branch))
+
def is_system_table(self) -> bool:
return self.object.startswith('$')
diff --git a/paimon-python/pypaimon/function/__init__.py
b/paimon-python/pypaimon/function/__init__.py
new file mode 100644
index 0000000000..5dcc290b1e
--- /dev/null
+++ b/paimon-python/pypaimon/function/__init__.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pypaimon.function.function import Function, FunctionImpl # noqa: F401
+from pypaimon.function.function_definition import ( # noqa: F401
+ FunctionDefinition, FunctionFileResource,
+ FileFunctionDefinition, SQLFunctionDefinition, LambdaFunctionDefinition,
+)
+from pypaimon.function.function_change import FunctionChange # noqa: F401
diff --git a/paimon-python/pypaimon/function/function.py
b/paimon-python/pypaimon/function/function.py
new file mode 100644
index 0000000000..a535a39877
--- /dev/null
+++ b/paimon-python/pypaimon/function/function.py
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, List, Optional
+
+from pypaimon.common.identifier import Identifier
+from pypaimon.function.function_definition import FunctionDefinition
+from pypaimon.schema.data_types import DataField
+
+
+class Function:
+ """Interface for a function in Paimon.
+
+ Every accessor raises NotImplementedError here; FunctionImpl is the
+ concrete implementation.
+ """
+
+ def name(self) -> str:
+ """Return the function's short name."""
+ raise NotImplementedError
+
+ def full_name(self) -> str:
+ """Return the function's fully qualified name."""
+ raise NotImplementedError
+
+ def identifier(self) -> Identifier:
+ """Return the catalog Identifier of the function."""
+ raise NotImplementedError
+
+ def input_params(self) -> Optional[List[DataField]]:
+ """Return the input parameter fields, or None if not declared."""
+ raise NotImplementedError
+
+ def return_params(self) -> Optional[List[DataField]]:
+ """Return the return-value fields, or None if not declared."""
+ raise NotImplementedError
+
+ def is_deterministic(self) -> bool:
+ """Return whether the function is marked deterministic."""
+ raise NotImplementedError
+
+ def definitions(self) -> Dict[str, FunctionDefinition]:
+ """Return all definitions keyed by dialect."""
+ raise NotImplementedError
+
+ def definition(self, dialect: str) -> Optional[FunctionDefinition]:
+ """Return the definition for ``dialect``, or None if there is none."""
+ raise NotImplementedError
+
+ def comment(self) -> Optional[str]:
+ """Return the function comment, or None."""
+ raise NotImplementedError
+
+ def options(self) -> Dict[str, str]:
+ """Return the function's option map."""
+ raise NotImplementedError
+
+
+class FunctionImpl(Function):
+    """Value-object implementation of Function.
+
+    Stores the constructor arguments as given, except that ``definitions``
+    and ``options`` fall back to fresh empty dicts. Equality compares every
+    attribute; hashing uses only the identifier, which is consistent because
+    equal instances always share an identifier.
+    """
+
+    def __init__(
+        self,
+        identifier: Identifier,
+        input_params: Optional[List[DataField]] = None,
+        return_params: Optional[List[DataField]] = None,
+        deterministic: bool = False,
+        definitions: Optional[Dict[str, FunctionDefinition]] = None,
+        comment: Optional[str] = None,
+        options: Optional[Dict[str, str]] = None,
+    ):
+        self._identifier = identifier
+        self._input_params = input_params
+        self._return_params = return_params
+        self._deterministic = deterministic
+        # None (or an empty mapping) becomes a fresh dict so the accessors
+        # below never return None.
+        self._definitions = definitions or {}
+        self._comment = comment
+        self._options = options or {}
+
+    def name(self) -> str:
+        return self._identifier.get_object_name()
+
+    def full_name(self) -> str:
+        return self._identifier.get_full_name()
+
+    def identifier(self) -> Identifier:
+        return self._identifier
+
+    def input_params(self) -> Optional[List[DataField]]:
+        return self._input_params
+
+    def return_params(self) -> Optional[List[DataField]]:
+        return self._return_params
+
+    def is_deterministic(self) -> bool:
+        return self._deterministic
+
+    def definitions(self) -> Dict[str, FunctionDefinition]:
+        return self._definitions
+
+    def definition(self, dialect: str) -> Optional[FunctionDefinition]:
+        return self._definitions.get(dialect)
+
+    def comment(self) -> Optional[str]:
+        return self._comment
+
+    def options(self) -> Dict[str, str]:
+        return self._options
+
+    def __eq__(self, other):
+        if not isinstance(other, FunctionImpl):
+            # Defer to the other operand (and finally identity) rather than
+            # asserting inequality for an unknown type.
+            return NotImplemented
+        return (self._identifier == other._identifier
+                and self._input_params == other._input_params
+                and self._return_params == other._return_params
+                and self._deterministic == other._deterministic
+                and self._definitions == other._definitions
+                and self._comment == other._comment
+                and self._options == other._options)
+
+    def __hash__(self):
+        return hash(self._identifier)
+
+    def __repr__(self):
+        return (
+            f"FunctionImpl(identifier={self._identifier!r}, "
+            f"input_params={self._input_params!r}, "
+            f"return_params={self._return_params!r}, "
+            f"deterministic={self._deterministic!r}, "
+            f"definitions={self._definitions!r}, "
+            f"comment={self._comment!r}, "
+            f"options={self._options!r})"
+        )
diff --git a/paimon-python/pypaimon/function/function_change.py
b/paimon-python/pypaimon/function/function_change.py
new file mode 100644
index 0000000000..90390d7c97
--- /dev/null
+++ b/paimon-python/pypaimon/function/function_change.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from pypaimon.function.function_definition import FunctionDefinition
+
+
class Actions:
    """Values of the ``action`` field that tag a serialized FunctionChange."""

    # Option edits.
    SET_OPTION = "setOption"
    REMOVE_OPTION = "removeOption"

    # Comment edit.
    UPDATE_COMMENT = "updateComment"

    # Per-dialect definition edits.
    ADD_DEFINITION = "addDefinition"
    UPDATE_DEFINITION = "updateDefinition"
    DROP_DEFINITION = "dropDefinition"
+
+
class FunctionChange:
    """A single modification to a catalog function.

    Build concrete changes with the static factory helpers. ``to_dict`` and
    ``from_dict`` convert to and from the JSON form, which carries an
    ``action`` discriminator (one of the ``Actions`` constants).
    """

    def __init__(self, action: str):
        # Serialized action name of the concrete change.
        self._action = action

    @staticmethod
    def set_option(key: str, value: str) -> "SetFunctionOption":
        """Create a change that sets option ``key`` to ``value``."""
        return SetFunctionOption(key, value)

    @staticmethod
    def remove_option(key: str) -> "RemoveFunctionOption":
        """Create a change that removes option ``key``."""
        return RemoveFunctionOption(key)

    @staticmethod
    def update_comment(comment: Optional[str]) -> "UpdateFunctionComment":
        """Create a change that replaces the comment (``None`` is allowed)."""
        return UpdateFunctionComment(comment)

    @staticmethod
    def add_definition(name: str, definition: FunctionDefinition) -> "AddDefinition":
        """Create a change that adds ``definition`` under the name ``name``."""
        return AddDefinition(name, definition)

    @staticmethod
    def update_definition(name: str, definition: FunctionDefinition) -> "UpdateDefinition":
        """Create a change that replaces the definition registered as ``name``."""
        return UpdateDefinition(name, definition)

    @staticmethod
    def drop_definition(name: str) -> "DropDefinition":
        """Create a change that removes the definition registered as ``name``."""
        return DropDefinition(name)

    def to_dict(self) -> Dict:
        """Serialize this change; each concrete subclass overrides this."""
        raise NotImplementedError

    @classmethod
    def from_dict(cls, data: Dict) -> "FunctionChange":
        """Rebuild the concrete change described by ``data``.

        Raises:
            ValueError: if ``data["action"]`` is not a known action.
            KeyError: if a field the action requires is missing.
        """
        action = data.get("action")
        # Checked in order, with ``==``, exactly like an if/elif chain.
        decoders = (
            (Actions.SET_OPTION,
             lambda d: SetFunctionOption(d["key"], d["value"])),
            (Actions.REMOVE_OPTION,
             lambda d: RemoveFunctionOption(d["key"])),
            (Actions.UPDATE_COMMENT,
             lambda d: UpdateFunctionComment(d.get("comment"))),
            (Actions.ADD_DEFINITION,
             lambda d: AddDefinition(d["name"], FunctionDefinition.from_dict(d["definition"]))),
            (Actions.UPDATE_DEFINITION,
             lambda d: UpdateDefinition(d["name"], FunctionDefinition.from_dict(d["definition"]))),
            (Actions.DROP_DEFINITION,
             lambda d: DropDefinition(d["name"])),
        )
        for known_action, decode in decoders:
            if action == known_action:
                return decode(data)
        raise ValueError(f"Unknown function change action: {action}")
+
+
class SetFunctionOption(FunctionChange):
    """Change that sets (or overwrites) one function option."""

    def __init__(self, key: str, value: str):
        super().__init__(Actions.SET_OPTION)
        self.key = key
        self.value = value

    def to_dict(self) -> Dict:
        """Serialize with the keys ``action``, ``key`` and ``value``."""
        payload = {"action": self._action}
        payload.update(key=self.key, value=self.value)
        return payload
+
+
class RemoveFunctionOption(FunctionChange):
    """Change that removes one function option by key."""

    def __init__(self, key: str):
        super().__init__(Actions.REMOVE_OPTION)
        self.key = key

    def to_dict(self) -> Dict:
        """Serialize with the keys ``action`` and ``key``."""
        return dict(action=self._action, key=self.key)
+
+
class UpdateFunctionComment(FunctionChange):
    """Change that replaces the function comment (``None`` is allowed)."""

    def __init__(self, comment: Optional[str]):
        super().__init__(Actions.UPDATE_COMMENT)
        self.comment = comment

    def to_dict(self) -> Dict:
        """Serialize with the keys ``action`` and ``comment``."""
        return dict(action=self._action, comment=self.comment)
+
+
class AddDefinition(FunctionChange):
    """Change that registers a new definition under ``name``."""

    def __init__(self, name: str, definition: FunctionDefinition):
        super().__init__(Actions.ADD_DEFINITION)
        self.name = name
        self.definition = definition

    def to_dict(self) -> Dict:
        """Serialize with ``action``, ``name`` and the nested ``definition`` dict."""
        nested = self.definition.to_dict()
        return {"action": self._action, "name": self.name, "definition": nested}
+
+
class UpdateDefinition(FunctionChange):
    """Change that replaces the definition already registered under ``name``."""

    def __init__(self, name: str, definition: FunctionDefinition):
        super().__init__(Actions.UPDATE_DEFINITION)
        self.name = name
        self.definition = definition

    def to_dict(self) -> Dict:
        """Serialize with ``action``, ``name`` and the nested ``definition`` dict."""
        result = dict(action=self._action, name=self.name)
        result["definition"] = self.definition.to_dict()
        return result
+
+
class DropDefinition(FunctionChange):
    """Change that removes the definition registered under ``name``."""

    def __init__(self, name: str):
        super().__init__(Actions.DROP_DEFINITION)
        self.name = name

    def to_dict(self) -> Dict:
        """Serialize with the keys ``action`` and ``name``."""
        return dict(action=self._action, name=self.name)
diff --git a/paimon-python/pypaimon/function/function_definition.py
b/paimon-python/pypaimon/function/function_definition.py
new file mode 100644
index 0000000000..486eb6d722
--- /dev/null
+++ b/paimon-python/pypaimon/function/function_definition.py
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional
+
+
class Types:
    """Values of the ``type`` field that tag a serialized FunctionDefinition."""

    FILE = "file"  # FileFunctionDefinition
    SQL = "sql"  # SQLFunctionDefinition
    LAMBDA = "lambda"  # LambdaFunctionDefinition
+
+
class FunctionFileResource:
    """A file resource (e.g. a jar) used by a file-based function definition.

    Serialized with the camelCase keys ``resourceType`` and ``uri``.
    """

    def __init__(self, resource_type: str, uri: str):
        self.resource_type = resource_type
        self.uri = uri

    def to_dict(self) -> Dict:
        """Return the JSON-ready dict form of this resource."""
        return {
            "resourceType": self.resource_type,
            "uri": self.uri,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "FunctionFileResource":
        """Build a resource from its dict form; missing keys become ``None``."""
        return cls(
            resource_type=data.get("resourceType"),
            uri=data.get("uri"),
        )

    def __eq__(self, other):
        # Returning NotImplemented (not False) lets Python try the other
        # operand's __eq__ (e.g. unittest.mock.ANY) before falling back to
        # identity, which still yields False for unrelated objects.
        if not isinstance(other, FunctionFileResource):
            return NotImplemented
        return self.resource_type == other.resource_type and self.uri == other.uri

    def __hash__(self):
        return hash((self.resource_type, self.uri))

    def __repr__(self):
        return (f"FunctionFileResource(resource_type={self.resource_type!r}, "
                f"uri={self.uri!r})")
+
+
class FunctionDefinition(ABC):
    """Abstract base for a function body written for one engine or dialect.

    Concrete kinds are file-based, SQL and lambda definitions. ``from_dict``
    dispatches on the serialized ``type`` field (see ``Types``).
    """

    @staticmethod
    def file(
        file_resources: List[FunctionFileResource],
        language: str,
        class_name: str,
        function_name: str,
    ) -> "FunctionDefinition":
        """Create a definition backed by external file resources."""
        return FileFunctionDefinition(
            file_resources, language, class_name, function_name
        )

    @staticmethod
    def sql(body: str) -> "FunctionDefinition":
        """Create a definition whose body is a SQL expression."""
        return SQLFunctionDefinition(body)

    @staticmethod
    def lambda_def(definition: str, language: str) -> "FunctionDefinition":
        """Create a definition from inline lambda source in ``language``."""
        return LambdaFunctionDefinition(definition, language)

    @abstractmethod
    def to_dict(self) -> Dict:
        """Convert to dict for JSON serialization."""

    @staticmethod
    def from_dict(data: Dict) -> Optional["FunctionDefinition"]:
        """Rebuild a definition from its dict form.

        Returns ``None`` when ``data`` is ``None``. Resource entries that are
        not dicts are passed through unchanged.

        Raises:
            ValueError: if ``data["type"]`` is not a known definition type.
        """
        if data is None:
            return None

        kind = data.get("type")

        if kind == Types.FILE:
            raw_resources = data.get("fileResources")
            if raw_resources is None:
                resources = None
            else:
                resources = [
                    FunctionFileResource.from_dict(item) if isinstance(item, dict) else item
                    for item in raw_resources
                ]
            return FileFunctionDefinition(
                file_resources=resources,
                language=data.get("language"),
                class_name=data.get("className"),
                function_name=data.get("functionName"),
            )

        if kind == Types.SQL:
            return SQLFunctionDefinition(definition=data.get("definition"))

        if kind == Types.LAMBDA:
            return LambdaFunctionDefinition(
                definition=data.get("definition"),
                language=data.get("language"),
            )

        raise ValueError(f"Unknown function definition type: {kind}")
+
+
class FileFunctionDefinition(FunctionDefinition):
    """Function implemented in external files, located by class and function name."""

    def __init__(
        self,
        file_resources: List[FunctionFileResource],
        language: str,
        class_name: str,
        function_name: str,
    ):
        self.file_resources = file_resources
        self.language = language
        self.class_name = class_name
        self.function_name = function_name

    def to_dict(self) -> Dict:
        """Serialize to the camelCase JSON form tagged with ``type`` = ``Types.FILE``.

        ``fileResources`` is ``None`` only when no resource list was set. An
        empty list is kept as an empty list (it used to become ``None``), so
        that ``FunctionDefinition.from_dict(d.to_dict()) == d`` holds.
        """
        if self.file_resources is None:
            resources = None
        else:
            resources = [
                r.to_dict() if isinstance(r, FunctionFileResource) else r
                for r in self.file_resources
            ]
        return {
            "type": Types.FILE,
            "fileResources": resources,
            "language": self.language,
            "className": self.class_name,
            "functionName": self.function_name,
        }

    def __eq__(self, other):
        # NotImplemented lets the other operand's __eq__ run (e.g. mock.ANY).
        if not isinstance(other, FileFunctionDefinition):
            return NotImplemented
        return (self.file_resources == other.file_resources
                and self.language == other.language
                and self.class_name == other.class_name
                and self.function_name == other.function_name)

    def __hash__(self):
        return hash((
            tuple(self.file_resources) if self.file_resources else (),
            self.language,
            self.class_name,
            self.function_name,
        ))

    def __repr__(self):
        return (f"FileFunctionDefinition(file_resources={self.file_resources!r}, "
                f"language={self.language!r}, class_name={self.class_name!r}, "
                f"function_name={self.function_name!r})")
+
+
class SQLFunctionDefinition(FunctionDefinition):
    """Function whose body is a SQL expression string."""

    def __init__(self, definition: str):
        self.definition = definition

    def to_dict(self) -> Dict:
        """Serialize as ``{"type": Types.SQL, "definition": ...}``."""
        return {
            "type": Types.SQL,
            "definition": self.definition,
        }

    def __eq__(self, other):
        # NotImplemented lets the other operand's __eq__ run (e.g. mock.ANY).
        if not isinstance(other, SQLFunctionDefinition):
            return NotImplemented
        return self.definition == other.definition

    def __hash__(self):
        return hash(self.definition)

    def __repr__(self):
        return f"SQLFunctionDefinition(definition={self.definition!r})"
+
+
class LambdaFunctionDefinition(FunctionDefinition):
    """Function given as inline lambda source code in some ``language``."""

    def __init__(self, definition: str, language: str):
        self.definition = definition
        self.language = language

    def to_dict(self) -> Dict:
        """Serialize as ``{"type": Types.LAMBDA, "definition": ..., "language": ...}``."""
        return {
            "type": Types.LAMBDA,
            "definition": self.definition,
            "language": self.language,
        }

    def __eq__(self, other):
        # NotImplemented lets the other operand's __eq__ run (e.g. mock.ANY).
        if not isinstance(other, LambdaFunctionDefinition):
            return NotImplemented
        return self.definition == other.definition and self.language == other.language

    def __hash__(self):
        return hash((self.definition, self.language))

    def __repr__(self):
        return (f"LambdaFunctionDefinition(definition={self.definition!r}, "
                f"language={self.language!r})")
diff --git a/paimon-python/pypaimon/tests/rest/rest_function_test.py
b/paimon-python/pypaimon/tests/rest/rest_function_test.py
new file mode 100644
index 0000000000..2cad320fb5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_function_test.py
@@ -0,0 +1,327 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import shutil
+import tempfile
+import unittest
+import uuid
+
+from pypaimon.api.api_response import ConfigResponse
+from pypaimon.api.auth import BearTokenAuthProvider
+from pypaimon.api.rest_api import RESTApi, IllegalArgumentError
+from pypaimon.catalog.catalog_exception import (
+ FunctionNotExistException,
+ FunctionAlreadyExistException,
+ DefinitionAlreadyExistException,
+ DefinitionNotExistException,
+)
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options import Options
+from pypaimon.function.function import FunctionImpl
+from pypaimon.function.function_change import FunctionChange
+from pypaimon.function.function_definition import (
+ FunctionDefinition, FunctionFileResource,
+)
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
+
+
def _mock_function(identifier: Identifier) -> FunctionImpl:
    """Build a sample "area" function for ``identifier``.

    It takes two DOUBLE inputs (``length``, ``width``), returns one DOUBLE
    (``area``), is non-deterministic, and has one definition each for the
    ``flink``, ``spark`` and ``trino`` dialects.
    """
    definitions = {
        # File-based Java implementation.
        "flink": FunctionDefinition.file(
            file_resources=[FunctionFileResource("jar", "/a/b/c.jar")],
            language="java",
            class_name="className",
            function_name="eval",
        ),
        # Inline Java lambda.
        "spark": FunctionDefinition.lambda_def(
            "(Double length, Double width) -> length * width", "java"
        ),
        # Plain SQL expression.
        "trino": FunctionDefinition.sql("length * width"),
    }
    return FunctionImpl(
        identifier=identifier,
        input_params=[
            DataField(0, "length", AtomicType("DOUBLE")),
            DataField(1, "width", AtomicType("DOUBLE")),
        ],
        return_params=[DataField(0, "area", AtomicType("DOUBLE"))],
        deterministic=False,
        definitions=definitions,
        comment="comment",
        options={},
    )
+
+
class RESTFunctionTest(unittest.TestCase):
    """End-to-end tests of the function API of ``RESTCatalog``.

    Each test runs against its own ``RESTCatalogServer``. The server uses a
    fresh temporary directory and authenticates with a random bearer token.
    """

    def setUp(self):
        """Start a mock REST server and connect a ``RESTCatalog`` to it.

        Each teardown step is registered with ``addCleanup`` right after the
        resource it releases is created. That way it also runs when a later
        step of ``setUp`` fails; ``tearDown`` is skipped in that case, which
        would leak the running server and the temporary directory.
        Cleanups run in reverse order: shut the server down, collect garbage,
        then delete the temporary directory.
        """
        import gc

        self.temp_dir = tempfile.mkdtemp(prefix="function_test_")
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.addCleanup(gc.collect)

        self.config = ConfigResponse(defaults={"prefix": "mock-test"})
        self.token = str(uuid.uuid4())
        self.server = RESTCatalogServer(
            data_path=self.temp_dir,
            auth_provider=BearTokenAuthProvider(self.token),
            config=self.config,
            warehouse="warehouse",
        )
        self.server.start()
        self.addCleanup(self.server.shutdown)

        options = Options({
            "metastore": "rest",
            "uri": f"http://localhost:{self.server.port}",
            "warehouse": "warehouse",
            "token.provider": "bear",
            "token": self.token,
        })
        self.catalog = RESTCatalog(CatalogContext.create_from_options(options))

    def test_function(self):
        """Name validation, then the create / get / list / drop round trip."""
        self.catalog.create_database("rest_catalog_db", True)

        # A name containing "/" is rejected by create and drop; get reports
        # that the function does not exist.
        identifier_with_slash = Identifier.create("rest_catalog_db", "function/")
        with self.assertRaises(IllegalArgumentError):
            self.catalog.create_function(
                identifier_with_slash,
                _mock_function(identifier_with_slash),
                False,
            )
        with self.assertRaises(FunctionNotExistException):
            self.catalog.get_function(identifier_with_slash)
        with self.assertRaises(IllegalArgumentError):
            self.catalog.drop_function(identifier_with_slash, True)

        # A name without any letter is handled the same way.
        identifier_without_alphabet = Identifier.create("rest_catalog_db", "-")
        with self.assertRaises(IllegalArgumentError):
            self.catalog.create_function(
                identifier_without_alphabet,
                _mock_function(identifier_without_alphabet),
                False,
            )
        with self.assertRaises(FunctionNotExistException):
            self.catalog.get_function(identifier_without_alphabet)
        with self.assertRaises(IllegalArgumentError):
            self.catalog.drop_function(identifier_without_alphabet, True)

        identifier = Identifier.from_string("rest_catalog_db.function.na_me-01")
        function = _mock_function(identifier)

        # The server is fresh for every test, so this tolerant drop only checks
        # that drop_function(..., True) accepts a missing function.
        self.catalog.drop_function(identifier, True)
        self.catalog.create_function(identifier, function, True)
        with self.assertRaises(FunctionAlreadyExistException):
            self.catalog.create_function(identifier, function, False)

        self.assertIn(
            function.name(),
            self.catalog.list_functions(identifier.get_database_name()),
        )

        # The stored function carries every dialect definition we sent.
        get_function = self.catalog.get_function(identifier)
        self.assertEqual(get_function.name(), function.name())
        for dialect in function.definitions().keys():
            self.assertEqual(
                get_function.definition(dialect), function.definition(dialect)
            )

        self.catalog.drop_function(identifier, True)
        self.assertNotIn(
            function.name(),
            self.catalog.list_functions(identifier.get_database_name()),
        )

        with self.assertRaises(FunctionNotExistException):
            self.catalog.drop_function(identifier, False)
        with self.assertRaises(FunctionNotExistException):
            self.catalog.get_function(identifier)

    def test_list_functions(self):
        """Paging and name-pattern filtering of the function listing APIs."""
        db1 = "db_rest_catalog_db"
        db2 = "db2_rest_catalog"
        identifier = Identifier.create(db1, "list_function")
        identifier1 = Identifier.create(db1, "function")
        identifier2 = Identifier.create(db2, "list_function")
        identifier3 = Identifier.create(db2, "function")

        self.catalog.create_database(db1, True)
        self.catalog.create_database(db2, True)
        self.catalog.create_function(identifier, _mock_function(identifier), True)
        self.catalog.create_function(identifier1, _mock_function(identifier1), True)
        self.catalog.create_function(identifier2, _mock_function(identifier2), True)
        self.catalog.create_function(identifier3, _mock_function(identifier3), True)

        # Database-scoped names, with no limit, page token or pattern.
        result = self.catalog.list_functions_paged(db1, None, None, None)
        self.assertEqual(
            set(result.elements),
            {identifier.get_object_name(), identifier1.get_object_name()},
        )

        # A limit of 1 returns exactly one of the two functions.
        result = self.catalog.list_functions_paged(db1, 1, None, None)
        self.assertEqual(len(result.elements), 1)
        self.assertIn(
            result.elements[0],
            [identifier.get_object_name(), identifier1.get_object_name()],
        )

        # With page token "function", only "list_function" remains.
        result = self.catalog.list_functions_paged(
            db1, 1, identifier1.get_object_name(), None)
        self.assertEqual(
            result.elements,
            [identifier.get_object_name()],
        )

        # The "func%" pattern matches only "function".
        result = self.catalog.list_functions_paged(db1, None, None, "func%")
        self.assertEqual(result.elements, [identifier1.get_object_name()])

        # Catalog-wide listing, filtered by database and function patterns.
        result = self.catalog.list_functions_paged_globally(
            "db2_rest%", "func%", None, None)
        self.assertEqual(len(result.elements), 1)
        self.assertEqual(
            result.elements[0].get_full_name(), identifier3.get_full_name())

        result = self.catalog.list_functions_paged_globally(
            "db2_rest%", None, 1, None)
        self.assertEqual(len(result.elements), 1)
        self.assertIn(
            result.elements[0].get_full_name(),
            [identifier2.get_full_name(), identifier3.get_full_name()],
        )

        result = self.catalog.list_functions_paged_globally(
            "db2_rest%", None, 1, identifier3.get_full_name())
        self.assertEqual(len(result.elements), 1)
        self.assertEqual(
            result.elements[0].get_full_name(),
            identifier2.get_full_name(),
        )

        # The detailed listing returns function objects rather than names.
        result = self.catalog.list_function_details_paged(db1, 1, None, None)
        self.assertEqual(len(result.elements), 1)
        self.assertIn(
            result.elements[0].full_name(),
            [identifier.get_full_name(), identifier1.get_full_name()],
        )

        result = self.catalog.list_function_details_paged(db2, 4, None, "func%")
        self.assertEqual(len(result.elements), 1)
        self.assertEqual(
            result.elements[0].full_name(), identifier3.get_full_name())

        result = self.catalog.list_function_details_paged(
            db2, 1, identifier3.get_object_name(), None)
        full_names = [f.full_name() for f in result.elements]
        self.assertIn(identifier2.get_full_name(), full_names)

    def test_alter_function(self):
        """Each FunctionChange kind, applied through alter_function."""
        identifier = Identifier.create("rest_catalog_db", "alter_function_name")
        self.catalog.create_database(identifier.get_database_name(), True)
        self.catalog.drop_function(identifier, True)
        function = _mock_function(identifier)
        definition = FunctionDefinition.sql("x * y + 1")
        add_definition = FunctionChange.add_definition("flink_1", definition)

        # Altering a missing function is tolerated with True and raises with False.
        self.catalog.alter_function(identifier, [add_definition], True)

        with self.assertRaises(FunctionNotExistException):
            self.catalog.alter_function(identifier, [add_definition], False)

        self.catalog.create_function(identifier, function, True)

        # set_option, then remove_option.
        key = str(uuid.uuid4())
        value = str(uuid.uuid4())
        set_option = FunctionChange.set_option(key, value)
        self.catalog.alter_function(identifier, [set_option], False)
        catalog_function = self.catalog.get_function(identifier)
        self.assertEqual(catalog_function.options().get(key), value)

        self.catalog.alter_function(
            identifier, [FunctionChange.remove_option(key)], False)
        catalog_function = self.catalog.get_function(identifier)
        self.assertNotIn(key, catalog_function.options())

        # update_comment.
        new_comment = "new comment"
        self.catalog.alter_function(
            identifier, [FunctionChange.update_comment(new_comment)], False
        )
        catalog_function = self.catalog.get_function(identifier)
        self.assertEqual(catalog_function.comment(), new_comment)

        # add_definition succeeds once, then reports a duplicate.
        self.catalog.alter_function(identifier, [add_definition], False)
        catalog_function = self.catalog.get_function(identifier)
        self.assertEqual(
            catalog_function.definition(add_definition.name),
            add_definition.definition,
        )

        with self.assertRaises(DefinitionAlreadyExistException):
            self.catalog.alter_function(identifier, [add_definition], False)

        # update_definition works for an existing name and fails for an unknown one.
        update_definition = FunctionChange.update_definition("flink_1", definition)
        self.catalog.alter_function(identifier, [update_definition], False)
        catalog_function = self.catalog.get_function(identifier)
        self.assertEqual(
            catalog_function.definition(update_definition.name),
            update_definition.definition,
        )

        with self.assertRaises(DefinitionNotExistException):
            self.catalog.alter_function(
                identifier,
                [FunctionChange.update_definition("no_exist", definition)],
                False,
            )

        # drop_definition removes the definition; dropping it again fails.
        drop_definition = FunctionChange.drop_definition(update_definition.name)
        self.catalog.alter_function(identifier, [drop_definition], False)
        catalog_function = self.catalog.get_function(identifier)
        self.assertIsNone(catalog_function.definition(update_definition.name))

        with self.assertRaises(DefinitionNotExistException):
            self.catalog.alter_function(identifier, [drop_definition], False)

    def test_validate_function_name(self):
        """Accepted and rejected names for RESTApi's function-name validators."""
        self.assertTrue(RESTApi.is_valid_function_name("a"))
        self.assertTrue(RESTApi.is_valid_function_name("a1_"))
        self.assertTrue(RESTApi.is_valid_function_name("a-b_c"))
        self.assertTrue(RESTApi.is_valid_function_name("a-b_c.1"))

        self.assertFalse(RESTApi.is_valid_function_name("a\\/b"))
        self.assertFalse(RESTApi.is_valid_function_name("a$?b"))
        self.assertFalse(RESTApi.is_valid_function_name("a@b"))
        self.assertFalse(RESTApi.is_valid_function_name("a*b"))
        self.assertFalse(RESTApi.is_valid_function_name("123"))
        self.assertFalse(RESTApi.is_valid_function_name("_-"))
        self.assertFalse(RESTApi.is_valid_function_name(""))
        self.assertFalse(RESTApi.is_valid_function_name(None))

        # check_function_name raises for the names rejected above.
        with self.assertRaises(IllegalArgumentError):
            RESTApi.check_function_name("a\\/b")
        with self.assertRaises(IllegalArgumentError):
            RESTApi.check_function_name("123")
        with self.assertRaises(IllegalArgumentError):
            RESTApi.check_function_name("")
        with self.assertRaises(IllegalArgumentError):
            RESTApi.check_function_name(None)
+
+
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py
b/paimon-python/pypaimon/tests/rest/rest_server.py
index 937d99f1bd..c3a4188669 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -33,7 +33,11 @@ from pypaimon.api.api_request import (AlterDatabaseRequest,
AlterTableRequest,
CreateDatabaseRequest,
CreateTableRequest, RenameTableRequest)
from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
+ GetFunctionResponse,
GetTableResponse, ListDatabasesResponse,
+ ListFunctionDetailsResponse,
+ ListFunctionsGloballyResponse,
+ ListFunctionsResponse,
ListPartitionsResponse,
ListTablesResponse,
PagedList, Partition,
RESTResponse, ErrorResponse)
@@ -43,7 +47,11 @@ from pypaimon.catalog.catalog_exception import
(DatabaseNoPermissionException,
DatabaseNotExistException,
TableNoPermissionException,
TableNotExistException,
DatabaseAlreadyExistException,
- TableAlreadyExistException)
+ TableAlreadyExistException,
+ FunctionNotExistException,
+ FunctionAlreadyExistException,
+
DefinitionAlreadyExistException,
+ DefinitionNotExistException)
from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.common.identifier import Identifier
from pypaimon.api.typedef import RESTAuthParameter
@@ -196,6 +204,7 @@ class RESTCatalogServer:
self.table_metadata_store: Dict[str, TableMetadata] = {}
self.table_latest_snapshot_store: Dict[str, str] = {}
self.table_partitions_store: Dict[str, List] = {}
+ self.function_store: Dict[str, Dict] = {} # key: "db.func_name",
value: GetFunctionResponse-like dict
self.no_permission_databases: List[str] = []
self.no_permission_tables: List[str] = []
self.table_token_store: Dict[str, "RESTToken"] = {}
@@ -365,6 +374,10 @@ class RESTCatalogServer:
source_table_dir.rename(destination_table_dir)
return self._mock_response("", 200)
+ # Global functions endpoint (catalog-scoped)
+ if resource_path == self.resource_paths.functions() and method ==
"GET":
+ return self._functions_globally_handle(parameters)
+
database = resource_path.split("/")[4]
# Database-specific endpoints
if
resource_path.startswith(self.resource_paths.database(database)):
@@ -391,6 +404,10 @@ class RESTCatalogServer:
if resource_type.startswith(ResourcePaths.TABLES):
return self._tables_handle(method, data,
database_name, parameters)
+ elif resource_type == ResourcePaths.FUNCTIONS:
+ return self._functions_handle(method, data,
database_name, parameters)
+ elif resource_type == ResourcePaths.FUNCTION_DETAILS:
+ return self._function_details_handle(database_name,
parameters)
elif len(path_parts) >= 3:
# Individual resource operations
@@ -402,6 +419,8 @@ class RESTCatalogServer:
return self._handle_table_resource(method, path_parts,
identifier, data, parameters)
elif resource_type == ResourcePaths.PARTITIONS:
return self._table_partitions_handle(method,
identifier, parameters)
+ elif resource_type == ResourcePaths.FUNCTIONS:
+ return self._function_handle(method, data, identifier)
return self._mock_response(ErrorResponse(None, None, "Not
Found", 404), 404)
@@ -437,6 +456,26 @@ class RESTCatalogServer:
ErrorResponse.RESOURCE_TYPE_TABLE,
e.identifier.get_full_name(), str(e), 409
)
return self._mock_response(response, 409)
+ except FunctionNotExistException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_FUNCTION,
e.identifier.get_object_name(), str(e), 404
+ )
+ return self._mock_response(response, 404)
+ except FunctionAlreadyExistException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_FUNCTION,
e.identifier.get_full_name(), str(e), 409
+ )
+ return self._mock_response(response, 409)
+ except DefinitionAlreadyExistException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_DEFINITION, e.name, str(e), 409
+ )
+ return self._mock_response(response, 409)
+ except DefinitionNotExistException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_DEFINITION, e.name, str(e), 404
+ )
+ return self._mock_response(response, 404)
except Exception as e:
self.logger.error(f"Unexpected error: {e}")
response = ErrorResponse(None, None, str(e), 500)
@@ -489,6 +528,188 @@ class RESTCatalogServer:
return self._mock_response(ErrorResponse(None, None, "Not
Found", 404), 404)
return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
+ # ======================= Function Handlers ===============================
+
+ def _functions_handle(self, method: str, data: str, database_name: str,
+ parameters: Dict[str, str]) -> Tuple[str, int]:
+ """Handle database-scoped function list / create."""
+ if method == "GET":
+ function_name_pattern = parameters.get(FUNCTION_NAME_PATTERN)
+ functions = [
+ key.split(".", 1)[1]
+ for key in self.function_store.keys()
+ if key.startswith(database_name + ".")
+ and (not function_name_pattern or
self._match_name_pattern(key.split(".", 1)[1], function_name_pattern))
+ ]
+ return self._generate_final_list_functions_response(parameters,
functions)
+ elif method == "POST":
+ import json as json_module
+ request_dict = json_module.loads(data)
+ func_name = request_dict.get("name")
+ key = f"{database_name}.{func_name}"
+ if key in self.function_store:
+ identifier = Identifier.create(database_name, func_name)
+ raise FunctionAlreadyExistException(identifier)
+ self.function_store[key] = GetFunctionResponse(
+ uuid=str(uuid.uuid4()),
+ name=func_name,
+ input_params=request_dict.get("inputParams"),
+ return_params=request_dict.get("returnParams"),
+ deterministic=request_dict.get("deterministic", False),
+ definitions=request_dict.get("definitions"),
+ comment=request_dict.get("comment"),
+ options=request_dict.get("options", {}),
+ owner="owner",
+ created_at=1,
+ created_by="owner",
+ updated_at=1,
+ updated_by="owner",
+ )
+ return self._mock_response("", 200)
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ def _function_handle(self, method: str, data: str, identifier: Identifier)
-> Tuple[str, int]:
+ """Handle individual function operations (GET, POST alter, DELETE)."""
+ key = identifier.get_full_name()
+ if method == "GET":
+ if key not in self.function_store:
+ raise FunctionNotExistException(identifier)
+ return self._mock_response(self.function_store[key], 200)
+ elif method == "POST":
+ # Alter function
+ if key not in self.function_store:
+ raise FunctionNotExistException(identifier)
+ import json as json_module
+ request_dict = json_module.loads(data)
+ changes = request_dict.get("changes", [])
+ self._apply_function_changes(identifier, changes)
+ return self._mock_response("", 200)
+ elif method == "DELETE":
+ if key not in self.function_store:
+ raise FunctionNotExistException(identifier)
+ del self.function_store[key]
+ return self._mock_response("", 200)
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ def _function_details_handle(self, database_name: str,
+ parameters: Dict[str, str]) -> Tuple[str,
int]:
+ """Handle function details listing."""
+ function_name_pattern = parameters.get(FUNCTION_NAME_PATTERN)
+ details = []
+ for key, resp in self.function_store.items():
+ if key.startswith(database_name + "."):
+ func_name = key.split(".", 1)[1]
+ if not function_name_pattern or
self._match_name_pattern(func_name, function_name_pattern):
+ details.append(resp)
+ return self._generate_final_list_function_details_response(parameters,
details)
+
+ def _functions_globally_handle(self, parameters: Dict[str, str]) ->
Tuple[str, int]:
+ """Handle catalog-scoped function listing."""
+ database_name_pattern = parameters.get(DATABASE_NAME_PATTERN)
+ function_name_pattern = parameters.get(FUNCTION_NAME_PATTERN)
+ identifiers = []
+ for key in self.function_store.keys():
+ db_name, func_name = key.split(".", 1)
+ if database_name_pattern and not self._match_name_pattern(db_name,
database_name_pattern):
+ continue
+ if function_name_pattern and not
self._match_name_pattern(func_name, function_name_pattern):
+ continue
+ identifiers.append(Identifier.create(db_name, func_name))
+ return
self._generate_final_list_functions_globally_response(parameters, identifiers)
+
+ def _apply_function_changes(self, identifier: Identifier, changes:
List[Dict]) -> None:
+ """Apply function changes to the function store, mirroring Java mock
server logic."""
+ from pypaimon.function.function_change import Actions
+ key = identifier.get_full_name()
+ func_resp = self.function_store[key]
+
+ # Work with mutable copies
+ options = dict(func_resp.options) if func_resp.options else {}
+ definitions = dict(func_resp.definitions) if func_resp.definitions
else {}
+ comment = func_resp.comment
+
+ for change in changes:
+ action = change.get("action")
+ if action == Actions.SET_OPTION:
+ options[change["key"]] = change["value"]
+ elif action == Actions.REMOVE_OPTION:
+ options.pop(change["key"], None)
+ elif action == Actions.UPDATE_COMMENT:
+ comment = change.get("comment")
+ elif action == Actions.ADD_DEFINITION:
+ name = change["name"]
+ if name in definitions:
+ raise DefinitionAlreadyExistException(identifier, name)
+ definitions[name] = change["definition"]
+ elif action == Actions.UPDATE_DEFINITION:
+ name = change["name"]
+ if name not in definitions:
+ raise DefinitionNotExistException(identifier, name)
+ definitions[name] = change["definition"]
+ elif action == Actions.DROP_DEFINITION:
+ name = change["name"]
+ if name not in definitions:
+ raise DefinitionNotExistException(identifier, name)
+ del definitions[name]
+
+ self.function_store[key] = GetFunctionResponse(
+ uuid=func_resp.uuid,
+ name=func_resp.name,
+ input_params=func_resp.input_params,
+ return_params=func_resp.return_params,
+ deterministic=func_resp.deterministic,
+ definitions=definitions,
+ comment=comment,
+ options=options,
+ owner=func_resp.owner,
+ created_at=func_resp.created_at,
+ created_by=func_resp.created_by,
+ updated_at=func_resp.updated_at,
+ updated_by=func_resp.updated_by,
+ )
+
+ def _generate_final_list_functions_response(self, parameters: Dict[str,
str],
+ functions: List[str]) ->
Tuple[str, int]:
+ if functions:
+ max_results = self._get_max_results(parameters)
+ page_token = parameters.get(PAGE_TOKEN)
+ paged = self._build_paged_entities(functions, max_results,
page_token)
+ response = ListFunctionsResponse(
+ functions=paged.elements,
+ next_page_token=paged.next_page_token
+ )
+ else:
+ response = ListFunctionsResponse(functions=[],
next_page_token=None)
+ return self._mock_response(response, 200)
+
+ def _generate_final_list_function_details_response(self, parameters:
Dict[str, str],
+ details: List) ->
Tuple[str, int]:
+ if details:
+ max_results = self._get_max_results(parameters)
+ page_token = parameters.get(PAGE_TOKEN)
+ paged = self._build_paged_entities(details, max_results,
page_token)
+ response = ListFunctionDetailsResponse(
+ function_details=paged.elements,
+ next_page_token=paged.next_page_token,
+ )
+ else:
+ response = ListFunctionDetailsResponse(function_details=[],
next_page_token=None)
+ return self._mock_response(response, 200)
+
+ def _generate_final_list_functions_globally_response(self, parameters:
Dict[str, str],
+ identifiers: List) ->
Tuple[str, int]:
+ if identifiers:
+ max_results = self._get_max_results(parameters)
+ page_token = parameters.get(PAGE_TOKEN)
+ paged = self._build_paged_entities(identifiers, max_results,
page_token)
+ response = ListFunctionsGloballyResponse(
+ functions=paged.elements,
+ next_page_token=paged.next_page_token,
+ )
+ else:
+ response = ListFunctionsGloballyResponse(functions=[],
next_page_token=None)
+ return self._mock_response(response, 200)
+
def _table_partitions_handle(
self, method: str, identifier: Identifier, parameters: Dict[str,
str]) -> Tuple[str, int]:
"""Handle table partitions listing"""