This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5427a958c7 [python] Support REST function API in pypaimon (#7559)
5427a958c7 is described below

commit 5427a958c73fdbcbb1a30e61b65e9d9c3d0f1b95
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Apr 2 20:03:35 2026 +0800

    [python] Support REST function API in pypaimon (#7559)
---
 paimon-python/pypaimon/api/api_request.py          |  62 ++++
 paimon-python/pypaimon/api/api_response.py         | 226 ++++++++++++++
 paimon-python/pypaimon/api/resource_paths.py       |  16 +
 paimon-python/pypaimon/api/rest_api.py             | 154 +++++++++-
 .../pypaimon/catalog/catalog_exception.py          |   5 +
 .../pypaimon/catalog/rest/rest_catalog.py          | 107 ++++++-
 paimon-python/pypaimon/common/identifier.py        |   3 +
 paimon-python/pypaimon/function/__init__.py        |  23 ++
 paimon-python/pypaimon/function/function.py        | 122 ++++++++
 paimon-python/pypaimon/function/function_change.py | 146 +++++++++
 .../pypaimon/function/function_definition.py       | 192 ++++++++++++
 .../pypaimon/tests/rest/rest_function_test.py      | 327 +++++++++++++++++++++
 paimon-python/pypaimon/tests/rest/rest_server.py   | 223 +++++++++++++-
 13 files changed, 1600 insertions(+), 6 deletions(-)

diff --git a/paimon-python/pypaimon/api/api_request.py b/paimon-python/pypaimon/api/api_request.py
index 6d1af78894..b7c7c1966b 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -22,6 +22,9 @@ from typing import Dict, List, Optional
 
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.json_util import json_field
+from pypaimon.function.function_change import FunctionChange
+from pypaimon.function.function_definition import FunctionDefinition
+from pypaimon.schema.data_types import DataField
 from pypaimon.schema.schema import Schema
 from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.snapshot.snapshot import Snapshot
@@ -94,3 +97,62 @@ class RollbackTableRequest(RESTRequest):
 
     instant: Instant = json_field(FIELD_INSTANT)
     from_snapshot: Optional[int] = json_field(FIELD_FROM_SNAPSHOT)
+
+
+@dataclass
+class CreateFunctionRequest(RESTRequest):
+    FIELD_NAME = "name"
+    FIELD_INPUT_PARAMS = "inputParams"
+    FIELD_RETURN_PARAMS = "returnParams"
+    FIELD_DETERMINISTIC = "deterministic"
+    FIELD_DEFINITIONS = "definitions"
+    FIELD_COMMENT = "comment"
+    FIELD_OPTIONS = "options"
+
+    name: str = json_field(FIELD_NAME)
+    input_params: Optional[List[DataField]] = json_field(FIELD_INPUT_PARAMS, default=None)
+    return_params: Optional[List[DataField]] = json_field(FIELD_RETURN_PARAMS, default=None)
+    deterministic: bool = json_field(FIELD_DETERMINISTIC, default=False)
+    definitions: Optional[Dict[str, FunctionDefinition]] = json_field(FIELD_DEFINITIONS, default=None)
+    comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
+    options: Optional[Dict[str, str]] = json_field(FIELD_OPTIONS, default=None)
+
+    def to_dict(self) -> Dict:
+        result = {
+            self.FIELD_NAME: self.name,
+            self.FIELD_DETERMINISTIC: self.deterministic,
+        }
+        if self.input_params is not None:
+            result[self.FIELD_INPUT_PARAMS] = [
+                p.to_dict() if hasattr(p, 'to_dict') else p for p in self.input_params
+            ]
+        else:
+            result[self.FIELD_INPUT_PARAMS] = None
+        if self.return_params is not None:
+            result[self.FIELD_RETURN_PARAMS] = [
+                p.to_dict() if hasattr(p, 'to_dict') else p for p in self.return_params
+            ]
+        else:
+            result[self.FIELD_RETURN_PARAMS] = None
+        if self.definitions is not None:
+            result[self.FIELD_DEFINITIONS] = {
+                k: v.to_dict() if hasattr(v, 'to_dict') else v
+                for k, v in self.definitions.items()
+            }
+        else:
+            result[self.FIELD_DEFINITIONS] = None
+        result[self.FIELD_COMMENT] = self.comment
+        result[self.FIELD_OPTIONS] = self.options
+        return result
+
+
+@dataclass
+class AlterFunctionRequest(RESTRequest):
+    FIELD_CHANGES = "changes"
+
+    changes: List[FunctionChange] = json_field(FIELD_CHANGES)
+
+    def to_dict(self) -> Dict:
+        return {
+            self.FIELD_CHANGES: [c.to_dict() for c in self.changes]
+        }
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index 64afc50b9b..819b2c45f3 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -20,8 +20,10 @@ from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from typing import Dict, Generic, List, Optional
 
+from pypaimon.common.identifier import Identifier
 from pypaimon.common.json_util import T, json_field
 from pypaimon.common.options import Options
+from pypaimon.schema.data_types import DataField
 from pypaimon.schema.schema import Schema
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
 from pypaimon.snapshot.table_snapshot import TableSnapshot
@@ -327,3 +329,227 @@ class GetTableSnapshotResponse(RESTResponse):
 
     def get_snapshot(self) -> Optional[TableSnapshot]:
         return self.snapshot
+
+
+@dataclass
+class GetFunctionResponse(AuditRESTResponse):
+    """Response for getting a function."""
+    FIELD_UUID = "uuid"
+    FIELD_NAME = "name"
+    FIELD_INPUT_PARAMS = "inputParams"
+    FIELD_RETURN_PARAMS = "returnParams"
+    FIELD_DETERMINISTIC = "deterministic"
+    FIELD_DEFINITIONS = "definitions"
+    FIELD_COMMENT = "comment"
+    FIELD_OPTIONS = "options"
+
+    uuid: Optional[str] = json_field(FIELD_UUID, default=None)
+    name: Optional[str] = json_field(FIELD_NAME, default=None)
+    input_params: Optional[List[DataField]] = json_field(FIELD_INPUT_PARAMS, default=None)
+    return_params: Optional[List[DataField]] = json_field(FIELD_RETURN_PARAMS, default=None)
+    deterministic: bool = json_field(FIELD_DETERMINISTIC, default=False)
+    definitions: Optional[Dict[str, 'FunctionDefinition']] = json_field(FIELD_DEFINITIONS, default=None)
+    comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
+    options: Optional[Dict[str, str]] = json_field(FIELD_OPTIONS, default=None)
+
+    def __init__(
+        self,
+        uuid: Optional[str] = None,
+        name: Optional[str] = None,
+        input_params: Optional[List[DataField]] = None,
+        return_params: Optional[List[DataField]] = None,
+        deterministic: bool = False,
+        definitions: Optional[Dict[str, 'FunctionDefinition']] = None,
+        comment: Optional[str] = None,
+        options: Optional[Dict[str, str]] = None,
+        owner: Optional[str] = None,
+        created_at: Optional[int] = None,
+        created_by: Optional[str] = None,
+        updated_at: Optional[int] = None,
+        updated_by: Optional[str] = None,
+    ):
+        super().__init__(owner, created_at, created_by, updated_at, updated_by)
+        self.uuid = uuid
+        self.name = name
+        self.input_params = input_params
+        self.return_params = return_params
+        self.deterministic = deterministic
+        self.definitions = definitions
+        self.comment = comment
+        self.options = options
+
+    def to_function(self, identifier):
+        from pypaimon.function.function import FunctionImpl
+        return FunctionImpl(
+            identifier=identifier,
+            input_params=self.input_params,
+            return_params=self.return_params,
+            deterministic=self.deterministic,
+            definitions=self.definitions or {},
+            comment=self.comment,
+            options=self.options or {},
+        )
+
+    @staticmethod
+    def _parse_data_fields(raw: Optional[list]) -> Optional[List[DataField]]:
+        if raw is None:
+            return None
+        return [DataField.from_dict(f) if isinstance(f, dict) else f for f in raw]
+
+    @staticmethod
+    def _parse_definitions(raw) -> Optional[Dict[str, 'FunctionDefinition']]:
+        from pypaimon.function.function_definition import FunctionDefinition
+        if raw is None:
+            return None
+        return {
+            k: FunctionDefinition.from_dict(v) if isinstance(v, dict) else v
+            for k, v in raw.items()
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "GetFunctionResponse":
+        return cls(
+            uuid=data.get("uuid"),
+            name=data.get("name"),
+            input_params=cls._parse_data_fields(data.get("inputParams")),
+            return_params=cls._parse_data_fields(data.get("returnParams")),
+            deterministic=data.get("deterministic", False),
+            definitions=cls._parse_definitions(data.get("definitions")),
+            comment=data.get("comment"),
+            options=data.get("options"),
+            owner=data.get("owner"),
+            created_at=data.get("createdAt"),
+            created_by=data.get("createdBy"),
+            updated_at=data.get("updatedAt"),
+            updated_by=data.get("updatedBy"),
+        )
+
+    def to_dict(self) -> Dict:
+        result = {}
+        if self.uuid is not None:
+            result["uuid"] = self.uuid
+        result["name"] = self.name
+        result["inputParams"] = (
+            [p.to_dict() if hasattr(p, 'to_dict') else p for p in self.input_params]
+            if self.input_params is not None else None
+        )
+        result["returnParams"] = (
+            [p.to_dict() if hasattr(p, 'to_dict') else p for p in self.return_params]
+            if self.return_params is not None else None
+        )
+        result["deterministic"] = self.deterministic
+        if self.definitions is not None:
+            result["definitions"] = {
+                k: v.to_dict() if hasattr(v, 'to_dict') else v
+                for k, v in self.definitions.items()
+            }
+        else:
+            result["definitions"] = None
+        result["comment"] = self.comment
+        result["options"] = self.options
+        if self.owner is not None:
+            result["owner"] = self.owner
+        if self.created_at is not None:
+            result["createdAt"] = self.created_at
+        if self.created_by is not None:
+            result["createdBy"] = self.created_by
+        if self.updated_at is not None:
+            result["updatedAt"] = self.updated_at
+        if self.updated_by is not None:
+            result["updatedBy"] = self.updated_by
+        return result
+
+
+@dataclass
+class ListFunctionsResponse(PagedResponse[str]):
+    """Response for listing functions."""
+    FIELD_FUNCTIONS = "functions"
+
+    functions: Optional[List[str]] = json_field(FIELD_FUNCTIONS, default=None)
+    next_page_token: Optional[str] = json_field(
+        PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
+
+    def data(self) -> Optional[List[str]]:
+        return self.functions
+
+    def get_next_page_token(self) -> Optional[str]:
+        return self.next_page_token
+
+
+@dataclass
+class ListFunctionDetailsResponse(PagedResponse['GetFunctionResponse']):
+    """Response for listing function details."""
+    FIELD_FUNCTION_DETAILS = "functionDetails"
+
+    function_details: Optional[List[GetFunctionResponse]] = json_field(
+        FIELD_FUNCTION_DETAILS, default=None)
+    next_page_token: Optional[str] = json_field(
+        PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
+
+    def data(self) -> Optional[List[GetFunctionResponse]]:
+        return self.function_details
+
+    def get_next_page_token(self) -> Optional[str]:
+        return self.next_page_token
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "ListFunctionDetailsResponse":
+        details = data.get("functionDetails")
+        if details is not None:
+            details = [GetFunctionResponse.from_dict(d) for d in details]
+        return cls(
+            function_details=details,
+            next_page_token=data.get("nextPageToken"),
+        )
+
+    def to_dict(self) -> Dict:
+        result = {}
+        if self.function_details is not None:
+            result["functionDetails"] = [d.to_dict() for d in self.function_details]
+        else:
+            result["functionDetails"] = None
+        result["nextPageToken"] = self.next_page_token
+        return result
+
+
+@dataclass
+class ListFunctionsGloballyResponse(PagedResponse[Identifier]):
+    """Response for listing functions globally across databases."""
+    FIELD_FUNCTIONS = "functions"
+
+    functions: Optional[List[Identifier]] = json_field(FIELD_FUNCTIONS, default=None)
+    next_page_token: Optional[str] = json_field(
+        PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
+
+    def data(self) -> Optional[List[Identifier]]:
+        return self.functions
+
+    def get_next_page_token(self) -> Optional[str]:
+        return self.next_page_token
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "ListFunctionsGloballyResponse":
+        functions = data.get("functions")
+        if functions is not None:
+            functions = [
+                Identifier.from_string(f) if isinstance(f, str) else
+                Identifier.create(f.get("database"), f.get("object"))
+                if isinstance(f, dict) else f
+                for f in functions
+            ]
+        return cls(
+            functions=functions,
+            next_page_token=data.get("nextPageToken"),
+        )
+
+    def to_dict(self) -> Dict:
+        result = {}
+        if self.functions is not None:
+            result["functions"] = [
+                {"database": f.get_database_name(), "object": f.get_object_name()}
+                for f in self.functions
+            ]
+        else:
+            result["functions"] = None
+        result["nextPageToken"] = self.next_page_token
+        return result
diff --git a/paimon-python/pypaimon/api/resource_paths.py b/paimon-python/pypaimon/api/resource_paths.py
index 9b2351c7e3..aba64018e6 100644
--- a/paimon-python/pypaimon/api/resource_paths.py
+++ b/paimon-python/pypaimon/api/resource_paths.py
@@ -28,6 +28,8 @@ class ResourcePaths:
     TABLES = "tables"
     TABLE_DETAILS = "table-details"
     PARTITIONS = "partitions"
+    FUNCTIONS = "functions"
+    FUNCTION_DETAILS = "function-details"
 
     def __init__(self, prefix: str):
         self.base_path = "/{}/{}".format(self.V1, prefix).rstrip("/")
@@ -83,3 +85,17 @@ class ResourcePaths:
     def partitions(self, database_name: str, table_name: str) -> str:
         return ("{}/{}/{}/{}/{}/{}".format(self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
                                            self.TABLES, RESTUtil.encode_string(table_name), self.PARTITIONS))
+
+    def functions(self, database_name: Optional[str] = None) -> str:
+        if database_name:
+            return "{}/{}/{}/{}".format(self.base_path, self.DATABASES,
+                                        RESTUtil.encode_string(database_name), self.FUNCTIONS)
+        return "{}/{}".format(self.base_path, self.FUNCTIONS)
+
+    def function_details(self, database_name: str) -> str:
+        return "{}/{}/{}/{}".format(self.base_path, self.DATABASES,
+                                    RESTUtil.encode_string(database_name), self.FUNCTION_DETAILS)
+
+    def function(self, database_name: str, function_name: str) -> str:
+        return "{}/{}/{}/{}/{}".format(self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
+                                       self.FUNCTIONS, RESTUtil.encode_string(function_name))
diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py
index 8db56fad0f..e224421131 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -18,14 +18,21 @@
 import logging
 from typing import Callable, Dict, List, Optional, Union
 
-from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest, CommitTableRequest,
-                                      CreateDatabaseRequest,
+import re
+
+from pypaimon.api.api_request import (AlterDatabaseRequest, AlterFunctionRequest,
+                                      AlterTableRequest, CommitTableRequest,
+                                      CreateDatabaseRequest, CreateFunctionRequest,
                                       CreateTableRequest, RenameTableRequest,
                                       RollbackTableRequest)
 from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
-                                       GetDatabaseResponse, GetTableResponse,
+                                       GetDatabaseResponse, GetFunctionResponse,
+                                       GetTableResponse,
                                        GetTableTokenResponse,
                                        ListDatabasesResponse,
+                                       ListFunctionDetailsResponse,
+                                       ListFunctionsGloballyResponse,
+                                       ListFunctionsResponse,
                                        ListPartitionsResponse,
                                        ListTablesResponse, PagedList,
                                        PagedResponse, GetTableSnapshotResponse,
@@ -50,9 +57,13 @@ class RESTApi:
     DATABASE_NAME_PATTERN = "databaseNamePattern"
     TABLE_NAME_PATTERN = "tableNamePattern"
     TABLE_TYPE = "tableType"
+    FUNCTION_NAME_PATTERN = "functionNamePattern"
     PARTITION_NAME_PATTERN = "partitionNamePattern"
     TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000
 
+    # Function name validation pattern
+    _FUNCTION_NAME_PATTERN = re.compile(r'^(?=.*[A-Za-z])[A-Za-z0-9._-]+$')
+
     def __init__(self, options: Union[Options, Dict[str, str]], config_required: bool = True):
         if isinstance(options, dict):
             options = Options(options)
@@ -425,6 +436,140 @@ class RESTApi:
         partitions = response.data() or []
         return PagedList(partitions, response.get_next_page_token())
 
+    @staticmethod
+    def is_valid_function_name(name: str) -> bool:
+        if not name:
+            return False
+        return RESTApi._FUNCTION_NAME_PATTERN.match(name) is not None
+
+    @staticmethod
+    def check_function_name(name: str) -> None:
+        if not RESTApi.is_valid_function_name(name):
+            raise IllegalArgumentError("Invalid function name: " + str(name))
+
+    def list_functions(self, database_name: str) -> List[str]:
+        return self.__list_data_from_page_api(
+            lambda query_params: self.client.get_with_params(
+                self.resource_paths.functions(database_name),
+                query_params,
+                ListFunctionsResponse,
+                self.rest_auth_function,
+            )
+        )
+
+    def list_functions_paged(
+            self,
+            database_name: str,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+            function_name_pattern: Optional[str] = None,
+    ) -> PagedList[str]:
+        response = self.client.get_with_params(
+            self.resource_paths.functions(database_name),
+            self.__build_paged_query_params(
+                max_results,
+                page_token,
+                {self.FUNCTION_NAME_PATTERN: function_name_pattern},
+            ),
+            ListFunctionsResponse,
+            self.rest_auth_function,
+        )
+        functions = response.functions if response.functions else []
+        return PagedList(functions, response.get_next_page_token())
+
+    def list_function_details_paged(
+            self,
+            database_name: str,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+            function_name_pattern: Optional[str] = None,
+    ) -> PagedList[GetFunctionResponse]:
+        response = self.client.get_with_params(
+            self.resource_paths.function_details(database_name),
+            self.__build_paged_query_params(
+                max_results,
+                page_token,
+                {self.FUNCTION_NAME_PATTERN: function_name_pattern},
+            ),
+            ListFunctionDetailsResponse,
+            self.rest_auth_function,
+        )
+        function_details = response.data() if response.data() else []
+        return PagedList(function_details, response.get_next_page_token())
+
+    def list_functions_paged_globally(
+            self,
+            database_name_pattern: Optional[str] = None,
+            function_name_pattern: Optional[str] = None,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+    ) -> PagedList:
+        response = self.client.get_with_params(
+            self.resource_paths.functions(),
+            self.__build_paged_query_params(
+                max_results,
+                page_token,
+                {
+                    self.DATABASE_NAME_PATTERN: database_name_pattern,
+                    self.FUNCTION_NAME_PATTERN: function_name_pattern,
+                },
+            ),
+            ListFunctionsGloballyResponse,
+            self.rest_auth_function,
+        )
+        functions = response.data() if response.data() else []
+        return PagedList(functions, response.get_next_page_token())
+
+    def get_function(self, identifier: Identifier) -> GetFunctionResponse:
+        from pypaimon.api.rest_exception import NoSuchResourceException
+        if not self.is_valid_function_name(identifier.get_object_name()):
+            raise NoSuchResourceException(
+                "FUNCTION",
+                identifier.get_object_name(),
+                "Invalid function name: " + identifier.get_object_name(),
+            )
+        return self.client.get(
+            self.resource_paths.function(
+                identifier.get_database_name(), identifier.get_object_name()),
+            GetFunctionResponse,
+            self.rest_auth_function,
+        )
+
+    def create_function(self, identifier: Identifier, function) -> None:
+        self.check_function_name(identifier.get_object_name())
+        request = CreateFunctionRequest(
+            name=function.name(),
+            input_params=function.input_params(),
+            return_params=function.return_params(),
+            deterministic=function.is_deterministic(),
+            definitions=function.definitions(),
+            comment=function.comment(),
+            options=function.options(),
+        )
+        self.client.post(
+            self.resource_paths.functions(identifier.get_database_name()),
+            request,
+            self.rest_auth_function,
+        )
+
+    def drop_function(self, identifier: Identifier) -> None:
+        self.check_function_name(identifier.get_object_name())
+        self.client.delete(
+            self.resource_paths.function(
+                identifier.get_database_name(), identifier.get_object_name()),
+            self.rest_auth_function,
+        )
+
+    def alter_function(self, identifier: Identifier, changes: List) -> None:
+        self.check_function_name(identifier.get_object_name())
+        request = AlterFunctionRequest(changes=changes)
+        self.client.post(
+            self.resource_paths.function(
+                identifier.get_database_name(), identifier.get_object_name()),
+            request,
+            self.rest_auth_function,
+        )
+
     @staticmethod
     def __validate_identifier(identifier: Identifier):
         if not identifier:
@@ -439,3 +584,6 @@ class RESTApi:
             raise ValueError("Table name cannot be None")
 
         return database_name.strip(), table_name.strip()
+
+
+from pypaimon.catalog.catalog_exception import IllegalArgumentError
diff --git a/paimon-python/pypaimon/catalog/catalog_exception.py b/paimon-python/pypaimon/catalog/catalog_exception.py
index 29785167a7..af7636bd64 100644
--- a/paimon-python/pypaimon/catalog/catalog_exception.py
+++ b/paimon-python/pypaimon/catalog/catalog_exception.py
@@ -178,3 +178,8 @@ class TagNotExistException(CatalogException):
     def __init__(self, tag: str):
         self.tag = tag
         super().__init__(f"Tag {tag} does not exist")
+
+
+class IllegalArgumentError(CatalogException):
+    """Illegal argument exception"""
+    pass
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index fc2f516a8b..810fed963b 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -19,14 +19,18 @@ import logging
 from typing import Any, Callable, Dict, List, Optional, Union
 from pypaimon.api.api_response import GetTableResponse, PagedList, ErrorResponse
 from pypaimon.api.rest_api import RESTApi
-from pypaimon.api.rest_exception import NoSuchResourceException, AlreadyExistsException, ForbiddenException
+from pypaimon.catalog.catalog_exception import IllegalArgumentError
+from pypaimon.api.rest_exception import (NoSuchResourceException, AlreadyExistsException,
+                                         ForbiddenException, BadRequestException)
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.catalog_environment import CatalogEnvironment
 from pypaimon.catalog.catalog_exception import (
     TableNotExistException, DatabaseAlreadyExistException,
     TableAlreadyExistException, DatabaseNotExistException,
-    TableNoPermissionException, DatabaseNoPermissionException
+    TableNoPermissionException, DatabaseNoPermissionException,
+    FunctionNotExistException, FunctionAlreadyExistException,
+    DefinitionAlreadyExistException, DefinitionNotExistException,
 )
 from pypaimon.catalog.database import Database
 from pypaimon.catalog.rest.property_change import PropertyChange
@@ -365,6 +369,105 @@ class RESTCatalog(Catalog):
         except ForbiddenException as e:
             raise TableNoPermissionException(identifier) from e
 
+    def list_functions(self, database_name: str) -> List[str]:
+        try:
+            return self.rest_api.list_functions(database_name)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(database_name) from e
+
+    def get_function(self, identifier: Union[str, Identifier]) -> 'Function':
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            response = self.rest_api.get_function(identifier)
+            return response.to_function(identifier)
+        except NoSuchResourceException as e:
+            raise FunctionNotExistException(identifier) from e
+
+    def create_function(self, identifier: Union[str, Identifier],
+                        function: 'Function', ignore_if_exists: bool = False) -> None:
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            self.rest_api.create_function(identifier, function)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(identifier.get_database_name()) from e
+        except AlreadyExistsException as e:
+            if ignore_if_exists:
+                return
+            raise FunctionAlreadyExistException(identifier) from e
+
+    def drop_function(self, identifier: Union[str, Identifier],
+                      ignore_if_not_exists: bool = False) -> None:
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            self.rest_api.drop_function(identifier)
+        except NoSuchResourceException as e:
+            if ignore_if_not_exists:
+                return
+            raise FunctionNotExistException(identifier) from e
+
+    def alter_function(self, identifier: Union[str, Identifier],
+                       changes: List['FunctionChange'],
+                       ignore_if_not_exists: bool = False) -> None:
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            self.rest_api.alter_function(identifier, changes)
+        except AlreadyExistsException as e:
+            raise DefinitionAlreadyExistException(identifier, e.resource_name) from e
+        except NoSuchResourceException as e:
+            if e.resource_type == ErrorResponse.RESOURCE_TYPE_DEFINITION:
+                raise DefinitionNotExistException(identifier, e.resource_name) from e
+            if not ignore_if_not_exists:
+                raise FunctionNotExistException(identifier) from e
+        except BadRequestException as e:
+            raise IllegalArgumentError(str(e)) from e
+
+    def list_functions_paged(
+            self,
+            database_name: str,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+            function_name_pattern: Optional[str] = None,
+    ) -> PagedList[str]:
+        try:
+            return self.rest_api.list_functions_paged(
+                database_name, max_results, page_token, function_name_pattern)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(database_name) from e
+
+    def list_functions_paged_globally(
+            self,
+            database_name_pattern: Optional[str] = None,
+            function_name_pattern: Optional[str] = None,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+    ) -> PagedList[Identifier]:
+        result = self.rest_api.list_functions_paged_globally(
+            database_name_pattern, function_name_pattern, max_results, page_token)
+        functions = result.elements if result.elements else []
+        return PagedList(functions, result.next_page_token)
+
+    def list_function_details_paged(
+            self,
+            database_name: str,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+            function_name_pattern: Optional[str] = None,
+    ) -> PagedList['Function']:
+        try:
+            result = self.rest_api.list_function_details_paged(
+                database_name, max_results, page_token, function_name_pattern)
+            functions = [
+                resp.to_function(Identifier.create(database_name, resp.name))
+                for resp in result.elements
+            ]
+            return PagedList(functions, result.next_page_token)
+        except NoSuchResourceException as e:
+            raise DatabaseNotExistException(database_name) from e
+
     def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
         try:
             response = self.rest_api.get_table(identifier)
diff --git a/paimon-python/pypaimon/common/identifier.py b/paimon-python/pypaimon/common/identifier.py
index e6ee68e37a..45615233d1 100755
--- a/paimon-python/pypaimon/common/identifier.py
+++ b/paimon-python/pypaimon/common/identifier.py
@@ -103,5 +103,8 @@ class Identifier:
         """Get branch name or return default 'main' if branch is None."""
         return self.branch if self.branch else "main"
 
+    def __hash__(self):
+        return hash((self.database, self.object, self.branch))
+
     def is_system_table(self) -> bool:
         return self.object.startswith('$')
diff --git a/paimon-python/pypaimon/function/__init__.py b/paimon-python/pypaimon/function/__init__.py
new file mode 100644
index 0000000000..5dcc290b1e
--- /dev/null
+++ b/paimon-python/pypaimon/function/__init__.py
@@ -0,0 +1,23 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from pypaimon.function.function import Function, FunctionImpl  # noqa: F401
+from pypaimon.function.function_definition import (  # noqa: F401
+    FunctionDefinition, FunctionFileResource,
+    FileFunctionDefinition, SQLFunctionDefinition, LambdaFunctionDefinition,
+)
+from pypaimon.function.function_change import FunctionChange  # noqa: F401
diff --git a/paimon-python/pypaimon/function/function.py b/paimon-python/pypaimon/function/function.py
new file mode 100644
index 0000000000..a535a39877
--- /dev/null
+++ b/paimon-python/pypaimon/function/function.py
@@ -0,0 +1,122 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from typing import Dict, List, Optional
+
+from pypaimon.common.identifier import Identifier
+from pypaimon.function.function_definition import FunctionDefinition
+from pypaimon.schema.data_types import DataField
+
+
+class Function:
+    """Interface for a function in Paimon."""
+
+    def name(self) -> str:
+        raise NotImplementedError
+
+    def full_name(self) -> str:
+        raise NotImplementedError
+
+    def identifier(self) -> Identifier:
+        raise NotImplementedError
+
+    def input_params(self) -> Optional[List[DataField]]:
+        raise NotImplementedError
+
+    def return_params(self) -> Optional[List[DataField]]:
+        raise NotImplementedError
+
+    def is_deterministic(self) -> bool:
+        raise NotImplementedError
+
+    def definitions(self) -> Dict[str, FunctionDefinition]:
+        raise NotImplementedError
+
+    def definition(self, dialect: str) -> Optional[FunctionDefinition]:
+        raise NotImplementedError
+
+    def comment(self) -> Optional[str]:
+        raise NotImplementedError
+
+    def options(self) -> Dict[str, str]:
+        raise NotImplementedError
+
+
+class FunctionImpl(Function):
+    """Implementation of Function."""
+
+    def __init__(
+        self,
+        identifier: Identifier,
+        input_params: Optional[List[DataField]] = None,
+        return_params: Optional[List[DataField]] = None,
+        deterministic: bool = False,
+        definitions: Optional[Dict[str, FunctionDefinition]] = None,
+        comment: Optional[str] = None,
+        options: Optional[Dict[str, str]] = None,
+    ):
+        self._identifier = identifier
+        self._input_params = input_params
+        self._return_params = return_params
+        self._deterministic = deterministic
+        self._definitions = definitions or {}
+        self._comment = comment
+        self._options = options or {}
+
+    def name(self) -> str:
+        return self._identifier.get_object_name()
+
+    def full_name(self) -> str:
+        return self._identifier.get_full_name()
+
+    def identifier(self) -> Identifier:
+        return self._identifier
+
+    def input_params(self) -> Optional[List[DataField]]:
+        return self._input_params
+
+    def return_params(self) -> Optional[List[DataField]]:
+        return self._return_params
+
+    def is_deterministic(self) -> bool:
+        return self._deterministic
+
+    def definitions(self) -> Dict[str, FunctionDefinition]:
+        return self._definitions
+
+    def definition(self, dialect: str) -> Optional[FunctionDefinition]:
+        return self._definitions.get(dialect)
+
+    def comment(self) -> Optional[str]:
+        return self._comment
+
+    def options(self) -> Dict[str, str]:
+        return self._options
+
+    def __eq__(self, other):
+        if not isinstance(other, FunctionImpl):
+            return False
+        return (self._identifier == other._identifier
+                and self._input_params == other._input_params
+                and self._return_params == other._return_params
+                and self._deterministic == other._deterministic
+                and self._definitions == other._definitions
+                and self._comment == other._comment
+                and self._options == other._options)
+
+    def __hash__(self):
+        return hash(self._identifier)
diff --git a/paimon-python/pypaimon/function/function_change.py b/paimon-python/pypaimon/function/function_change.py
new file mode 100644
index 0000000000..90390d7c97
--- /dev/null
+++ b/paimon-python/pypaimon/function/function_change.py
@@ -0,0 +1,146 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from typing import Dict, Optional
+
+from pypaimon.function.function_definition import FunctionDefinition
+
+
+class Actions:
+    SET_OPTION = "setOption"
+    REMOVE_OPTION = "removeOption"
+    UPDATE_COMMENT = "updateComment"
+    ADD_DEFINITION = "addDefinition"
+    UPDATE_DEFINITION = "updateDefinition"
+    DROP_DEFINITION = "dropDefinition"
+
+
+class FunctionChange:
+    """Represents a change to a function."""
+
+    def __init__(self, action: str):
+        self._action = action
+
+    @staticmethod
+    def set_option(key: str, value: str) -> "SetFunctionOption":
+        return SetFunctionOption(key, value)
+
+    @staticmethod
+    def remove_option(key: str) -> "RemoveFunctionOption":
+        return RemoveFunctionOption(key)
+
+    @staticmethod
+    def update_comment(comment: Optional[str]) -> "UpdateFunctionComment":
+        return UpdateFunctionComment(comment)
+
+    @staticmethod
+    def add_definition(name: str, definition: FunctionDefinition) -> "AddDefinition":
+        return AddDefinition(name, definition)
+
+    @staticmethod
+    def update_definition(name: str, definition: FunctionDefinition) -> "UpdateDefinition":
+        return UpdateDefinition(name, definition)
+
+    @staticmethod
+    def drop_definition(name: str) -> "DropDefinition":
+        return DropDefinition(name)
+
+    def to_dict(self) -> Dict:
+        raise NotImplementedError
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "FunctionChange":
+        action = data.get("action")
+        if action == Actions.SET_OPTION:
+            return SetFunctionOption(data["key"], data["value"])
+        elif action == Actions.REMOVE_OPTION:
+            return RemoveFunctionOption(data["key"])
+        elif action == Actions.UPDATE_COMMENT:
+            return UpdateFunctionComment(data.get("comment"))
+        elif action == Actions.ADD_DEFINITION:
+            return AddDefinition(data["name"], FunctionDefinition.from_dict(data["definition"]))
+        elif action == Actions.UPDATE_DEFINITION:
+            return UpdateDefinition(data["name"], FunctionDefinition.from_dict(data["definition"]))
+        elif action == Actions.DROP_DEFINITION:
+            return DropDefinition(data["name"])
+        else:
+            raise ValueError(f"Unknown function change action: {action}")
+
+
+class SetFunctionOption(FunctionChange):
+    def __init__(self, key: str, value: str):
+        super().__init__(Actions.SET_OPTION)
+        self.key = key
+        self.value = value
+
+    def to_dict(self) -> Dict:
+        return {"action": self._action, "key": self.key, "value": self.value}
+
+
+class RemoveFunctionOption(FunctionChange):
+    def __init__(self, key: str):
+        super().__init__(Actions.REMOVE_OPTION)
+        self.key = key
+
+    def to_dict(self) -> Dict:
+        return {"action": self._action, "key": self.key}
+
+
+class UpdateFunctionComment(FunctionChange):
+    def __init__(self, comment: Optional[str]):
+        super().__init__(Actions.UPDATE_COMMENT)
+        self.comment = comment
+
+    def to_dict(self) -> Dict:
+        return {"action": self._action, "comment": self.comment}
+
+
+class AddDefinition(FunctionChange):
+    def __init__(self, name: str, definition: FunctionDefinition):
+        super().__init__(Actions.ADD_DEFINITION)
+        self.name = name
+        self.definition = definition
+
+    def to_dict(self) -> Dict:
+        return {
+            "action": self._action,
+            "name": self.name,
+            "definition": self.definition.to_dict(),
+        }
+
+
+class UpdateDefinition(FunctionChange):
+    def __init__(self, name: str, definition: FunctionDefinition):
+        super().__init__(Actions.UPDATE_DEFINITION)
+        self.name = name
+        self.definition = definition
+
+    def to_dict(self) -> Dict:
+        return {
+            "action": self._action,
+            "name": self.name,
+            "definition": self.definition.to_dict(),
+        }
+
+
+class DropDefinition(FunctionChange):
+    def __init__(self, name: str):
+        super().__init__(Actions.DROP_DEFINITION)
+        self.name = name
+
+    def to_dict(self) -> Dict:
+        return {"action": self._action, "name": self.name}
diff --git a/paimon-python/pypaimon/function/function_definition.py b/paimon-python/pypaimon/function/function_definition.py
new file mode 100644
index 0000000000..486eb6d722
--- /dev/null
+++ b/paimon-python/pypaimon/function/function_definition.py
@@ -0,0 +1,192 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional
+
+
+class Types:
+    FILE = "file"
+    SQL = "sql"
+    LAMBDA = "lambda"
+
+
+class FunctionFileResource:
+    """Represents a file resource for a function."""
+
+    def __init__(self, resource_type: str, uri: str):
+        self.resource_type = resource_type
+        self.uri = uri
+
+    def to_dict(self) -> Dict:
+        return {
+            "resourceType": self.resource_type,
+            "uri": self.uri,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict) -> "FunctionFileResource":
+        return cls(
+            resource_type=data.get("resourceType"),
+            uri=data.get("uri"),
+        )
+
+    def __eq__(self, other):
+        if not isinstance(other, FunctionFileResource):
+            return False
+        return self.resource_type == other.resource_type and self.uri == other.uri
+
+    def __hash__(self):
+        return hash((self.resource_type, self.uri))
+
+
+class FunctionDefinition(ABC):
+    """Base class for function definitions."""
+
+    @staticmethod
+    def file(
+        file_resources: List[FunctionFileResource],
+        language: str,
+        class_name: str,
+        function_name: str,
+    ) -> "FunctionDefinition":
+        return FileFunctionDefinition(file_resources, language, class_name, function_name)
+
+    @staticmethod
+    def sql(body: str) -> "FunctionDefinition":
+        return SQLFunctionDefinition(body)
+
+    @staticmethod
+    def lambda_def(definition: str, language: str) -> "FunctionDefinition":
+        return LambdaFunctionDefinition(definition, language)
+
+    @abstractmethod
+    def to_dict(self) -> Dict:
+        """Convert to dict for JSON serialization."""
+
+    @staticmethod
+    def from_dict(data: Dict) -> Optional["FunctionDefinition"]:
+        if data is None:
+            return None
+        defn_type = data.get("type")
+        if defn_type == Types.FILE:
+            file_resources = data.get("fileResources")
+            if file_resources is not None:
+                file_resources = [
+                    FunctionFileResource.from_dict(r) if isinstance(r, dict) else r
+                    for r in file_resources
+                ]
+            return FileFunctionDefinition(
+                file_resources=file_resources,
+                language=data.get("language"),
+                class_name=data.get("className"),
+                function_name=data.get("functionName"),
+            )
+        elif defn_type == Types.SQL:
+            return SQLFunctionDefinition(definition=data.get("definition"))
+        elif defn_type == Types.LAMBDA:
+            return LambdaFunctionDefinition(
+                definition=data.get("definition"),
+                language=data.get("language"),
+            )
+        else:
+            raise ValueError(f"Unknown function definition type: {defn_type}")
+
+
+class FileFunctionDefinition(FunctionDefinition):
+
+    def __init__(
+        self,
+        file_resources: List[FunctionFileResource],
+        language: str,
+        class_name: str,
+        function_name: str,
+    ):
+        self.file_resources = file_resources
+        self.language = language
+        self.class_name = class_name
+        self.function_name = function_name
+
+    def to_dict(self) -> Dict:
+        return {
+            "type": Types.FILE,
+            "fileResources": [
+                r.to_dict() if isinstance(r, FunctionFileResource) else r
+                for r in self.file_resources
+            ] if self.file_resources else None,
+            "language": self.language,
+            "className": self.class_name,
+            "functionName": self.function_name,
+        }
+
+    def __eq__(self, other):
+        if not isinstance(other, FileFunctionDefinition):
+            return False
+        return (self.file_resources == other.file_resources
+                and self.language == other.language
+                and self.class_name == other.class_name
+                and self.function_name == other.function_name)
+
+    def __hash__(self):
+        return hash((
+            tuple(self.file_resources) if self.file_resources else (),
+            self.language,
+            self.class_name,
+            self.function_name,
+        ))
+
+
+class SQLFunctionDefinition(FunctionDefinition):
+
+    def __init__(self, definition: str):
+        self.definition = definition
+
+    def to_dict(self) -> Dict:
+        return {
+            "type": Types.SQL,
+            "definition": self.definition,
+        }
+
+    def __eq__(self, other):
+        if not isinstance(other, SQLFunctionDefinition):
+            return False
+        return self.definition == other.definition
+
+    def __hash__(self):
+        return hash(self.definition)
+
+
+class LambdaFunctionDefinition(FunctionDefinition):
+
+    def __init__(self, definition: str, language: str):
+        self.definition = definition
+        self.language = language
+
+    def to_dict(self) -> Dict:
+        return {
+            "type": Types.LAMBDA,
+            "definition": self.definition,
+            "language": self.language,
+        }
+
+    def __eq__(self, other):
+        if not isinstance(other, LambdaFunctionDefinition):
+            return False
+        return self.definition == other.definition and self.language == other.language
+
+    def __hash__(self):
+        return hash((self.definition, self.language))
diff --git a/paimon-python/pypaimon/tests/rest/rest_function_test.py b/paimon-python/pypaimon/tests/rest/rest_function_test.py
new file mode 100644
index 0000000000..2cad320fb5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_function_test.py
@@ -0,0 +1,327 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import shutil
+import tempfile
+import unittest
+import uuid
+
+from pypaimon.api.api_response import ConfigResponse
+from pypaimon.api.auth import BearTokenAuthProvider
+from pypaimon.api.rest_api import RESTApi, IllegalArgumentError
+from pypaimon.catalog.catalog_exception import (
+    FunctionNotExistException,
+    FunctionAlreadyExistException,
+    DefinitionAlreadyExistException,
+    DefinitionNotExistException,
+)
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options import Options
+from pypaimon.function.function import FunctionImpl
+from pypaimon.function.function_change import FunctionChange
+from pypaimon.function.function_definition import (
+    FunctionDefinition, FunctionFileResource,
+)
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
+
+
+def _mock_function(identifier: Identifier) -> FunctionImpl:
+    input_params = [
+        DataField(0, "length", AtomicType("DOUBLE")),
+        DataField(1, "width", AtomicType("DOUBLE")),
+    ]
+    return_params = [
+        DataField(0, "area", AtomicType("DOUBLE")),
+    ]
+    flink_function = FunctionDefinition.file(
+        file_resources=[FunctionFileResource("jar", "/a/b/c.jar")],
+        language="java",
+        class_name="className",
+        function_name="eval",
+    )
+    spark_function = FunctionDefinition.lambda_def(
+        "(Double length, Double width) -> length * width", "java"
+    )
+    trino_function = FunctionDefinition.sql("length * width")
+    definitions = {
+        "flink": flink_function,
+        "spark": spark_function,
+        "trino": trino_function,
+    }
+    return FunctionImpl(
+        identifier=identifier,
+        input_params=input_params,
+        return_params=return_params,
+        deterministic=False,
+        definitions=definitions,
+        comment="comment",
+        options={},
+    )
+
+
+class RESTFunctionTest(unittest.TestCase):
+
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp(prefix="function_test_")
+        self.config = ConfigResponse(defaults={"prefix": "mock-test"})
+        self.token = str(uuid.uuid4())
+        self.server = RESTCatalogServer(
+            data_path=self.temp_dir,
+            auth_provider=BearTokenAuthProvider(self.token),
+            config=self.config,
+            warehouse="warehouse",
+        )
+        self.server.start()
+
+        options = Options({
+            "metastore": "rest",
+            "uri": f"http://localhost:{self.server.port}",
+            "warehouse": "warehouse",
+            "token.provider": "bear",
+            "token": self.token,
+        })
+        self.catalog = RESTCatalog(CatalogContext.create_from_options(options))
+
+    def tearDown(self):
+        self.server.shutdown()
+        import gc
+        gc.collect()
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+    def test_function(self):
+        self.catalog.create_database("rest_catalog_db", True)
+
+        identifier_with_slash = Identifier.create("rest_catalog_db", "function/")
+        with self.assertRaises(IllegalArgumentError):
+            self.catalog.create_function(
+                identifier_with_slash,
+                _mock_function(identifier_with_slash),
+                False,
+            )
+        with self.assertRaises(FunctionNotExistException):
+            self.catalog.get_function(identifier_with_slash)
+        with self.assertRaises(IllegalArgumentError):
+            self.catalog.drop_function(identifier_with_slash, True)
+
+        identifier_without_alphabet = Identifier.create("rest_catalog_db", "-")
+        with self.assertRaises(IllegalArgumentError):
+            self.catalog.create_function(
+                identifier_without_alphabet,
+                _mock_function(identifier_without_alphabet),
+                False,
+            )
+        with self.assertRaises(FunctionNotExistException):
+            self.catalog.get_function(identifier_without_alphabet)
+        with self.assertRaises(IllegalArgumentError):
+            self.catalog.drop_function(identifier_without_alphabet, True)
+
+        identifier = Identifier.from_string("rest_catalog_db.function.na_me-01")
+        function = _mock_function(identifier)
+
+        # Drop first to ensure fresh data (not stale from a previous failed run)
+        self.catalog.drop_function(identifier, True)
+        self.catalog.create_function(identifier, function, True)
+        with self.assertRaises(FunctionAlreadyExistException):
+            self.catalog.create_function(identifier, function, False)
+
+        self.assertIn(function.name(), self.catalog.list_functions(identifier.get_database_name()))
+
+        get_function = self.catalog.get_function(identifier)
+        self.assertEqual(get_function.name(), function.name())
+        for dialect in function.definitions().keys():
+            self.assertEqual(get_function.definition(dialect), function.definition(dialect))
+
+        self.catalog.drop_function(identifier, True)
+        self.assertNotIn(function.name(), self.catalog.list_functions(identifier.get_database_name()))
+
+        with self.assertRaises(FunctionNotExistException):
+            self.catalog.drop_function(identifier, False)
+        with self.assertRaises(FunctionNotExistException):
+            self.catalog.get_function(identifier)
+
+    def test_list_functions(self):
+        db1 = "db_rest_catalog_db"
+        db2 = "db2_rest_catalog"
+        identifier = Identifier.create(db1, "list_function")
+        identifier1 = Identifier.create(db1, "function")
+        identifier2 = Identifier.create(db2, "list_function")
+        identifier3 = Identifier.create(db2, "function")
+
+        self.catalog.create_database(db1, True)
+        self.catalog.create_database(db2, True)
+        self.catalog.create_function(identifier, _mock_function(identifier), True)
+        self.catalog.create_function(identifier1, _mock_function(identifier1), True)
+        self.catalog.create_function(identifier2, _mock_function(identifier2), True)
+        self.catalog.create_function(identifier3, _mock_function(identifier3), True)
+
+        result = self.catalog.list_functions_paged(db1, None, None, None)
+        self.assertEqual(
+            set(result.elements),
+            {identifier.get_object_name(), identifier1.get_object_name()},
+        )
+
+        result = self.catalog.list_functions_paged(db1, 1, None, None)
+        self.assertEqual(len(result.elements), 1)
+        self.assertIn(
+            result.elements[0],
+            [identifier.get_object_name(), identifier1.get_object_name()],
+        )
+
+        result = self.catalog.list_functions_paged(
+            db1, 1, identifier1.get_object_name(), None)
+        self.assertEqual(
+            result.elements,
+            [identifier.get_object_name()],
+        )
+
+        result = self.catalog.list_functions_paged(db1, None, None, "func%")
+        self.assertEqual(result.elements, [identifier1.get_object_name()])
+
+        result = self.catalog.list_functions_paged_globally("db2_rest%", "func%", None, None)
+        self.assertEqual(len(result.elements), 1)
+        self.assertEqual(result.elements[0].get_full_name(), identifier3.get_full_name())
+
+        result = self.catalog.list_functions_paged_globally(
+            "db2_rest%", None, 1, None)
+        self.assertEqual(len(result.elements), 1)
+        self.assertIn(
+            result.elements[0].get_full_name(),
+            [identifier2.get_full_name(), identifier3.get_full_name()],
+        )
+
+        result = self.catalog.list_functions_paged_globally(
+            "db2_rest%", None, 1, identifier3.get_full_name())
+        self.assertEqual(len(result.elements), 1)
+        self.assertEqual(
+            result.elements[0].get_full_name(),
+            identifier2.get_full_name(),
+        )
+
+        result = self.catalog.list_function_details_paged(db1, 1, None, None)
+        self.assertEqual(len(result.elements), 1)
+        self.assertIn(
+            result.elements[0].full_name(),
+            [identifier.get_full_name(), identifier1.get_full_name()],
+        )
+
+        result = self.catalog.list_function_details_paged(db2, 4, None, "func%")
+        self.assertEqual(len(result.elements), 1)
+        self.assertEqual(
+            result.elements[0].full_name(), identifier3.get_full_name())
+
+        result = self.catalog.list_function_details_paged(
+            db2, 1, identifier3.get_object_name(), None)
+        full_names = [f.full_name() for f in result.elements]
+        self.assertIn(identifier2.get_full_name(), full_names)
+
+    def test_alter_function(self):
+        identifier = Identifier.create("rest_catalog_db", "alter_function_name")
+        self.catalog.create_database(identifier.get_database_name(), True)
+        self.catalog.drop_function(identifier, True)
+        function = _mock_function(identifier)
+        definition = FunctionDefinition.sql("x * y + 1")
+        add_definition = FunctionChange.add_definition("flink_1", definition)
+
+        self.catalog.alter_function(identifier, [add_definition], True)
+
+        with self.assertRaises(FunctionNotExistException):
+            self.catalog.alter_function(identifier, [add_definition], False)
+
+        self.catalog.create_function(identifier, function, True)
+
+        key = str(uuid.uuid4())
+        value = str(uuid.uuid4())
+        set_option = FunctionChange.set_option(key, value)
+        self.catalog.alter_function(identifier, [set_option], False)
+        catalog_function = self.catalog.get_function(identifier)
+        self.assertEqual(catalog_function.options().get(key), value)
+
+        self.catalog.alter_function(identifier, [FunctionChange.remove_option(key)], False)
+        catalog_function = self.catalog.get_function(identifier)
+        self.assertNotIn(key, catalog_function.options())
+
+        new_comment = "new comment"
+        self.catalog.alter_function(
+            identifier, [FunctionChange.update_comment(new_comment)], False
+        )
+        catalog_function = self.catalog.get_function(identifier)
+        self.assertEqual(catalog_function.comment(), new_comment)
+
+        self.catalog.alter_function(identifier, [add_definition], False)
+        catalog_function = self.catalog.get_function(identifier)
+        self.assertEqual(
+            catalog_function.definition(add_definition.name),
+            add_definition.definition,
+        )
+
+        with self.assertRaises(DefinitionAlreadyExistException):
+            self.catalog.alter_function(identifier, [add_definition], False)
+
+        update_definition = FunctionChange.update_definition("flink_1", definition)
+        self.catalog.alter_function(identifier, [update_definition], False)
+        catalog_function = self.catalog.get_function(identifier)
+        self.assertEqual(
+            catalog_function.definition(update_definition.name),
+            update_definition.definition,
+        )
+
+        with self.assertRaises(DefinitionNotExistException):
+            self.catalog.alter_function(
+                identifier,
+                [FunctionChange.update_definition("no_exist", definition)],
+                False,
+            )
+
+        drop_definition = FunctionChange.drop_definition(update_definition.name)
+        self.catalog.alter_function(identifier, [drop_definition], False)
+        catalog_function = self.catalog.get_function(identifier)
+        self.assertIsNone(catalog_function.definition(update_definition.name))
+
+        with self.assertRaises(DefinitionNotExistException):
+            self.catalog.alter_function(identifier, [drop_definition], False)
+
+    def test_validate_function_name(self):
+        self.assertTrue(RESTApi.is_valid_function_name("a"))
+        self.assertTrue(RESTApi.is_valid_function_name("a1_"))
+        self.assertTrue(RESTApi.is_valid_function_name("a-b_c"))
+        self.assertTrue(RESTApi.is_valid_function_name("a-b_c.1"))
+
+        self.assertFalse(RESTApi.is_valid_function_name("a\\/b"))
+        self.assertFalse(RESTApi.is_valid_function_name("a$?b"))
+        self.assertFalse(RESTApi.is_valid_function_name("a@b"))
+        self.assertFalse(RESTApi.is_valid_function_name("a*b"))
+        self.assertFalse(RESTApi.is_valid_function_name("123"))
+        self.assertFalse(RESTApi.is_valid_function_name("_-"))
+        self.assertFalse(RESTApi.is_valid_function_name(""))
+        self.assertFalse(RESTApi.is_valid_function_name(None))
+
+        with self.assertRaises(IllegalArgumentError):
+            RESTApi.check_function_name("a\\/b")
+        with self.assertRaises(IllegalArgumentError):
+            RESTApi.check_function_name("123")
+        with self.assertRaises(IllegalArgumentError):
+            RESTApi.check_function_name("")
+        with self.assertRaises(IllegalArgumentError):
+            RESTApi.check_function_name(None)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py
index 937d99f1bd..c3a4188669 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -33,7 +33,11 @@ from pypaimon.api.api_request import (AlterDatabaseRequest, 
AlterTableRequest,
                                       CreateDatabaseRequest,
                                       CreateTableRequest, RenameTableRequest)
 from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
+                                       GetFunctionResponse,
                                        GetTableResponse, ListDatabasesResponse,
+                                       ListFunctionDetailsResponse,
+                                       ListFunctionsGloballyResponse,
+                                       ListFunctionsResponse,
                                        ListPartitionsResponse, 
ListTablesResponse,
                                        PagedList, Partition,
                                        RESTResponse, ErrorResponse)
@@ -43,7 +47,11 @@ from pypaimon.catalog.catalog_exception import 
(DatabaseNoPermissionException,
                                                 DatabaseNotExistException,
                                                 TableNoPermissionException,
                                                 TableNotExistException, 
DatabaseAlreadyExistException,
-                                                TableAlreadyExistException)
+                                                TableAlreadyExistException,
+                                                FunctionNotExistException,
+                                                FunctionAlreadyExistException,
+                                                
DefinitionAlreadyExistException,
+                                                DefinitionNotExistException)
 from pypaimon.catalog.rest.table_metadata import TableMetadata
 from pypaimon.common.identifier import Identifier
 from pypaimon.api.typedef import RESTAuthParameter
@@ -196,6 +204,7 @@ class RESTCatalogServer:
         self.table_metadata_store: Dict[str, TableMetadata] = {}
         self.table_latest_snapshot_store: Dict[str, str] = {}
         self.table_partitions_store: Dict[str, List] = {}
+        self.function_store: Dict[str, Dict] = {}  # key: "db.func_name", 
value: GetFunctionResponse-like dict
         self.no_permission_databases: List[str] = []
         self.no_permission_tables: List[str] = []
         self.table_token_store: Dict[str, "RESTToken"] = {}
@@ -365,6 +374,10 @@ class RESTCatalogServer:
                     source_table_dir.rename(destination_table_dir)
                 return self._mock_response("", 200)
 
+            # Global functions endpoint (catalog-scoped)
+            if resource_path == self.resource_paths.functions() and method == 
"GET":
+                return self._functions_globally_handle(parameters)
+
             database = resource_path.split("/")[4]
             # Database-specific endpoints
             if 
resource_path.startswith(self.resource_paths.database(database)):
@@ -391,6 +404,10 @@ class RESTCatalogServer:
 
                     if resource_type.startswith(ResourcePaths.TABLES):
                         return self._tables_handle(method, data, 
database_name, parameters)
+                    elif resource_type == ResourcePaths.FUNCTIONS:
+                        return self._functions_handle(method, data, 
database_name, parameters)
+                    elif resource_type == ResourcePaths.FUNCTION_DETAILS:
+                        return self._function_details_handle(database_name, 
parameters)
 
                 elif len(path_parts) >= 3:
                     # Individual resource operations
@@ -402,6 +419,8 @@ class RESTCatalogServer:
                         return self._handle_table_resource(method, path_parts, 
identifier, data, parameters)
                     elif resource_type == ResourcePaths.PARTITIONS:
                         return self._table_partitions_handle(method, 
identifier, parameters)
+                    elif resource_type == ResourcePaths.FUNCTIONS:
+                        return self._function_handle(method, data, identifier)
 
                 return self._mock_response(ErrorResponse(None, None, "Not 
Found", 404), 404)
 
@@ -437,6 +456,26 @@ class RESTCatalogServer:
                 ErrorResponse.RESOURCE_TYPE_TABLE, 
e.identifier.get_full_name(), str(e), 409
             )
             return self._mock_response(response, 409)
+        except FunctionNotExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_FUNCTION, 
e.identifier.get_object_name(), str(e), 404
+            )
+            return self._mock_response(response, 404)
+        except FunctionAlreadyExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_FUNCTION, 
e.identifier.get_full_name(), str(e), 409
+            )
+            return self._mock_response(response, 409)
+        except DefinitionAlreadyExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_DEFINITION, e.name, str(e), 409
+            )
+            return self._mock_response(response, 409)
+        except DefinitionNotExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_DEFINITION, e.name, str(e), 404
+            )
+            return self._mock_response(response, 404)
         except Exception as e:
             self.logger.error(f"Unexpected error: {e}")
             response = ErrorResponse(None, None, str(e), 500)
@@ -489,6 +528,188 @@ class RESTCatalogServer:
                 return self._mock_response(ErrorResponse(None, None, "Not 
Found", 404), 404)
         return self._mock_response(ErrorResponse(None, None, "Not Found", 
404), 404)
 
+    # ======================= Function Handlers ===============================
+
+    def _functions_handle(self, method: str, data: str, database_name: str,
+                          parameters: Dict[str, str]) -> Tuple[str, int]:
+        """Serve the database-scoped functions endpoint (list / create).
+
+        GET lists the function names stored under ``database_name``, optionally
+        filtered by the function-name pattern parameter. POST registers a new
+        function from the JSON request body. Any other method gets a 405.
+
+        Raises:
+            FunctionAlreadyExistException: on POST when the name is already stored.
+        """
+        if method == "GET":
+            name_pattern = parameters.get(FUNCTION_NAME_PATTERN)
+            prefix = database_name + "."
+            names = []
+            for store_key in self.function_store.keys():
+                if not store_key.startswith(prefix):
+                    continue
+                candidate = store_key.split(".", 1)[1]
+                if name_pattern and not self._match_name_pattern(candidate, name_pattern):
+                    continue
+                names.append(candidate)
+            return self._generate_final_list_functions_response(parameters, names)
+
+        if method == "POST":
+            import json as json_module
+            payload = json_module.loads(data)
+            func_name = payload.get("name")
+            store_key = f"{database_name}.{func_name}"
+            if store_key in self.function_store:
+                raise FunctionAlreadyExistException(Identifier.create(database_name, func_name))
+            # Owner and timestamps are fixed placeholder values in this mock server.
+            self.function_store[store_key] = GetFunctionResponse(
+                uuid=str(uuid.uuid4()),
+                name=func_name,
+                input_params=payload.get("inputParams"),
+                return_params=payload.get("returnParams"),
+                deterministic=payload.get("deterministic", False),
+                definitions=payload.get("definitions"),
+                comment=payload.get("comment"),
+                options=payload.get("options", {}),
+                owner="owner",
+                created_at=1,
+                created_by="owner",
+                updated_at=1,
+                updated_by="owner",
+            )
+            return self._mock_response("", 200)
+
+        return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405)
+
+    def _function_handle(self, method: str, data: str, identifier: Identifier) -> Tuple[str, int]:
+        """Serve a single function resource: GET (fetch), POST (alter), DELETE (drop).
+
+        Raises:
+            FunctionNotExistException: when the method is supported but the
+                function is not in the store.
+        """
+        # Unsupported methods are rejected before the existence check.
+        if method not in ("GET", "POST", "DELETE"):
+            return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405)
+
+        store_key = identifier.get_full_name()
+        if store_key not in self.function_store:
+            raise FunctionNotExistException(identifier)
+
+        if method == "GET":
+            return self._mock_response(self.function_store[store_key], 200)
+
+        if method == "POST":
+            # Alter function: apply the list of changes carried in the body.
+            import json as json_module
+            changes = json_module.loads(data).get("changes", [])
+            self._apply_function_changes(identifier, changes)
+        else:
+            del self.function_store[store_key]
+        return self._mock_response("", 200)
+
+    def _function_details_handle(self, database_name: str,
+                                 parameters: Dict[str, str]) -> Tuple[str, int]:
+        """List the stored function entries of one database, optionally filtered by name pattern."""
+        name_pattern = parameters.get(FUNCTION_NAME_PATTERN)
+        prefix = database_name + "."
+        matching = [
+            stored
+            for store_key, stored in self.function_store.items()
+            if store_key.startswith(prefix)
+            and (not name_pattern
+                 or self._match_name_pattern(store_key.split(".", 1)[1], name_pattern))
+        ]
+        return self._generate_final_list_function_details_response(parameters, matching)
+
+    def _functions_globally_handle(self, parameters: Dict[str, str]) -> Tuple[str, int]:
+        """List function identifiers across all databases in the store.
+
+        Both the database-name and function-name pattern parameters are optional;
+        an entry is kept only if it passes every pattern that was supplied.
+        """
+        db_pattern = parameters.get(DATABASE_NAME_PATTERN)
+        fn_pattern = parameters.get(FUNCTION_NAME_PATTERN)
+        matched = []
+        for store_key in self.function_store.keys():
+            db_name, func_name = store_key.split(".", 1)
+            db_ok = not db_pattern or self._match_name_pattern(db_name, db_pattern)
+            if db_ok and (not fn_pattern or self._match_name_pattern(func_name, fn_pattern)):
+                matched.append(Identifier.create(db_name, func_name))
+        return self._generate_final_list_functions_globally_response(parameters, matched)
+
+    def _apply_function_changes(self, identifier: Identifier, changes: List[Dict]) -> None:
+        """Apply a list of change dicts to the stored function and replace the entry.
+
+        Changes are applied to copies of the options/definitions, and the store
+        is only rewritten after every change succeeded, so a raised exception
+        leaves the stored function untouched.
+
+        Raises:
+            DefinitionAlreadyExistException: adding a definition name that exists.
+            DefinitionNotExistException: updating or dropping an unknown definition name.
+        """
+        from pypaimon.function.function_change import Actions
+        store_key = identifier.get_full_name()
+        current = self.function_store[store_key]
+
+        # Mutable working copies; the stored response object is not modified.
+        new_options = dict(current.options) if current.options else {}
+        new_definitions = dict(current.definitions) if current.definitions else {}
+        new_comment = current.comment
+
+        definition_actions = (
+            Actions.ADD_DEFINITION,
+            Actions.UPDATE_DEFINITION,
+            Actions.DROP_DEFINITION,
+        )
+
+        for change in changes:
+            action = change.get("action")
+            if action == Actions.SET_OPTION:
+                new_options[change["key"]] = change["value"]
+            elif action == Actions.REMOVE_OPTION:
+                new_options.pop(change["key"], None)
+            elif action == Actions.UPDATE_COMMENT:
+                new_comment = change.get("comment")
+            elif action in definition_actions:
+                name = change["name"]
+                exists = name in new_definitions
+                if action == Actions.ADD_DEFINITION:
+                    if exists:
+                        raise DefinitionAlreadyExistException(identifier, name)
+                elif not exists:
+                    raise DefinitionNotExistException(identifier, name)
+
+                if action == Actions.DROP_DEFINITION:
+                    del new_definitions[name]
+                else:
+                    new_definitions[name] = change["definition"]
+            # Any other action value is ignored.
+
+        self.function_store[store_key] = GetFunctionResponse(
+            uuid=current.uuid,
+            name=current.name,
+            input_params=current.input_params,
+            return_params=current.return_params,
+            deterministic=current.deterministic,
+            definitions=new_definitions,
+            comment=new_comment,
+            options=new_options,
+            owner=current.owner,
+            created_at=current.created_at,
+            created_by=current.created_by,
+            updated_at=current.updated_at,
+            updated_by=current.updated_by,
+        )
+
+    def _generate_final_list_functions_response(self, parameters: Dict[str, str],
+                                                functions: List[str]) -> Tuple[str, int]:
+        """Wrap function names in a ListFunctionsResponse, paging when the list is non-empty."""
+        if not functions:
+            empty = ListFunctionsResponse(functions=[], next_page_token=None)
+            return self._mock_response(empty, 200)
+
+        page = self._build_paged_entities(
+            functions,
+            self._get_max_results(parameters),
+            parameters.get(PAGE_TOKEN),
+        )
+        return self._mock_response(
+            ListFunctionsResponse(
+                functions=page.elements,
+                next_page_token=page.next_page_token
+            ),
+            200,
+        )
+
+    def _generate_final_list_function_details_response(self, parameters: Dict[str, str],
+                                                       details: List) -> Tuple[str, int]:
+        """Wrap function details in a ListFunctionDetailsResponse, paging when non-empty."""
+        if not details:
+            empty = ListFunctionDetailsResponse(function_details=[], next_page_token=None)
+            return self._mock_response(empty, 200)
+
+        page = self._build_paged_entities(
+            details,
+            self._get_max_results(parameters),
+            parameters.get(PAGE_TOKEN),
+        )
+        return self._mock_response(
+            ListFunctionDetailsResponse(
+                function_details=page.elements,
+                next_page_token=page.next_page_token,
+            ),
+            200,
+        )
+
+    def _generate_final_list_functions_globally_response(self, parameters: Dict[str, str],
+                                                         identifiers: List) -> Tuple[str, int]:
+        """Wrap identifiers in a ListFunctionsGloballyResponse, paging when non-empty."""
+        if not identifiers:
+            empty = ListFunctionsGloballyResponse(functions=[], next_page_token=None)
+            return self._mock_response(empty, 200)
+
+        page = self._build_paged_entities(
+            identifiers,
+            self._get_max_results(parameters),
+            parameters.get(PAGE_TOKEN),
+        )
+        return self._mock_response(
+            ListFunctionsGloballyResponse(
+                functions=page.elements,
+                next_page_token=page.next_page_token,
+            ),
+            200,
+        )
+
     def _table_partitions_handle(
             self, method: str, identifier: Identifier, parameters: Dict[str, 
str]) -> Tuple[str, int]:
         """Handle table partitions listing"""

Reply via email to