This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 1112b1c08a [#7760] feat(client-python): Add the python client for Job 
System (#7954)
1112b1c08a is described below

commit 1112b1c08a78106643ae3896e49cdd2789f1cc3f
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Aug 11 14:06:07 2025 +0800

    [#7760] feat(client-python): Add the python client for Job System (#7954)
    
    ### What changes were proposed in this pull request?
    
    Add the Python client for Job System.
    
    ### Why are the changes needed?
    
    Fix: #7760
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add UTs.
---
 .../client-python/gravitino/api/job/job_handle.py  |  24 +-
 .../gravitino/api/job/job_template.py              |  35 ++-
 .../gravitino/api/job/shell_job_template.py        |  10 +
 .../gravitino/api/job/spark_job_template.py        |  10 +
 .../gravitino/client/dto_converters.py             | 112 ++++++++
 .../job_handle.py => client/generic_job_handle.py} |  28 +-
 .../gravitino/client/gravitino_client.py           | 115 +++++++-
 .../gravitino/client/gravitino_metalake.py         | 202 +++++++++++++-
 clients/client-python/gravitino/constants/error.py |   9 +
 .../{api/job/job_handle.py => dto/job/__init__.py} |  24 --
 clients/client-python/gravitino/dto/job/job_dto.py |  65 +++++
 .../gravitino/dto/job/job_template_dto.py          | 130 +++++++++
 .../gravitino/dto/job/shell_job_template_dto.py    |  40 +++
 .../gravitino/dto/job/spark_job_template_dto.py    |  69 +++++
 .../gravitino/dto/requests/job_run_request.py      |  46 ++++
 .../dto/requests/job_template_register_request.py  |  46 ++++
 .../gravitino/dto/responses/job_list_response.py   |  47 ++++
 .../responses/job_response.py}                     |  40 +--
 .../dto/responses/job_template_list_response.py    |  71 +++++
 .../dto/responses/job_template_response.py         |  59 ++++
 clients/client-python/gravitino/exceptions/base.py |   4 +
 .../exceptions/handlers/job_error_handler.py       |  58 ++++
 .../unittests/dto/job/__init__.py}                 |  24 --
 .../dto/job/test_job_template_dto_serde.py         |  95 +++++++
 clients/client-python/tests/unittests/mock_base.py |   4 +-
 .../tests/unittests/test_supports_jobs.py          | 298 +++++++++++++++++++++
 26 files changed, 1572 insertions(+), 93 deletions(-)

diff --git a/clients/client-python/gravitino/api/job/job_handle.py 
b/clients/client-python/gravitino/api/job/job_handle.py
index d1b0ba734a..b8da91fcd3 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/api/job/job_handle.py
@@ -21,11 +21,25 @@ from enum import Enum
 
 class JobHandle(ABC):
     class Status(Enum):
-        QUEUED = "QUEUED"
-        STARTED = "STARTED"
-        FAILED = "FAILED"
-        SUCCEEDED = "SUCCEEDED"
-        CANCELLED = "CANCELLED"
+        QUEUED = "queued"
+        STARTED = "started"
+        FAILED = "failed"
+        SUCCEEDED = "succeeded"
+        CANCELLING = "cancelling"
+        CANCELLED = "cancelled"
+
+        @classmethod
+        def job_status_serialize(cls, status: "JobHandle.Status") -> str:
+            """Serializes the job status to a string."""
+            return status.value.lower()
+
+        @classmethod
+        def job_status_deserialize(cls, status: str) -> "JobHandle.Status":
+            """Deserializes a string to a job status."""
+            for m in cls:
+                if m.value.lower() == status.lower():
+                    return m
+            raise ValueError(f"Unknown job status: {status}")
 
     @abstractmethod
     def job_template_name(self) -> str:
diff --git a/clients/client-python/gravitino/api/job/job_template.py 
b/clients/client-python/gravitino/api/job/job_template.py
index 755905640c..0af7eb745a 100644
--- a/clients/client-python/gravitino/api/job/job_template.py
+++ b/clients/client-python/gravitino/api/job/job_template.py
@@ -28,11 +28,42 @@ class JobType(Enum):
     required to determine the runtime environment for executing the job.
     """
 
-    SPARK = "SPARK"
+    SPARK = "spark"
     """job type for executing Spark jobs"""
-    SHELL = "SHELL"
+    SHELL = "shell"
     """job type for executing shell commands"""
 
+    @classmethod
+    def job_type_serialize(cls, job_type: "JobType") -> str:
+        """
+        Serializes the JobType to a string.
+
+        Args:
+            job_type: The JobType to serialize.
+
+        Returns:
+            The serialized string representation of the JobType.
+        """
+        return job_type.value.lower()
+
+    @classmethod
+    def job_type_deserialize(cls, job_type_str: str) -> "JobType":
+        """
+        Deserializes a string to a JobType.
+
+        Args:
+            job_type_str: The string representation of the JobType.
+
+        Returns:
+            The JobType corresponding to the string.
+        """
+        for m in cls:
+            if m.value.lower() == job_type_str.lower():
+                return m
+
+        # If no match found, raise an error
+        raise ValueError(f"Invalid job type string: {job_type_str}")
+
 
 T = TypeVar("T", bound="JobTemplate")
 B = TypeVar("B", bound="BaseBuilder")
diff --git a/clients/client-python/gravitino/api/job/shell_job_template.py 
b/clients/client-python/gravitino/api/job/shell_job_template.py
index 2c321bc91b..fb3cece1c6 100644
--- a/clients/client-python/gravitino/api/job/shell_job_template.py
+++ b/clients/client-python/gravitino/api/job/shell_job_template.py
@@ -51,6 +51,16 @@ class ShellJobTemplate(JobTemplate):
         """
         return JobType.SHELL
 
+    @staticmethod
+    def builder() -> "ShellJobTemplate.Builder":
+        """
+        Creates a new builder instance for constructing a ShellJobTemplate.
+
+        Returns:
+            ShellJobTemplate.Builder: A new builder instance.
+        """
+        return ShellJobTemplate.Builder()
+
     def __eq__(self, other: Any) -> bool:
         if not super().__eq__(other):
             return False
diff --git a/clients/client-python/gravitino/api/job/spark_job_template.py 
b/clients/client-python/gravitino/api/job/spark_job_template.py
index d0ac6c1d5e..c731ccadd5 100644
--- a/clients/client-python/gravitino/api/job/spark_job_template.py
+++ b/clients/client-python/gravitino/api/job/spark_job_template.py
@@ -95,6 +95,16 @@ class SparkJobTemplate(JobTemplate):
         """
         return JobType.SPARK
 
+    @staticmethod
+    def builder() -> "SparkJobTemplate.Builder":
+        """
+        Creates a new builder instance for constructing a SparkJobTemplate.
+
+        Returns:
+            SparkJobTemplate.Builder: A new builder instance.
+        """
+        return SparkJobTemplate.Builder()
+
     def __eq__(self, other: Any) -> bool:
         if not super().__eq__(other):
             return False
diff --git a/clients/client-python/gravitino/client/dto_converters.py 
b/clients/client-python/gravitino/client/dto_converters.py
index e0f6819a92..ec35d537bb 100644
--- a/clients/client-python/gravitino/client/dto_converters.py
+++ b/clients/client-python/gravitino/client/dto_converters.py
@@ -17,9 +17,15 @@
 
 from gravitino.api.catalog import Catalog
 from gravitino.api.catalog_change import CatalogChange
+from gravitino.api.job.job_template import JobTemplate, JobType
+from gravitino.api.job.shell_job_template import ShellJobTemplate
+from gravitino.api.job.spark_job_template import SparkJobTemplate
 from gravitino.client.fileset_catalog import FilesetCatalog
 from gravitino.client.generic_model_catalog import GenericModelCatalog
 from gravitino.dto.catalog_dto import CatalogDTO
+from gravitino.dto.job.job_template_dto import JobTemplateDTO
+from gravitino.dto.job.shell_job_template_dto import ShellJobTemplateDTO
+from gravitino.dto.job.spark_job_template_dto import SparkJobTemplateDTO
 from gravitino.dto.requests.catalog_update_request import CatalogUpdateRequest
 from gravitino.dto.requests.metalake_update_request import 
MetalakeUpdateRequest
 from gravitino.api.metalake_change import MetalakeChange
@@ -99,3 +105,109 @@ class DTOConverters:
             )
 
         raise ValueError(f"Unknown change type: {type(change).__name__}")
+
+    @staticmethod
+    def _to_shell_job_template_dto(dto: JobTemplateDTO) -> ShellJobTemplateDTO:
+        if isinstance(dto, ShellJobTemplateDTO):
+            return dto
+
+        raise TypeError(
+            f"The provided JobTemplateDTO is not of type ShellJobTemplateDTO: 
{type(dto)}"
+        )
+
+    @staticmethod
+    def _to_spark_job_template_dto(dto: JobTemplateDTO) -> SparkJobTemplateDTO:
+        if isinstance(dto, SparkJobTemplateDTO):
+            return dto
+
+        raise TypeError(
+            f"The provided JobTemplateDTO is not of type SparkJobTemplateDTO: 
{type(dto)}"
+        )
+
+    @staticmethod
+    def _to_shell_job_template(template: JobTemplate) -> ShellJobTemplate:
+        if isinstance(template, ShellJobTemplate):
+            return template
+
+        raise TypeError(
+            f"The provided JobTemplate is not of type ShellJobTemplate: 
{type(template)}"
+        )
+
+    @staticmethod
+    def _to_spark_job_template(template: JobTemplate) -> SparkJobTemplate:
+        if isinstance(template, SparkJobTemplate):
+            return template
+
+        raise TypeError(
+            f"The provided JobTemplate is not of type SparkJobTemplate: 
{type(template)}"
+        )
+
+    @staticmethod
+    def from_job_template_dto(dto: JobTemplateDTO) -> JobTemplate:
+        if dto.job_type() == JobType.SHELL:
+            shell_dto = DTOConverters._to_shell_job_template_dto(dto)
+            return (
+                ShellJobTemplate.builder()
+                .with_name(shell_dto.name())
+                .with_comment(shell_dto.comment())
+                .with_executable(shell_dto.executable())
+                .with_arguments(shell_dto.arguments())
+                .with_environments(shell_dto.environments())
+                .with_custom_fields(shell_dto.custom_fields())
+                .with_scripts(shell_dto.scripts())
+                .build()
+            )
+
+        if dto.job_type() == JobType.SPARK:
+            spark_dto = DTOConverters._to_spark_job_template_dto(dto)
+            return (
+                SparkJobTemplate.builder()
+                .with_name(spark_dto.name())
+                .with_comment(spark_dto.comment())
+                .with_executable(spark_dto.executable())
+                .with_arguments(spark_dto.arguments())
+                .with_environments(spark_dto.environments())
+                .with_custom_fields(spark_dto.custom_fields())
+                .with_class_name(spark_dto.class_name())
+                .with_jars(spark_dto.jars())
+                .with_files(spark_dto.files())
+                .with_archives(spark_dto.archives())
+                .with_configs(spark_dto.configs())
+                .build()
+            )
+
+        raise ValueError(f"Unsupported job type: {dto.job_type()}")
+
+    @staticmethod
+    def to_job_template_dto(template: JobTemplate) -> JobTemplateDTO:
+        if isinstance(template, ShellJobTemplate):
+            shell_template = DTOConverters._to_shell_job_template(template)
+            return ShellJobTemplateDTO(
+                _job_type=shell_template.job_type(),
+                _name=shell_template.name,
+                _comment=shell_template.comment,
+                _executable=shell_template.executable,
+                _arguments=shell_template.arguments,
+                _environments=shell_template.environments,
+                _custom_fields=shell_template.custom_fields,
+                _scripts=shell_template.scripts,
+            )
+
+        if isinstance(template, SparkJobTemplate):
+            spark_template = DTOConverters._to_spark_job_template(template)
+            return SparkJobTemplateDTO(
+                _job_type=spark_template.job_type(),
+                _name=spark_template.name,
+                _comment=spark_template.comment,
+                _executable=spark_template.executable,
+                _arguments=spark_template.arguments,
+                _environments=spark_template.environments,
+                _custom_fields=spark_template.custom_fields,
+                _class_name=spark_template.class_name,
+                _jars=spark_template.jars,
+                _files=spark_template.files,
+                _archives=spark_template.archives,
+                _configs=spark_template.configs,
+            )
+
+        raise ValueError(f"Unsupported job type: {type(template)}")
diff --git a/clients/client-python/gravitino/api/job/job_handle.py 
b/clients/client-python/gravitino/client/generic_job_handle.py
similarity index 64%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/gravitino/client/generic_job_handle.py
index d1b0ba734a..95b568a613 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/client/generic_job_handle.py
@@ -6,7 +6,7 @@
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
 #
-#   http://www.apache.org/licenses/LICENSE-2.0
+#  http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
@@ -14,27 +14,21 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.dto.job.job_dto import JobDTO
 
-from abc import ABC, abstractmethod
-from enum import Enum
 
+class GenericJobHandle(JobHandle):
+    """Represents a generic job handle."""
 
-class JobHandle(ABC):
-    class Status(Enum):
-        QUEUED = "QUEUED"
-        STARTED = "STARTED"
-        FAILED = "FAILED"
-        SUCCEEDED = "SUCCEEDED"
-        CANCELLED = "CANCELLED"
+    def __init__(self, job_dto: JobDTO):
+        self._job_dto = job_dto
 
-    @abstractmethod
     def job_template_name(self) -> str:
-        pass
+        return self._job_dto.job_template_name()
 
-    @abstractmethod
     def job_id(self) -> str:
-        pass
+        return self._job_dto.job_id()
 
-    @abstractmethod
-    def job_status(self) -> Status:
-        pass
+    def job_status(self):
+        return self._job_dto.status()
diff --git a/clients/client-python/gravitino/client/gravitino_client.py 
b/clients/client-python/gravitino/client/gravitino_client.py
index cc70cdd971..ea27a86443 100644
--- a/clients/client-python/gravitino/client/gravitino_client.py
+++ b/clients/client-python/gravitino/client/gravitino_client.py
@@ -19,12 +19,15 @@ from typing import List, Dict
 
 from gravitino.api.catalog import Catalog
 from gravitino.api.catalog_change import CatalogChange
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.job_template import JobTemplate
+from gravitino.api.job.supports_jobs import SupportsJobs
 from gravitino.auth.auth_data_provider import AuthDataProvider
 from gravitino.client.gravitino_client_base import GravitinoClientBase
 from gravitino.client.gravitino_metalake import GravitinoMetalake
 
 
-class GravitinoClient(GravitinoClientBase):
+class GravitinoClient(GravitinoClientBase, SupportsJobs):
     """Gravitino Client for a user to interact with the Gravitino API, 
allowing the client to list,
     load, create, and alter Catalog.
 
@@ -103,3 +106,113 @@ class GravitinoClient(GravitinoClientBase):
 
     def disable_catalog(self, name: str):
         return self.get_metalake().disable_catalog(name)
+
+    def list_job_templates(self) -> List[JobTemplate]:
+        """Lists all job templates in the current metalake.
+
+        Returns:
+            A list of JobTemplate objects representing the job templates in 
the metalake.
+        """
+        return self.get_metalake().list_job_templates()
+
+    def register_job_template(self, job_template) -> None:
+        """Register the specified job template with Gravitino. The registered
+        job template will be maintained in Gravitino, allowing it to be 
executed later.
+
+        Args:
+            job_template: The job template to register.
+
+        Raises:
+            JobTemplateAlreadyExists: If a job template with the same name 
already exists.
+        """
+        self.get_metalake().register_job_template(job_template)
+
+    def get_job_template(self, job_template_name: str) -> JobTemplate:
+        """Retrieves a job template by its name.
+
+        Args:
+            job_template_name: The name of the job template to retrieve.
+
+        Returns:
+            The JobTemplate object corresponding to the specified name.
+
+        Raises:
+            NoSuchJobTemplateException: If no job template with the specified 
name exists.
+        """
+        return self.get_metalake().get_job_template(job_template_name)
+
+    def delete_job_template(self, job_template_name: str) -> bool:
+        """
+        Deletes a job template by its name. This will remove the job template 
from Gravitino, and it
+        will no longer be available for execution. The job template can be deleted
+        only when all the jobs associated with it are completed, failed or cancelled;
+        otherwise an InUseException is raised. Returns False if the job template to
+        be deleted does not exist.
+
+        The deletion of a job template will also delete all the jobs 
associated with this template.
+
+        Args:
+            job_template_name: The name of the job template to delete.
+
+        Returns:
+            bool: True if the job template was deleted successfully, False if 
the job template
+            does not exist.
+
+        Raises:
+            InUseException: If there are still queued or started jobs 
associated with this job template.
+        """
+        return self.get_metalake().delete_job_template(job_template_name)
+
+    def list_jobs(self, job_template_name: str = None) -> List[JobHandle]:
+        """Lists all the jobs in the current metalake.
+
+        Args:
+            job_template_name: Optional; if provided, filters the jobs by the 
specified job template name.
+
+        Returns:
+            A list of JobHandle objects representing the jobs in the metalake.
+        """
+        return self.get_metalake().list_jobs(job_template_name)
+
+    def get_job(self, job_id: str) -> JobHandle:
+        """Retrieves a job by its ID.
+
+        Args:
+            job_id: The ID of the job to retrieve.
+
+        Returns:
+            The JobHandle object corresponding to the specified job ID.
+
+        Raises:
+            NoSuchJobException: If no job with the specified ID exists.
+        """
+        return self.get_metalake().get_job(job_id)
+
+    def run_job(self, job_template_name: str, job_conf: Dict[str, str]) -> 
JobHandle:
+        """Runs a job using the specified job template and configuration.
+
+        Args:
+            job_template_name: The name of the job template to use for running 
the job.
+            job_conf: A dictionary containing the configuration for the job.
+
+        Returns:
+            A JobHandle object representing the started job.
+
+        Raises:
+            NoSuchJobTemplateException: If no job template with the specified 
name exists.
+        """
+        return self.get_metalake().run_job(job_template_name, job_conf)
+
+    def cancel_job(self, job_id: str) -> JobHandle:
+        """Cancels a job by its ID.
+
+        Args:
+            job_id: The ID of the job to cancel.
+
+        Returns:
+            The JobHandle object representing the cancelled job.
+
+        Raises:
+            NoSuchJobException: If no job with the specified ID exists.
+        """
+        return self.get_metalake().cancel_job(job_id)
diff --git a/clients/client-python/gravitino/client/gravitino_metalake.py 
b/clients/client-python/gravitino/client/gravitino_metalake.py
index a4a62559e1..b85fa41c7b 100644
--- a/clients/client-python/gravitino/client/gravitino_metalake.py
+++ b/clients/client-python/gravitino/client/gravitino_metalake.py
@@ -20,16 +20,29 @@ from typing import List, Dict
 
 from gravitino.api.catalog import Catalog
 from gravitino.api.catalog_change import CatalogChange
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.job_template import JobTemplate
+from gravitino.api.job.supports_jobs import SupportsJobs
 from gravitino.client.dto_converters import DTOConverters
+from gravitino.client.generic_job_handle import GenericJobHandle
 from gravitino.dto.metalake_dto import MetalakeDTO
 from gravitino.dto.requests.catalog_create_request import CatalogCreateRequest
 from gravitino.dto.requests.catalog_set_request import CatalogSetRequest
 from gravitino.dto.requests.catalog_updates_request import 
CatalogUpdatesRequest
+from gravitino.dto.requests.job_run_request import JobRunRequest
+from gravitino.dto.requests.job_template_register_request import (
+    JobTemplateRegisterRequest,
+)
 from gravitino.dto.responses.catalog_list_response import CatalogListResponse
 from gravitino.dto.responses.catalog_response import CatalogResponse
 from gravitino.dto.responses.drop_response import DropResponse
 from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.job_list_response import JobListResponse
+from gravitino.dto.responses.job_response import JobResponse
+from gravitino.dto.responses.job_template_list_response import 
JobTemplateListResponse
+from gravitino.dto.responses.job_template_response import JobTemplateResponse
 from gravitino.exceptions.handlers.catalog_error_handler import 
CATALOG_ERROR_HANDLER
+from gravitino.exceptions.handlers.job_error_handler import JOB_ERROR_HANDLER
 from gravitino.rest.rest_utils import encode_string
 from gravitino.utils import HTTPClient
 
@@ -37,7 +50,7 @@ from gravitino.utils import HTTPClient
 logger = logging.getLogger(__name__)
 
 
-class GravitinoMetalake(MetalakeDTO):
+class GravitinoMetalake(MetalakeDTO, SupportsJobs):
     """
     Gravitino Metalake is the top-level metadata repository for users. It 
contains a list of catalogs
     as sub-level metadata collections. With GravitinoMetalake, users can list, 
create, load,
@@ -47,6 +60,8 @@ class GravitinoMetalake(MetalakeDTO):
     rest_client: HTTPClient
 
     API_METALAKES_CATALOGS_PATH = "api/metalakes/{}/catalogs/{}"
+    API_METALAKES_JOB_TEMPLATES_PATH = "api/metalakes/{}/jobs/templates"
+    API_METALAKES_JOB_RUNS_PATH = "api/metalakes/{}/jobs/runs"
 
     def __init__(self, metalake: MetalakeDTO = None, client: HTTPClient = 
None):
         super().__init__(
@@ -255,3 +270,188 @@ class GravitinoMetalake(MetalakeDTO):
         self.rest_client.patch(
             url, json=catalog_disable_request, 
error_handler=CATALOG_ERROR_HANDLER
         )
+
+    def list_job_templates(self) -> List[JobTemplate]:
+        """List all the registered job templates in Gravitino.
+
+        Returns:
+            List of job templates.
+        """
+        params = {"details": "true"}
+        url = 
self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))
+        response = self.rest_client.get(
+            url, params=params, error_handler=JOB_ERROR_HANDLER
+        )
+        resp = JobTemplateListResponse.from_json(response.body, 
infer_missing=True)
+        resp.validate()
+
+        return [
+            DTOConverters.from_job_template_dto(dto) for dto in 
resp.job_templates()
+        ]
+
+    def register_job_template(self, job_template: JobTemplate) -> None:
+        """Register the specified job template with Gravitino. The registered
+        job template will be maintained in Gravitino, allowing it to be 
executed later.
+
+        Args:
+            job_template: The job template to register.
+
+        Raises:
+            JobTemplateAlreadyExists: If a job template with the same name 
already exists.
+        """
+        url = 
self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))
+        req = JobTemplateRegisterRequest(
+            DTOConverters.to_job_template_dto(job_template)
+        )
+
+        self.rest_client.post(url, json=req, error_handler=JOB_ERROR_HANDLER)
+
+    def get_job_template(self, job_template_name: str) -> JobTemplate:
+        """Retrieves a job template by its name.
+
+        Args:
+            job_template_name: The name of the job template to retrieve.
+
+        Returns:
+            The job template if found, otherwise raises an exception.
+
+        Raises:
+            NoSuchJobTemplateException: If no job template with the specified 
name exists.
+        """
+        if not job_template_name or not job_template_name.strip():
+            raise ValueError("Job template name cannot be null or empty")
+
+        url = (
+            
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+            f"{encode_string(job_template_name)}"
+        )
+        response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
+        resp = JobTemplateResponse.from_json(response.body, infer_missing=True)
+        resp.validate()
+
+        return DTOConverters.from_job_template_dto(resp.job_template())
+
+    def delete_job_template(self, job_template_name: str) -> bool:
+        """
+        Deletes a job template by its name. This will remove the job template 
from Gravitino, and it
+        will no longer be available for execution. The job template can be deleted
+        only when all the jobs associated with it are completed, failed or cancelled;
+        otherwise an InUseException is raised. Returns False if the job template to
+        be deleted does not exist.
+
+        The deletion of a job template will also delete all the jobs 
associated with this template.
+
+        Args:
+            job_template_name: The name of the job template to delete.
+
+        Returns:
+            bool: True if the job template was deleted successfully, False if 
the job template
+            does not exist.
+
+        Raises:
+            InUseException: If the job template is currently in use by any 
jobs, it cannot be deleted.
+        """
+        if not job_template_name or not job_template_name.strip():
+            raise ValueError("Job template name cannot be null or empty")
+
+        url = (
+            
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+            f"{encode_string(job_template_name)}"
+        )
+        response = self.rest_client.delete(url, 
error_handler=JOB_ERROR_HANDLER)
+
+        drop_response = DropResponse.from_json(response.body, 
infer_missing=True)
+        drop_response.validate()
+
+        return drop_response.dropped()
+
+    def list_jobs(self, job_template_name: str = None) -> List[JobHandle]:
+        """List all the jobs under this metalake.
+
+        Args:
+            job_template_name: The name of the job template to filter jobs by. 
If None, all jobs are listed.
+
+        Returns:
+            A list of JobHandle objects representing the jobs.
+        """
+        params = (
+            {"jobTemplateName": encode_string(job_template_name)}
+            if job_template_name
+            else {}
+        )
+        url = 
self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))
+        response = self.rest_client.get(
+            url, params=params, error_handler=JOB_ERROR_HANDLER
+        )
+        resp = JobListResponse.from_json(response.body, infer_missing=True)
+        resp.validate()
+
+        return [GenericJobHandle(dto) for dto in resp.jobs()]
+
+    def get_job(self, job_id: str) -> JobHandle:
+        """Retrieves a job by its ID.
+
+        Args:
+            job_id: The ID of the job to retrieve.
+
+        Returns:
+            The JobHandle representing the job if found, otherwise raises an 
exception.
+
+        Raises:
+            NoSuchJobException: If no job with the specified ID exists.
+        """
+        if not job_id or not job_id.strip():
+            raise ValueError("Job ID cannot be null or empty")
+
+        url = 
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+        response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
+        resp = JobResponse.from_json(response.body, infer_missing=True)
+        resp.validate()
+
+        return GenericJobHandle(resp.job())
+
+    def run_job(self, job_template_name: str, job_conf: Dict[str, str]) -> 
JobHandle:
+        """Runs a job based on the specified job template and configuration.
+
+        Args:
+            job_template_name: The name of the job template to use for running 
the job.
+            job_conf: A dictionary containing the configuration for the job.
+
+        Returns:
+            A JobHandle representing the started job.
+
+        Raises:
+            NoSuchJobTemplateException: If no job template with the specified 
name exists.
+        """
+        url = 
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}"
+        request = JobRunRequest(job_template_name, job_conf)
+
+        response = self.rest_client.post(
+            url, json=request, error_handler=JOB_ERROR_HANDLER
+        )
+        resp = JobResponse.from_json(response.body, infer_missing=True)
+        resp.validate()
+
+        return GenericJobHandle(resp.job())
+
+    def cancel_job(self, job_id: str) -> JobHandle:
+        """Cancels a job by its ID.
+
+        Args:
+            job_id: The ID of the job to cancel.
+
+        Returns:
+            A JobHandle representing the cancelled job.
+
+        Raises:
+            NoSuchJobException: If no job with the specified ID exists.
+        """
+        if not job_id or not job_id.strip():
+            raise ValueError("Job ID cannot be null or empty")
+
+        url = 
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+        response = self.rest_client.post(url, error_handler=JOB_ERROR_HANDLER)
+        resp = JobResponse.from_json(response.body, infer_missing=True)
+        resp.validate()
+
+        return GenericJobHandle(resp.job())
diff --git a/clients/client-python/gravitino/constants/error.py 
b/clients/client-python/gravitino/constants/error.py
index 7ff56c6fee..27e6031451 100644
--- a/clients/client-python/gravitino/constants/error.py
+++ b/clients/client-python/gravitino/constants/error.py
@@ -26,6 +26,9 @@ from gravitino.exceptions.base import (
     AlreadyExistsException,
     NotEmptyException,
     UnsupportedOperationException,
+    ForbiddenException,
+    NotInUseException,
+    InUseException,
 )
 
 
@@ -56,6 +59,9 @@ class ErrorConstants(IntEnum):
     # Error codes for connect to catalog failed.
     CONNECTION_FAILED_CODE = 1007
 
+    # Error codes for forbidden operation.
+    FORBIDDEN_CODE = 1008
+
     # Error codes for operation on a no in use entity.
     NOT_IN_USE_CODE = 1009
 
@@ -75,6 +81,9 @@ EXCEPTION_MAPPING = {
     NotEmptyException: ErrorConstants.NON_EMPTY_CODE,
     UnsupportedOperationException: ErrorConstants.UNSUPPORTED_OPERATION_CODE,
     ConnectionFailedException: ErrorConstants.CONNECTION_FAILED_CODE,
+    ForbiddenException: ErrorConstants.FORBIDDEN_CODE,
+    NotInUseException: ErrorConstants.NOT_IN_USE_CODE,
+    InUseException: ErrorConstants.IN_USE_CODE,
 }
 
 ERROR_CODE_MAPPING = {v: k for k, v in EXCEPTION_MAPPING.items()}
diff --git a/clients/client-python/gravitino/api/job/job_handle.py 
b/clients/client-python/gravitino/dto/job/__init__.py
similarity index 63%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/gravitino/dto/job/__init__.py
index d1b0ba734a..13a83393a9 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/dto/job/__init__.py
@@ -14,27 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from abc import ABC, abstractmethod
-from enum import Enum
-
-
-class JobHandle(ABC):
-    class Status(Enum):
-        QUEUED = "QUEUED"
-        STARTED = "STARTED"
-        FAILED = "FAILED"
-        SUCCEEDED = "SUCCEEDED"
-        CANCELLED = "CANCELLED"
-
-    @abstractmethod
-    def job_template_name(self) -> str:
-        pass
-
-    @abstractmethod
-    def job_id(self) -> str:
-        pass
-
-    @abstractmethod
-    def job_status(self) -> Status:
-        pass
diff --git a/clients/client-python/gravitino/dto/job/job_dto.py b/clients/client-python/gravitino/dto/job/job_dto.py
new file mode 100644
index 0000000000..eab42da57c
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/job_dto.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from dataclasses_json import config, DataClassJsonMixin
+
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.dto.audit_dto import AuditDTO
+
+
+@dataclass
+class JobDTO(DataClassJsonMixin):
+    """Data transfer object representing a Job."""
+
+    _job_id: str = field(metadata=config(field_name="jobId"))
+    _job_template_name: str = field(metadata=config(field_name="jobTemplateName"))
+    _status: JobHandle.Status = field(
+        metadata=config(
+            field_name="status",
+            encoder=JobHandle.Status.job_status_serialize,
+            decoder=JobHandle.Status.job_status_deserialize,
+        )
+    )
+    _audit: AuditDTO = field(metadata=config(field_name="audit"))
+
+    def job_id(self) -> str:
+        """Returns the job ID."""
+        return self._job_id
+
+    def job_template_name(self) -> str:
+        """Returns the job template name."""
+        return self._job_template_name
+
+    def status(self) -> JobHandle.Status:
+        """Returns the status of the job."""
+        return self._status
+
+    def audit(self) -> AuditDTO:
+        """Returns the audit information of the job."""
+        return self._audit
+
+    def validate(self) -> None:
+        """Validates the JobDTO, ensuring required fields are present and non-empty."""
+        if self._job_id is None or not self._job_id.strip():
+            raise ValueError('"jobId" is required and cannot be empty')
+        if self._job_template_name is None or not self._job_template_name.strip():
+            raise ValueError('"jobTemplateName" is required and cannot be empty')
+        if self._status is None:
+            raise ValueError('"status" must not be None')
+        if self._audit is None:
+            raise ValueError('"audit" must not be None')
diff --git a/clients/client-python/gravitino/dto/job/job_template_dto.py b/clients/client-python/gravitino/dto/job/job_template_dto.py
new file mode 100644
index 0000000000..6ed6b1a6cc
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/job_template_dto.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from abc import ABC
+from typing import Dict, List, Optional, Type
+from dataclasses import dataclass, field
+
+from dataclasses_json import config, DataClassJsonMixin
+
+from gravitino.api.job.job_template import JobType
+from gravitino.dto.audit_dto import AuditDTO
+
+
+@dataclass
+class JobTemplateDTO(DataClassJsonMixin, ABC):
+    """Represents a Job Template Data Transfer Object (DTO)."""
+
+    # pylint: disable=R0902
+    _job_type: JobType = field(
+        metadata=config(
+            field_name="jobType",
+            encoder=JobType.job_type_serialize,
+            decoder=JobType.job_type_deserialize,
+        )
+    )
+    _name: str = field(metadata=config(field_name="name"))
+    _executable: str = field(metadata=config(field_name="executable"))
+    _comment: Optional[str] = field(default=None, metadata=config(field_name="comment"))
+    _arguments: Optional[List[str]] = field(
+        default=None, metadata=config(field_name="arguments")
+    )
+    _environments: Optional[Dict[str, str]] = field(
+        default=None, metadata=config(field_name="environments")
+    )
+    _custom_fields: Optional[Dict[str, str]] = field(
+        default=None, metadata=config(field_name="customFields")
+    )
+    _audit: Optional[AuditDTO] = field(
+        default=None, metadata=config(field_name="audit")
+    )
+
+    def job_type(self) -> JobType:
+        """Returns the type of the job."""
+        return self._job_type
+
+    def name(self) -> str:
+        """Returns the name of the job template."""
+        return self._name
+
+    def comment(self) -> Optional[str]:
+        """Returns the comment associated with the job template."""
+        return self._comment
+
+    def executable(self) -> str:
+        """Returns the executable associated with the job template."""
+        return self._executable
+
+    def arguments(self) -> Optional[List[str]]:
+        """Returns the list of arguments for the job template."""
+        return self._arguments
+
+    def environments(self) -> Optional[Dict[str, str]]:
+        """Returns the environment variables for the job template."""
+        return self._environments
+
+    def custom_fields(self) -> Optional[Dict[str, str]]:
+        """Returns custom fields for the job template."""
+        return self._custom_fields
+
+    def audit_info(self) -> Optional[AuditDTO]:
+        """Returns the audit information for the job template."""
+        return self._audit
+
+    def validate(self) -> None:
+        """Validates the JobTemplateDTO. Ensures that required fields are not null or empty."""
+        if self._job_type is None:
+            raise ValueError('"jobType" is required and cannot be None')
+
+        if self._name is None or not self._name.strip():
+            raise ValueError('"name" is required and cannot be empty')
+
+        if self._executable is None or not self._executable.strip():
+            raise ValueError('"executable" is required and cannot be empty')
+
+    @classmethod
+    def from_json(
+        cls, s: str, infer_missing: bool = False, **kwargs
+    ) -> "JobTemplateDTO":
+        """Creates a JobTemplateDTO from a JSON string."""
+        data = json.loads(s)
+        job_type = JobType.job_type_deserialize(data.get("jobType"))
+        subclass = JOB_TYPE_TEMPLATE_MAPPING.get(job_type)
+        if not subclass:
+            raise ValueError(f"Unsupported job type: {job_type}")
+        return subclass.from_dict(data, infer_missing=infer_missing)
+
+
+JOB_TYPE_TEMPLATE_MAPPING: Dict[JobType, Type["JobTemplateDTO"]] = {}
+
+
+def register_job_template(job_type: JobType):
+    """
+    Decorator to register a subclass of JobTemplateDTO for a specific job type.
+
+    Args:
+        job_type (JobType): The job type to register the subclass for.
+
+    Returns:
+        Callable: The decorator function.
+    """
+
+    def decorator(subclass: Type["JobTemplateDTO"]):
+        JOB_TYPE_TEMPLATE_MAPPING[job_type] = subclass
+        return subclass
+
+    return decorator
diff --git a/clients/client-python/gravitino/dto/job/shell_job_template_dto.py b/clients/client-python/gravitino/dto/job/shell_job_template_dto.py
new file mode 100644
index 0000000000..40e9c911aa
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/shell_job_template_dto.py
@@ -0,0 +1,40 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+from typing import List, Optional
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+
+from gravitino.api.job.job_template import JobType
+from gravitino.dto.job.job_template_dto import JobTemplateDTO, register_job_template
+
+
+@register_job_template(JobType.SHELL)
+@dataclass
+class ShellJobTemplateDTO(JobTemplateDTO):
+    """Represents a Shell Job Template Data Transfer Object (DTO)."""
+
+    _scripts: Optional[List[str]] = field(
+        default=None, metadata=config(field_name="scripts")
+    )
+
+    def scripts(self) -> Optional[List[str]]:
+        """Returns the scripts associated with the shell job template."""
+        return self._scripts
diff --git a/clients/client-python/gravitino/dto/job/spark_job_template_dto.py b/clients/client-python/gravitino/dto/job/spark_job_template_dto.py
new file mode 100644
index 0000000000..8eb0a71c6c
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/spark_job_template_dto.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, List, Optional
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+
+from .job_template_dto import JobTemplateDTO, register_job_template
+from ...api.job.job_template import JobType
+
+
+@register_job_template(JobType.SPARK)
+@dataclass
+class SparkJobTemplateDTO(JobTemplateDTO):
+    """Represents a Spark Job Template Data Transfer Object (DTO)."""
+
+    _class_name: str = field(default=None, metadata=config(field_name="className"))
+    _jars: Optional[List[str]] = field(default=None, metadata=config(field_name="jars"))
+    _files: Optional[List[str]] = field(
+        default=None, metadata=config(field_name="files")
+    )
+    _archives: Optional[List[str]] = field(
+        default=None, metadata=config(field_name="archives")
+    )
+    _configs: Optional[Dict[str, str]] = field(
+        default=None, metadata=config(field_name="configs")
+    )
+
+    def class_name(self) -> str:
+        """Returns the class name of the Spark job."""
+        return self._class_name
+
+    def jars(self) -> Optional[List[str]]:
+        """Returns the list of JAR files associated with this Spark job template."""
+        return self._jars
+
+    def files(self) -> Optional[List[str]]:
+        """Returns the list of files associated with this Spark job template."""
+        return self._files
+
+    def archives(self) -> Optional[List[str]]:
+        """Returns the list of archives associated with this Spark job template."""
+        return self._archives
+
+    def configs(self) -> Optional[Dict[str, str]]:
+        """Returns the configuration properties for the Spark job."""
+        return self._configs
+
+    def validate(self) -> None:
+        """Validates the SparkJobTemplateDTO. Ensures that required fields are not null or empty."""
+        super().validate()
+
+        if self._class_name is None or not self._class_name.strip():
+            raise ValueError('"className" is required and cannot be empty')
diff --git a/clients/client-python/gravitino/dto/requests/job_run_request.py b/clients/client-python/gravitino/dto/requests/job_run_request.py
new file mode 100644
index 0000000000..659b9e940a
--- /dev/null
+++ b/clients/client-python/gravitino/dto/requests/job_run_request.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Optional, Dict
+
+from dataclasses_json import config
+
+from gravitino.rest.rest_message import RESTRequest
+
+
+@dataclass
+class JobRunRequest(RESTRequest):
+    """Represents a request to run a job using a specified job template."""
+
+    _job_template_name: str = field(metadata=config(field_name="jobTemplateName"))
+    _job_conf: Optional[Dict[str, str]] = field(
+        default=None, metadata=config(field_name="jobConf")
+    )
+
+    def job_template_name(self) -> str:
+        """Returns the job template name to use for running the job."""
+        return self._job_template_name
+
+    def job_conf(self) -> Optional[Dict[str, str]]:
+        """Returns the job configuration parameters."""
+        return self._job_conf
+
+    def validate(self) -> None:
+        """Validates the request. Raises ValueError if invalid."""
+        if self._job_template_name is None or not self._job_template_name.strip():
+            raise ValueError('"jobTemplateName" is required and cannot be empty')
diff --git a/clients/client-python/gravitino/dto/requests/job_template_register_request.py b/clients/client-python/gravitino/dto/requests/job_template_register_request.py
new file mode 100644
index 0000000000..6bf76e99b9
--- /dev/null
+++ b/clients/client-python/gravitino/dto/requests/job_template_register_request.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Optional
+
+from dataclasses_json import config
+
+from gravitino.dto.job.job_template_dto import JobTemplateDTO
+from gravitino.rest.rest_message import RESTRequest
+
+
+@dataclass
+class JobTemplateRegisterRequest(RESTRequest):
+    """Represents a request to register a job template."""
+
+    _job_template: JobTemplateDTO = field(metadata=config(field_name="jobTemplate"))
+
+    def job_template(self) -> Optional[JobTemplateDTO]:
+        """Returns the job template to register."""
+        return self._job_template
+
+    def validate(self):
+        """Validates the request.
+
+        Raises:
+            ValueError: If the request is invalid.
+        """
+        if self._job_template is None:
+            raise ValueError('"jobTemplate" is required and cannot be None')
+
+        self._job_template.validate()
diff --git a/clients/client-python/gravitino/dto/responses/job_list_response.py b/clients/client-python/gravitino/dto/responses/job_list_response.py
new file mode 100644
index 0000000000..3f42e7c46a
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/job_list_response.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import List
+
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
+
+from .base_response import BaseResponse
+from ..job.job_dto import JobDTO
+
+
+@dataclass
+class JobListResponse(BaseResponse):
+    """Represents a response containing a list of jobs."""
+
+    _jobs: List[JobDTO] = field(metadata=config(field_name="jobs"))
+
+    def validate(self):
+        """Validates the response data and contained jobs."""
+        super().validate()
+
+        if self._jobs is None:
+            raise IllegalArgumentException("jobs must not be None")
+
+        for job in self._jobs:
+            if job is not None:
+                job.validate()
+
+    def jobs(self) -> List[JobDTO]:
+        """Returns the list of jobs from the response."""
+        return self._jobs
diff --git a/clients/client-python/gravitino/api/job/job_handle.py b/clients/client-python/gravitino/dto/responses/job_response.py
similarity index 52%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/gravitino/dto/responses/job_response.py
index d1b0ba734a..7beb06fe13 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/dto/responses/job_response.py
@@ -15,26 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from abc import ABC, abstractmethod
-from enum import Enum
+from dataclasses import dataclass, field
 
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
 
-class JobHandle(ABC):
-    class Status(Enum):
-        QUEUED = "QUEUED"
-        STARTED = "STARTED"
-        FAILED = "FAILED"
-        SUCCEEDED = "SUCCEEDED"
-        CANCELLED = "CANCELLED"
+from .base_response import BaseResponse
+from ..job.job_dto import JobDTO
 
-    @abstractmethod
-    def job_template_name(self) -> str:
-        pass
 
-    @abstractmethod
-    def job_id(self) -> str:
-        pass
+@dataclass
+class JobResponse(BaseResponse):
+    """Represents a response containing a single job."""
 
-    @abstractmethod
-    def job_status(self) -> Status:
-        pass
+    _job: JobDTO = field(metadata=config(field_name="job"))
+
+    def validate(self):
+        """Validates the response data and contained job."""
+        super().validate()
+
+        if self._job is None:
+            raise IllegalArgumentException('"job" must not be None')
+
+        self._job.validate()
+
+    def job(self) -> JobDTO:
+        """Returns the job from the response."""
+        return self._job
diff --git a/clients/client-python/gravitino/dto/responses/job_template_list_response.py b/clients/client-python/gravitino/dto/responses/job_template_list_response.py
new file mode 100644
index 0000000000..bf6973c920
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/job_template_list_response.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from dataclasses import dataclass, field
+from typing import List
+
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
+
+from .base_response import BaseResponse
+from ..job.job_template_dto import JobTemplateDTO
+
+
+@dataclass
+class JobTemplateListResponse(BaseResponse):
+    """Represents a response containing a list of job templates."""
+
+    _job_templates: List[JobTemplateDTO] = field(
+        metadata=config(field_name="jobTemplates")
+    )
+
+    def validate(self):
+        """Validates the response data.
+
+        Raises:
+            IllegalArgumentException: If the job templates list is not set or contains invalid items.
+        """
+        super().validate()
+
+        if self._job_templates is None:
+            raise IllegalArgumentException("jobTemplates must not be None")
+
+        for job_template in self._job_templates:
+            if job_template is not None:
+                job_template.validate()
+
+    def job_templates(self) -> List[JobTemplateDTO]:
+        """Returns the list of job templates from the response."""
+        return self._job_templates
+
+    @classmethod
+    def from_json(
+        cls, s: str, infer_missing: bool = False, **kwargs
+    ) -> "JobTemplateListResponse":
+        """Deserialize JSON string into a JobTemplateListResponse object."""
+
+        data = json.loads(s)
+
+        # Deserialize each job template using JobTemplateDTO.from_json
+        job_templates = [
+            JobTemplateDTO.from_json(
+                json.dumps(item), infer_missing=infer_missing, **kwargs
+            )
+            for item in data.get("jobTemplates", [])
+        ]
+
+        return cls(_job_templates=job_templates, _code=data["code"])
diff --git a/clients/client-python/gravitino/dto/responses/job_template_response.py b/clients/client-python/gravitino/dto/responses/job_template_response.py
new file mode 100644
index 0000000000..ebeb560e2b
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/job_template_response.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
+
+from .base_response import BaseResponse
+from ..job.job_template_dto import JobTemplateDTO
+
+
+@dataclass
+class JobTemplateResponse(BaseResponse):
+    """Represents a response containing a single job template."""
+
+    _job_template: JobTemplateDTO = field(metadata=config(field_name="jobTemplate"))
+
+    def validate(self):
+        """Validates the response data.
+
+        Raises:
+            IllegalArgumentException: If the job template is not set or invalid.
+        """
+        super().validate()
+
+        if self._job_template is None:
+            raise IllegalArgumentException('"jobTemplate" must not be None')
+
+        self._job_template.validate()
+
+    def job_template(self) -> JobTemplateDTO:
+        """Returns the job template from the response."""
+        return self._job_template
+
+    @classmethod
+    def from_json(
+        cls, s: str, infer_missing: bool = False, **kwargs
+    ) -> "JobTemplateResponse":
+        """Deserialize JSON string into a JobTemplateResponse object."""
+        data = json.loads(s)
+        job_template_data = JobTemplateDTO.from_json(
+            json.dumps(data["jobTemplate"]), infer_missing=infer_missing, **kwargs
+        )
+        return cls(_job_template=job_template_data, _code=0)
diff --git a/clients/client-python/gravitino/exceptions/base.py b/clients/client-python/gravitino/exceptions/base.py
index ce2f0d536a..0193ad78fe 100644
--- a/clients/client-python/gravitino/exceptions/base.py
+++ b/clients/client-python/gravitino/exceptions/base.py
@@ -187,3 +187,7 @@ class NoSuchJobTemplateException(NotFoundException):
 
 class NoSuchJobException(NotFoundException):
     """An exception thrown when a job with specified name is not found."""
+
+
+class ForbiddenException(GravitinoRuntimeException):
+    """An exception thrown when a user is forbidden to perform an action."""
diff --git a/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
new file mode 100644
index 0000000000..fc96e4c2b8
--- /dev/null
+++ b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from gravitino.constants.error import ErrorConstants
+from gravitino.dto.responses.error_response import ErrorResponse
+from gravitino.exceptions.handlers.rest_error_handler import RestErrorHandler
+from gravitino.exceptions.base import (
+    NoSuchMetalakeException,
+    JobTemplateAlreadyExistsException,
+    NoSuchJobTemplateException,
+    NoSuchJobException,
+    InUseException,
+    MetalakeNotInUseException,
+)
+
+
+class JobErrorHandler(RestErrorHandler):
+    def handle(self, error_response: ErrorResponse):
+        error_message = error_response.format_error_message()
+        code = error_response.code()
+        exception_type = error_response.type()
+
+        if code == ErrorConstants.NOT_FOUND_CODE:
+            if exception_type == NoSuchMetalakeException.__name__:
+                raise NoSuchMetalakeException(error_message)
+            if exception_type == NoSuchJobTemplateException.__name__:
+                raise NoSuchJobTemplateException(error_message)
+            if exception_type == NoSuchJobException.__name__:
+                raise NoSuchJobException(error_message)
+
+        if code == ErrorConstants.ALREADY_EXISTS_CODE:
+            raise JobTemplateAlreadyExistsException(error_message)
+
+        if code == ErrorConstants.IN_USE_CODE:
+            raise InUseException(error_message)
+
+        if code == ErrorConstants.NOT_IN_USE_CODE:
+            if exception_type == MetalakeNotInUseException.__name__:
+                raise MetalakeNotInUseException(error_message)
+
+        super().handle(error_response)
+
+
+JOB_ERROR_HANDLER = JobErrorHandler()
diff --git a/clients/client-python/gravitino/api/job/job_handle.py b/clients/client-python/tests/unittests/dto/job/__init__.py
similarity index 63%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/tests/unittests/dto/job/__init__.py
index d1b0ba734a..13a83393a9 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/tests/unittests/dto/job/__init__.py
@@ -14,27 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from abc import ABC, abstractmethod
-from enum import Enum
-
-
-class JobHandle(ABC):
-    class Status(Enum):
-        QUEUED = "QUEUED"
-        STARTED = "STARTED"
-        FAILED = "FAILED"
-        SUCCEEDED = "SUCCEEDED"
-        CANCELLED = "CANCELLED"
-
-    @abstractmethod
-    def job_template_name(self) -> str:
-        pass
-
-    @abstractmethod
-    def job_id(self) -> str:
-        pass
-
-    @abstractmethod
-    def job_status(self) -> Status:
-        pass
diff --git a/clients/client-python/tests/unittests/dto/job/test_job_template_dto_serde.py b/clients/client-python/tests/unittests/dto/job/test_job_template_dto_serde.py
new file mode 100644
index 0000000000..9f05e45692
--- /dev/null
+++ b/clients/client-python/tests/unittests/dto/job/test_job_template_dto_serde.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from gravitino.api.job.job_template import JobType
+from gravitino.dto.job.job_template_dto import JobTemplateDTO
+from gravitino.dto.job.shell_job_template_dto import ShellJobTemplateDTO
+from gravitino.dto.job.spark_job_template_dto import SparkJobTemplateDTO
+
+
+class TestJobTemplateDTOSerDe(unittest.TestCase):
+
+    def test_shell_job_template_dto(self):
+        shell_job = ShellJobTemplateDTO(
+            _job_type=JobType.SHELL,
+            _name="test_shell_job",
+            _comment="Test shell job",
+            _executable="/path/to/script.sh",
+            _arguments=["arg1", "arg2"],
+            _environments={"ENV_VAR": "value"},
+            _custom_fields={"custom_field1": "value1"},
+            _scripts=["/path/to/script1.sh", "/path/to/script2.sh"],
+            _audit=None,
+        )
+        shell_job.validate()
+
+        json_str = shell_job.to_json()
+        shell_job_template_dto = JobTemplateDTO.from_json(json_str, infer_missing=True)
+        self.assertIsInstance(shell_job_template_dto, ShellJobTemplateDTO)
+        self.assertEqual(shell_job, shell_job_template_dto)
+
+        shell_job = ShellJobTemplateDTO(
+            _job_type=JobType.SHELL,
+            _name="test_shell_job_1",
+            _executable="/path/to/script1.sh",
+            _audit=None,
+        )
+        shell_job.validate()
+
+        json_str = shell_job.to_json()
+        shell_job_template_dto = JobTemplateDTO.from_json(json_str, infer_missing=True)
+        self.assertIsInstance(shell_job_template_dto, ShellJobTemplateDTO)
+        self.assertEqual(shell_job, shell_job_template_dto)
+
+    def test_spark_job_template_dto(self):
+        spark_job = SparkJobTemplateDTO(
+            _job_type=JobType.SPARK,
+            _name="test_spark_job",
+            _comment="Test spark job",
+            _executable="/path/to/spark-demo.jar",
+            _arguments=["input", "output"],
+            _environments={"ENV_VAR": "value"},
+            _custom_fields={"custom_field1": "value1"},
+            _class_name="com.example.SparkDemo",
+            _jars=["/path/to/jar1", "/path/to/jar2"],
+            _files=["/path/to/file1", "/path/to/file2"],
+            _archives=["/path/to/a.zip", "/path/to/b.zip"],
+            _configs={"k1": "v1"},
+            _audit=None,
+        )
+        spark_job.validate()
+
+        json_str = spark_job.to_json()
+        spark_job_template_dto = JobTemplateDTO.from_json(json_str, infer_missing=True)
+        self.assertIsInstance(spark_job_template_dto, SparkJobTemplateDTO)
+        self.assertEqual(spark_job, spark_job_template_dto)
+
+        spark_job = SparkJobTemplateDTO(
+            _job_type=JobType.SPARK,
+            _name="test_spark_job",
+            _comment="Test spark job",
+            _executable="/path/to/spark-demo.jar",
+            _class_name="com.example.SparkDemo",
+            _audit=None,
+        )
+        spark_job.validate()
+
+        json_str = spark_job.to_json()
+        spark_job_template_dto = JobTemplateDTO.from_json(json_str, infer_missing=True)
+        self.assertIsInstance(spark_job_template_dto, SparkJobTemplateDTO)
+        self.assertEqual(spark_job, spark_job_template_dto)
diff --git a/clients/client-python/tests/unittests/mock_base.py b/clients/client-python/tests/unittests/mock_base.py
index fb30715441..9421d9d1fd 100644
--- a/clients/client-python/tests/unittests/mock_base.py
+++ b/clients/client-python/tests/unittests/mock_base.py
@@ -42,7 +42,9 @@ def mock_load_metalake():
         _properties={"k": "v"},
         _audit=audit_dto,
     )
-    return GravitinoMetalake(metalake_dto)
+    return GravitinoMetalake(
+        metalake_dto, HTTPClient("http://localhost:9090", is_debug=True)
+    )
 
 
 def mock_load_catalog(name: str):
diff --git a/clients/client-python/tests/unittests/test_supports_jobs.py 
b/clients/client-python/tests/unittests/test_supports_jobs.py
new file mode 100644
index 0000000000..697c2a54b7
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_supports_jobs.py
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from http.client import HTTPResponse
+from unittest.mock import Mock, patch
+
+from gravitino import GravitinoClient
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.job_template import JobType
+from gravitino.api.job.shell_job_template import ShellJobTemplate
+from gravitino.api.job.spark_job_template import SparkJobTemplate
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.job.job_dto import JobDTO
+from gravitino.dto.job.shell_job_template_dto import ShellJobTemplateDTO
+from gravitino.dto.job.spark_job_template_dto import SparkJobTemplateDTO
+from gravitino.dto.responses.base_response import BaseResponse
+from gravitino.dto.responses.drop_response import DropResponse
+from gravitino.dto.responses.job_list_response import JobListResponse
+from gravitino.dto.responses.job_response import JobResponse
+from gravitino.dto.responses.job_template_list_response import JobTemplateListResponse
+from gravitino.dto.responses.job_template_response import JobTemplateResponse
+from gravitino.utils import Response
+from tests.unittests import mock_base
+
+
+@mock_base.mock_data
+class TestSupportsJobs(unittest.TestCase):
+    _metalake_name = "metalake_demo"
+
+    def test_list_job_templates(self, *mock_methods):
+
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        templates = [self._new_shell_job_template(), self._new_spark_job_template()]
+        template_dtos = [
+            self._new_shell_job_template_dto(templates[0]),
+            self._new_spark_job_template_dto(templates[1]),
+        ]
+        job_template_list_resp = JobTemplateListResponse(
+            _job_templates=template_dtos, _code=0
+        )
+        json_str = job_template_list_resp.to_json()
+        mock_resp = self._mock_http_response(json_str)
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            result_templates = gravitino_client.list_job_templates()
+            self.assertEqual(templates, result_templates)
+
+        # test with empty response
+        job_template_list_resp = JobTemplateListResponse(_job_templates=[], _code=0)
+        json_str = job_template_list_resp.to_json()
+        mock_resp = self._mock_http_response(json_str)
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            result_templates = gravitino_client.list_job_templates()
+            self.assertEqual([], result_templates)
+
+    def test_register_job_template(self, *mock_methods):
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        shell_template = self._new_shell_job_template()
+        resp = BaseResponse(_code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.post", return_value=mock_resp
+        ):
+            gravitino_client.register_job_template(shell_template)
+
+    def test_get_job_template(self, *mock_methods):
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        shell_template = self._new_shell_job_template()
+        shell_template_dto = self._new_shell_job_template_dto(shell_template)
+        resp = JobTemplateResponse(_job_template=shell_template_dto, _code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            result_template = gravitino_client.get_job_template(shell_template.name)
+            self.assertEqual(shell_template, result_template)
+
+    def test_delete_job_template(self, *mock_methods):
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        shell_template = self._new_shell_job_template()
+        resp = DropResponse(_dropped=True, _code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.delete", return_value=mock_resp
+        ):
+            result = gravitino_client.delete_job_template(shell_template.name)
+            self.assertTrue(result)
+
+        resp = DropResponse(_dropped=False, _code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.delete", return_value=mock_resp
+        ):
+            result = gravitino_client.delete_job_template(shell_template.name)
+            self.assertFalse(result)
+
+    def test_list_jobs(self, *mock_methods):
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        job_template_name = "test_shell_job"
+        job_dtos = [
+            self._new_job_dto(job_template_name),
+            self._new_job_dto(job_template_name),
+        ]
+        resp = JobListResponse(_jobs=job_dtos, _code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            result_jobs = gravitino_client.list_jobs(job_template_name)
+            self.assertEqual(len(job_dtos), len(result_jobs))
+            for i, job_dto in enumerate(job_dtos):
+                self._compare_job_handle(result_jobs[i], job_dto)
+
+    def test_run_job(self, *mock_methods):
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        job_template_name = "test_shell_job"
+        job_dto = self._new_job_dto(job_template_name)
+        resp = JobResponse(_job=job_dto, _code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.post", return_value=mock_resp
+        ):
+            job_handle = gravitino_client.run_job(job_template_name, {})
+            self._compare_job_handle(job_handle, job_dto)
+
+    def test_get_job(self, *mock_methods):
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        job_template_name = "test_shell_job"
+        job_dto = self._new_job_dto(job_template_name)
+        resp = JobResponse(_job=job_dto, _code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.get", return_value=mock_resp
+        ):
+            job_handle = gravitino_client.get_job(job_dto.job_id())
+            self._compare_job_handle(job_handle, job_dto)
+
+        # test with invalid input
+        with self.assertRaises(ValueError):
+            gravitino_client.get_job("")
+
+        with self.assertRaises(ValueError):
+            gravitino_client.get_job(None)
+
+    def test_cancel_job(self, *mock_methods):
+        gravitino_client = GravitinoClient(
+            uri="http://localhost:8090",
+            metalake_name=self._metalake_name,
+        )
+
+        job_template_name = "test_shell_job"
+        job_dto = self._new_job_dto(job_template_name)
+        resp = JobResponse(_job=job_dto, _code=0)
+        mock_resp = self._mock_http_response(resp.to_json())
+
+        with patch(
+            "gravitino.utils.http_client.HTTPClient.post", return_value=mock_resp
+        ):
+            job_handle = gravitino_client.cancel_job(job_dto.job_id())
+            self._compare_job_handle(job_handle, job_dto)
+
+        # test with invalid input
+        with self.assertRaises(ValueError):
+            gravitino_client.cancel_job("")
+
+        with self.assertRaises(ValueError):
+            gravitino_client.cancel_job(None)
+
+    def _new_shell_job_template(self):
+        return (
+            ShellJobTemplate.builder()
+            .with_name("test_shell_job")
+            .with_comment("This is a test shell job template")
+            .with_executable("/path/to/executable.sh")
+            .with_arguments(["arg1", "arg2"])
+            .with_environments({"ENV_VAR": "value"})
+            .with_custom_fields({"custom_field": "custom_value"})
+            .with_scripts(["/path/to/script1.sh", "/path/to/script2.sh"])
+            .build()
+        )
+
+    def _new_spark_job_template(self):
+        return (
+            SparkJobTemplate.builder()
+            .with_name("test_spark_job")
+            .with_comment("This is a test spark job template")
+            .with_executable("/path/to/spark-demo.jar")
+            .with_class_name("com.example.SparkJob")
+            .with_jars(["/path/to/jar1.jar", "/path/to/jar2.jar"])
+            .with_files(["/path/to/file1.txt", "/path/to/file2.txt"])
+            .with_archives(["/path/to/archive1.zip", "/path/to/archive2.zip"])
+            .with_configs({"spark.executor.memory": "2g", "spark.driver.memory": "1g"})
+            .build()
+        )
+
+    def _new_shell_job_template_dto(self, template: ShellJobTemplate):
+        return ShellJobTemplateDTO(
+            _job_type=JobType.SHELL,
+            _name=template.name,
+            _comment=template.comment,
+            _executable=template.executable,
+            _arguments=template.arguments,
+            _environments=template.environments,
+            _custom_fields=template.custom_fields,
+            _scripts=template.scripts,
+        )
+
+    def _new_spark_job_template_dto(self, template: SparkJobTemplate):
+        return SparkJobTemplateDTO(
+            _job_type=JobType.SPARK,
+            _name=template.name,
+            _comment=template.comment,
+            _executable=template.executable,
+            _arguments=template.arguments,
+            _environments=template.environments,
+            _custom_fields=template.custom_fields,
+            _class_name=template.class_name,
+            _jars=template.jars,
+            _files=template.files,
+            _archives=template.archives,
+            _configs=template.configs,
+        )
+
+    def _mock_http_response(self, json_str: str):
+        mock_http_resp = Mock(HTTPResponse)
+        mock_http_resp.getcode.return_value = 200
+        mock_http_resp.read.return_value = json_str
+        mock_http_resp.info.return_value = None
+        mock_http_resp.url = None
+        mock_resp = Response(mock_http_resp)
+        return mock_resp
+
+    def _new_job_dto(self, job_template_name: str) -> JobDTO:
+        return JobDTO(
+            _job_id="job-123",
+            _job_template_name=job_template_name,
+            _status=JobHandle.Status.QUEUED,
+            _audit=AuditDTO(_creator="test", _create_time="2023-10-01T00:00:00Z"),
+        )
+
+    def _compare_job_handle(self, job_handle: JobHandle, job_dto: JobDTO):
+        self.assertEqual(job_handle.job_id(), job_dto.job_id())
+        self.assertEqual(job_handle.job_template_name(), job_dto.job_template_name())
+        self.assertEqual(job_handle.job_status(), job_dto.status())

Reply via email to