This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 1112b1c08a [#7760] feat(client-python): Add the python client for Job
System (#7954)
1112b1c08a is described below
commit 1112b1c08a78106643ae3896e49cdd2789f1cc3f
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Aug 11 14:06:07 2025 +0800
[#7760] feat(client-python): Add the python client for Job System (#7954)
### What changes were proposed in this pull request?
Add the Python client for Job System.
### Why are the changes needed?
Fix: #7760
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add UTs.
---
.../client-python/gravitino/api/job/job_handle.py | 24 +-
.../gravitino/api/job/job_template.py | 35 ++-
.../gravitino/api/job/shell_job_template.py | 10 +
.../gravitino/api/job/spark_job_template.py | 10 +
.../gravitino/client/dto_converters.py | 112 ++++++++
.../job_handle.py => client/generic_job_handle.py} | 28 +-
.../gravitino/client/gravitino_client.py | 115 +++++++-
.../gravitino/client/gravitino_metalake.py | 202 +++++++++++++-
clients/client-python/gravitino/constants/error.py | 9 +
.../{api/job/job_handle.py => dto/job/__init__.py} | 24 --
clients/client-python/gravitino/dto/job/job_dto.py | 65 +++++
.../gravitino/dto/job/job_template_dto.py | 130 +++++++++
.../gravitino/dto/job/shell_job_template_dto.py | 40 +++
.../gravitino/dto/job/spark_job_template_dto.py | 69 +++++
.../gravitino/dto/requests/job_run_request.py | 46 ++++
.../dto/requests/job_template_register_request.py | 46 ++++
.../gravitino/dto/responses/job_list_response.py | 47 ++++
.../responses/job_response.py} | 40 +--
.../dto/responses/job_template_list_response.py | 71 +++++
.../dto/responses/job_template_response.py | 59 ++++
clients/client-python/gravitino/exceptions/base.py | 4 +
.../exceptions/handlers/job_error_handler.py | 58 ++++
.../unittests/dto/job/__init__.py} | 24 --
.../dto/job/test_job_template_dto_serde.py | 95 +++++++
clients/client-python/tests/unittests/mock_base.py | 4 +-
.../tests/unittests/test_supports_jobs.py | 298 +++++++++++++++++++++
26 files changed, 1572 insertions(+), 93 deletions(-)
diff --git a/clients/client-python/gravitino/api/job/job_handle.py
b/clients/client-python/gravitino/api/job/job_handle.py
index d1b0ba734a..b8da91fcd3 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/api/job/job_handle.py
@@ -21,11 +21,25 @@ from enum import Enum
class JobHandle(ABC):
class Status(Enum):
- QUEUED = "QUEUED"
- STARTED = "STARTED"
- FAILED = "FAILED"
- SUCCEEDED = "SUCCEEDED"
- CANCELLED = "CANCELLED"
+ QUEUED = "queued"
+ STARTED = "started"
+ FAILED = "failed"
+ SUCCEEDED = "succeeded"
+ CANCELLING = "cancelling"
+ CANCELLED = "cancelled"
+
+ @classmethod
+ def job_status_serialize(cls, status: "JobHandle.Status") -> str:
+ """Serializes the job status to a string."""
+ return status.value.lower()
+
+ @classmethod
+ def job_status_deserialize(cls, status: str) -> "JobHandle.Status":
+ """Deserializes a string to a job status."""
+ for m in cls:
+ if m.value.lower() == status.lower():
+ return m
+ raise ValueError(f"Unknown job status: {status}")
@abstractmethod
def job_template_name(self) -> str:
diff --git a/clients/client-python/gravitino/api/job/job_template.py
b/clients/client-python/gravitino/api/job/job_template.py
index 755905640c..0af7eb745a 100644
--- a/clients/client-python/gravitino/api/job/job_template.py
+++ b/clients/client-python/gravitino/api/job/job_template.py
@@ -28,11 +28,42 @@ class JobType(Enum):
required to determine the runtime environment for executing the job.
"""
- SPARK = "SPARK"
+ SPARK = "spark"
"""job type for executing Spark jobs"""
- SHELL = "SHELL"
+ SHELL = "shell"
"""job type for executing shell commands"""
+ @classmethod
+ def job_type_serialize(cls, job_type: "JobType") -> str:
+ """
+ Serializes the JobType to a string.
+
+ Args:
+ job_type: The JobType to serialize.
+
+ Returns:
+ The serialized string representation of the JobType.
+ """
+ return job_type.value.lower()
+
+ @classmethod
+ def job_type_deserialize(cls, job_type_str: str) -> "JobType":
+ """
+ Deserializes a string to a JobType.
+
+ Args:
+ job_type_str: The string representation of the JobType.
+
+ Returns:
+ The JobType corresponding to the string.
+ """
+ for m in cls:
+ if m.value.lower() == job_type_str.lower():
+ return m
+
+ # If no match found, raise an error
+ raise ValueError(f"Invalid job type string: {job_type_str}")
+
T = TypeVar("T", bound="JobTemplate")
B = TypeVar("B", bound="BaseBuilder")
diff --git a/clients/client-python/gravitino/api/job/shell_job_template.py
b/clients/client-python/gravitino/api/job/shell_job_template.py
index 2c321bc91b..fb3cece1c6 100644
--- a/clients/client-python/gravitino/api/job/shell_job_template.py
+++ b/clients/client-python/gravitino/api/job/shell_job_template.py
@@ -51,6 +51,16 @@ class ShellJobTemplate(JobTemplate):
"""
return JobType.SHELL
+ @staticmethod
+ def builder() -> "ShellJobTemplate.Builder":
+ """
+ Creates a new builder instance for constructing a ShellJobTemplate.
+
+ Returns:
+ ShellJobTemplate.Builder: A new builder instance.
+ """
+ return ShellJobTemplate.Builder()
+
def __eq__(self, other: Any) -> bool:
if not super().__eq__(other):
return False
diff --git a/clients/client-python/gravitino/api/job/spark_job_template.py
b/clients/client-python/gravitino/api/job/spark_job_template.py
index d0ac6c1d5e..c731ccadd5 100644
--- a/clients/client-python/gravitino/api/job/spark_job_template.py
+++ b/clients/client-python/gravitino/api/job/spark_job_template.py
@@ -95,6 +95,16 @@ class SparkJobTemplate(JobTemplate):
"""
return JobType.SPARK
+ @staticmethod
+ def builder() -> "SparkJobTemplate.Builder":
+ """
+ Creates a new builder instance for constructing a SparkJobTemplate.
+
+ Returns:
+ SparkJobTemplate.Builder: A new builder instance.
+ """
+ return SparkJobTemplate.Builder()
+
def __eq__(self, other: Any) -> bool:
if not super().__eq__(other):
return False
diff --git a/clients/client-python/gravitino/client/dto_converters.py
b/clients/client-python/gravitino/client/dto_converters.py
index e0f6819a92..ec35d537bb 100644
--- a/clients/client-python/gravitino/client/dto_converters.py
+++ b/clients/client-python/gravitino/client/dto_converters.py
@@ -17,9 +17,15 @@
from gravitino.api.catalog import Catalog
from gravitino.api.catalog_change import CatalogChange
+from gravitino.api.job.job_template import JobTemplate, JobType
+from gravitino.api.job.shell_job_template import ShellJobTemplate
+from gravitino.api.job.spark_job_template import SparkJobTemplate
from gravitino.client.fileset_catalog import FilesetCatalog
from gravitino.client.generic_model_catalog import GenericModelCatalog
from gravitino.dto.catalog_dto import CatalogDTO
+from gravitino.dto.job.job_template_dto import JobTemplateDTO
+from gravitino.dto.job.shell_job_template_dto import ShellJobTemplateDTO
+from gravitino.dto.job.spark_job_template_dto import SparkJobTemplateDTO
from gravitino.dto.requests.catalog_update_request import CatalogUpdateRequest
from gravitino.dto.requests.metalake_update_request import
MetalakeUpdateRequest
from gravitino.api.metalake_change import MetalakeChange
@@ -99,3 +105,109 @@ class DTOConverters:
)
raise ValueError(f"Unknown change type: {type(change).__name__}")
+
+ @staticmethod
+ def _to_shell_job_template_dto(dto: JobTemplateDTO) -> ShellJobTemplateDTO:
+ if isinstance(dto, ShellJobTemplateDTO):
+ return dto
+
+ raise TypeError(
+ f"The provided JobTemplateDTO is not of type ShellJobTemplateDTO:
{type(dto)}"
+ )
+
+ @staticmethod
+ def _to_spark_job_template_dto(dto: JobTemplateDTO) -> SparkJobTemplateDTO:
+ if isinstance(dto, SparkJobTemplateDTO):
+ return dto
+
+ raise TypeError(
+ f"The provided JobTemplateDTO is not of type SparkJobTemplateDTO:
{type(dto)}"
+ )
+
+ @staticmethod
+ def _to_shell_job_template(template: JobTemplate) -> ShellJobTemplate:
+ if isinstance(template, ShellJobTemplate):
+ return template
+
+ raise TypeError(
+ f"The provided JobTemplate is not of type ShellJobTemplate:
{type(template)}"
+ )
+
+ @staticmethod
+ def _to_spark_job_template(template: JobTemplate) -> SparkJobTemplate:
+ if isinstance(template, SparkJobTemplate):
+ return template
+
+ raise TypeError(
+ f"The provided JobTemplate is not of type SparkJobTemplate:
{type(template)}"
+ )
+
+ @staticmethod
+ def from_job_template_dto(dto: JobTemplateDTO) -> JobTemplate:
+ if dto.job_type() == JobType.SHELL:
+ shell_dto = DTOConverters._to_shell_job_template_dto(dto)
+ return (
+ ShellJobTemplate.builder()
+ .with_name(shell_dto.name())
+ .with_comment(shell_dto.comment())
+ .with_executable(shell_dto.executable())
+ .with_arguments(shell_dto.arguments())
+ .with_environments(shell_dto.environments())
+ .with_custom_fields(shell_dto.custom_fields())
+ .with_scripts(shell_dto.scripts())
+ .build()
+ )
+
+ if dto.job_type() == JobType.SPARK:
+ spark_dto = DTOConverters._to_spark_job_template_dto(dto)
+ return (
+ SparkJobTemplate.builder()
+ .with_name(spark_dto.name())
+ .with_comment(spark_dto.comment())
+ .with_executable(spark_dto.executable())
+ .with_arguments(spark_dto.arguments())
+ .with_environments(spark_dto.environments())
+ .with_custom_fields(spark_dto.custom_fields())
+ .with_class_name(spark_dto.class_name())
+ .with_jars(spark_dto.jars())
+ .with_files(spark_dto.files())
+ .with_archives(spark_dto.archives())
+ .with_configs(spark_dto.configs())
+ .build()
+ )
+
+ raise ValueError(f"Unsupported job type: {dto.job_type()}")
+
+ @staticmethod
+ def to_job_template_dto(template: JobTemplate) -> JobTemplateDTO:
+ if isinstance(template, ShellJobTemplate):
+ shell_template = DTOConverters._to_shell_job_template(template)
+ return ShellJobTemplateDTO(
+ _job_type=shell_template.job_type(),
+ _name=shell_template.name,
+ _comment=shell_template.comment,
+ _executable=shell_template.executable,
+ _arguments=shell_template.arguments,
+ _environments=shell_template.environments,
+ _custom_fields=shell_template.custom_fields,
+ _scripts=shell_template.scripts,
+ )
+
+ if isinstance(template, SparkJobTemplate):
+ spark_template = DTOConverters._to_spark_job_template(template)
+ return SparkJobTemplateDTO(
+ _job_type=spark_template.job_type(),
+ _name=spark_template.name,
+ _comment=spark_template.comment,
+ _executable=spark_template.executable,
+ _arguments=spark_template.arguments,
+ _environments=spark_template.environments,
+ _custom_fields=spark_template.custom_fields,
+ _class_name=spark_template.class_name,
+ _jars=spark_template.jars,
+ _files=spark_template.files,
+ _archives=spark_template.archives,
+ _configs=spark_template.configs,
+ )
+
+ raise ValueError(f"Unsupported job type: {type(template)}")
diff --git a/clients/client-python/gravitino/api/job/job_handle.py
b/clients/client-python/gravitino/client/generic_job_handle.py
similarity index 64%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/gravitino/client/generic_job_handle.py
index d1b0ba734a..95b568a613 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/client/generic_job_handle.py
@@ -6,7 +6,7 @@
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
@@ -14,27 +14,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.dto.job.job_dto import JobDTO
-from abc import ABC, abstractmethod
-from enum import Enum
+class GenericJobHandle(JobHandle):
+ """Represents a generic job handle."""
-class JobHandle(ABC):
- class Status(Enum):
- QUEUED = "QUEUED"
- STARTED = "STARTED"
- FAILED = "FAILED"
- SUCCEEDED = "SUCCEEDED"
- CANCELLED = "CANCELLED"
+ def __init__(self, job_dto: JobDTO):
+ self._job_dto = job_dto
- @abstractmethod
def job_template_name(self) -> str:
- pass
+ return self._job_dto.job_template_name()
- @abstractmethod
def job_id(self) -> str:
- pass
+ return self._job_dto.job_id()
- @abstractmethod
- def job_status(self) -> Status:
- pass
+ def job_status(self):
+ return self._job_dto.status()
diff --git a/clients/client-python/gravitino/client/gravitino_client.py
b/clients/client-python/gravitino/client/gravitino_client.py
index cc70cdd971..ea27a86443 100644
--- a/clients/client-python/gravitino/client/gravitino_client.py
+++ b/clients/client-python/gravitino/client/gravitino_client.py
@@ -19,12 +19,15 @@ from typing import List, Dict
from gravitino.api.catalog import Catalog
from gravitino.api.catalog_change import CatalogChange
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.job_template import JobTemplate
+from gravitino.api.job.supports_jobs import SupportsJobs
from gravitino.auth.auth_data_provider import AuthDataProvider
from gravitino.client.gravitino_client_base import GravitinoClientBase
from gravitino.client.gravitino_metalake import GravitinoMetalake
-class GravitinoClient(GravitinoClientBase):
+class GravitinoClient(GravitinoClientBase, SupportsJobs):
"""Gravitino Client for a user to interact with the Gravitino API,
allowing the client to list,
load, create, and alter Catalog.
@@ -103,3 +106,113 @@ class GravitinoClient(GravitinoClientBase):
def disable_catalog(self, name: str):
return self.get_metalake().disable_catalog(name)
+
+ def list_job_templates(self) -> List[JobTemplate]:
+ """Lists all job templates in the current metalake.
+
+ Returns:
+ A list of JobTemplate objects representing the job templates in
the metalake.
+ """
+ return self.get_metalake().list_job_templates()
+
+ def register_job_template(self, job_template) -> None:
+ """Register a job template with the specified job template to
Gravitino. The registered
+ job template will be maintained in Gravitino, allowing it to be
executed later.
+
+ Args:
+ job_template: The job template to register.
+
+ Raises:
+ JobTemplateAlreadyExists: If a job template with the same name
already exists.
+ """
+ self.get_metalake().register_job_template(job_template)
+
+ def get_job_template(self, job_template_name: str) -> JobTemplate:
+ """Retrieves a job template by its name.
+
+ Args:
+ job_template_name: The name of the job template to retrieve.
+
+ Returns:
+ The JobTemplate object corresponding to the specified name.
+
+ Raises:
+ NoSuchJobTemplateException: If no job template with the specified
name exists.
+ """
+ return self.get_metalake().get_job_template(job_template_name)
+
+ def delete_job_template(self, job_template_name: str) -> bool:
+ """
+ Deletes a job template by its name. This will remove the job template
from Gravitino, and it
+ will no longer be available for execution. Only when all the jobs
associated with this job
+ template are completed, failed or cancelled, the job template can be
deleted successfully,
+ otherwise it will throw InUseException. Returns false if the job
template to be deleted does
+ not exist.
+
+ The deletion of a job template will also delete all the jobs
associated with this template.
+
+ Args:
+ job_template_name: The name of the job template to delete.
+
+ Returns:
+ bool: True if the job template was deleted successfully, False if
the job template
+ does not exist.
+
+ Raises:
+ InUseException: If there are still queued or started jobs
associated with this job template.
+ """
+ return self.get_metalake().delete_job_template(job_template_name)
+
+ def list_jobs(self, job_template_name: str = None) -> List[JobHandle]:
+ """Lists all the jobs in the current metalake.
+
+ Args:
+ job_template_name: Optional; if provided, filters the jobs by the
specified job template name.
+
+ Returns:
+ A list of JobHandle objects representing the jobs in the metalake.
+ """
+ return self.get_metalake().list_jobs(job_template_name)
+
+ def get_job(self, job_id: str) -> JobHandle:
+ """Retrieves a job by its ID.
+
+ Args:
+ job_id: The ID of the job to retrieve.
+
+ Returns:
+ The JobHandle object corresponding to the specified job ID.
+
+ Raises:
+ NoSuchJobException: If no job with the specified ID exists.
+ """
+ return self.get_metalake().get_job(job_id)
+
+ def run_job(self, job_template_name: str, job_conf: Dict[str, str]) ->
JobHandle:
+ """Runs a job using the specified job template and configuration.
+
+ Args:
+ job_template_name: The name of the job template to use for running
the job.
+ job_conf: A dictionary containing the configuration for the job.
+
+ Returns:
+ A JobHandle object representing the started job.
+
+ Raises:
+ NoSuchJobTemplateException: If no job template with the specified
name exists.
+ """
+ return self.get_metalake().run_job(job_template_name, job_conf)
+
+ def cancel_job(self, job_id: str) -> JobHandle:
+ """Cancels a job by its ID.
+
+ Args:
+ job_id: The ID of the job to cancel.
+
+ Returns:
+ The JobHandle object representing the cancelled job.
+
+ Raises:
+ NoSuchJobException: If no job with the specified ID exists.
+ """
+ return self.get_metalake().cancel_job(job_id)
diff --git a/clients/client-python/gravitino/client/gravitino_metalake.py
b/clients/client-python/gravitino/client/gravitino_metalake.py
index a4a62559e1..b85fa41c7b 100644
--- a/clients/client-python/gravitino/client/gravitino_metalake.py
+++ b/clients/client-python/gravitino/client/gravitino_metalake.py
@@ -20,16 +20,29 @@ from typing import List, Dict
from gravitino.api.catalog import Catalog
from gravitino.api.catalog_change import CatalogChange
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.job_template import JobTemplate
+from gravitino.api.job.supports_jobs import SupportsJobs
from gravitino.client.dto_converters import DTOConverters
+from gravitino.client.generic_job_handle import GenericJobHandle
from gravitino.dto.metalake_dto import MetalakeDTO
from gravitino.dto.requests.catalog_create_request import CatalogCreateRequest
from gravitino.dto.requests.catalog_set_request import CatalogSetRequest
from gravitino.dto.requests.catalog_updates_request import
CatalogUpdatesRequest
+from gravitino.dto.requests.job_run_request import JobRunRequest
+from gravitino.dto.requests.job_template_register_request import (
+ JobTemplateRegisterRequest,
+)
from gravitino.dto.responses.catalog_list_response import CatalogListResponse
from gravitino.dto.responses.catalog_response import CatalogResponse
from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.job_list_response import JobListResponse
+from gravitino.dto.responses.job_response import JobResponse
+from gravitino.dto.responses.job_template_list_response import
JobTemplateListResponse
+from gravitino.dto.responses.job_template_response import JobTemplateResponse
from gravitino.exceptions.handlers.catalog_error_handler import
CATALOG_ERROR_HANDLER
+from gravitino.exceptions.handlers.job_error_handler import JOB_ERROR_HANDLER
from gravitino.rest.rest_utils import encode_string
from gravitino.utils import HTTPClient
@@ -37,7 +50,7 @@ from gravitino.utils import HTTPClient
logger = logging.getLogger(__name__)
-class GravitinoMetalake(MetalakeDTO):
+class GravitinoMetalake(MetalakeDTO, SupportsJobs):
"""
Gravitino Metalake is the top-level metadata repository for users. It
contains a list of catalogs
as sub-level metadata collections. With GravitinoMetalake, users can list,
create, load,
@@ -47,6 +60,8 @@ class GravitinoMetalake(MetalakeDTO):
rest_client: HTTPClient
API_METALAKES_CATALOGS_PATH = "api/metalakes/{}/catalogs/{}"
+ API_METALAKES_JOB_TEMPLATES_PATH = "api/metalakes/{}/jobs/templates"
+ API_METALAKES_JOB_RUNS_PATH = "api/metalakes/{}/jobs/runs"
def __init__(self, metalake: MetalakeDTO = None, client: HTTPClient =
None):
super().__init__(
@@ -255,3 +270,188 @@ class GravitinoMetalake(MetalakeDTO):
self.rest_client.patch(
url, json=catalog_disable_request,
error_handler=CATALOG_ERROR_HANDLER
)
+
+ def list_job_templates(self) -> List[JobTemplate]:
+ """List all the registered job templates in Gravitino.
+
+ Returns:
+ List of job templates.
+ """
+ params = {"details": "true"}
+ url =
self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))
+ response = self.rest_client.get(
+ url, params=params, error_handler=JOB_ERROR_HANDLER
+ )
+ resp = JobTemplateListResponse.from_json(response.body,
infer_missing=True)
+ resp.validate()
+
+ return [
+ DTOConverters.from_job_template_dto(dto) for dto in
resp.job_templates()
+ ]
+
+ def register_job_template(self, job_template: JobTemplate) -> None:
+ """Register a job template with the specified job template to
Gravitino. The registered
+ job template will be maintained in Gravitino, allowing it to be
executed later.
+
+ Args:
+ job_template: The job template to register.
+
+ Raises:
+ JobTemplateAlreadyExists: If a job template with the same name
already exists.
+ """
+ url =
self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))
+ req = JobTemplateRegisterRequest(
+ DTOConverters.to_job_template_dto(job_template)
+ )
+
+ self.rest_client.post(url, json=req, error_handler=JOB_ERROR_HANDLER)
+
+ def get_job_template(self, job_template_name: str) -> JobTemplate:
+ """Retrieves a job template by its name.
+
+ Args:
+ job_template_name: The name of the job template to retrieve.
+
+ Returns:
+ The job template if found, otherwise raises an exception.
+
+ Raises:
+ NoSuchJobTemplateException: If no job template with the specified
name exists.
+ """
+ if not job_template_name or not job_template_name.strip():
+ raise ValueError("Job template name cannot be null or empty")
+
+ url = (
+
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+ f"{encode_string(job_template_name)}"
+ )
+ response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
+ resp = JobTemplateResponse.from_json(response.body, infer_missing=True)
+ resp.validate()
+
+ return DTOConverters.from_job_template_dto(resp.job_template())
+
+ def delete_job_template(self, job_template_name: str) -> bool:
+ """
+ Deletes a job template by its name. This will remove the job template
from Gravitino, and it
+ will no longer be available for execution. Only when all the jobs
associated with this job
+ template are completed, failed or cancelled, the job template can be
deleted successfully,
+ otherwise it will throw InUseException. Returns false if the job
template to be deleted does
+ not exist.
+
+ The deletion of a job template will also delete all the jobs
associated with this template.
+
+ Args:
+ job_template_name: The name of the job template to delete.
+
+ Returns:
+ bool: True if the job template was deleted successfully, False if
the job template
+ does not exist.
+
+ Raises:
+ InUseException: If the job template is currently in use by any
jobs, it cannot be deleted.
+ """
+ if not job_template_name or not job_template_name.strip():
+ raise ValueError("Job template name cannot be null or empty")
+
+ url = (
+
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+ f"{encode_string(job_template_name)}"
+ )
+ response = self.rest_client.delete(url,
error_handler=JOB_ERROR_HANDLER)
+
+ drop_response = DropResponse.from_json(response.body,
infer_missing=True)
+ drop_response.validate()
+
+ return drop_response.dropped()
+
+ def list_jobs(self, job_template_name: str = None) -> List[JobHandle]:
+ """List all the jobs under this metalake.
+
+ Args:
+ job_template_name: The name of the job template to filter jobs by.
If None, all jobs are listed.
+
+ Returns:
+ A list of JobHandle objects representing the jobs.
+ """
+ params = (
+ {"jobTemplateName": encode_string(job_template_name)}
+ if job_template_name
+ else {}
+ )
+ url =
self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))
+ response = self.rest_client.get(
+ url, params=params, error_handler=JOB_ERROR_HANDLER
+ )
+ resp = JobListResponse.from_json(response.body, infer_missing=True)
+ resp.validate()
+
+ return [GenericJobHandle(dto) for dto in resp.jobs()]
+
+ def get_job(self, job_id: str) -> JobHandle:
+ """Retrieves a job by its ID.
+
+ Args:
+ job_id: The ID of the job to retrieve.
+
+ Returns:
+ The JobHandle representing the job if found, otherwise raises an
exception.
+
+ Raises:
+ NoSuchJobException: If no job with the specified ID exists.
+ """
+ if not job_id or not job_id.strip():
+ raise ValueError("Job ID cannot be null or empty")
+
+ url =
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+ response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
+ resp = JobResponse.from_json(response.body, infer_missing=True)
+ resp.validate()
+
+ return GenericJobHandle(resp.job())
+
+ def run_job(self, job_template_name: str, job_conf: Dict[str, str]) ->
JobHandle:
+ """Runs a job based on the specified job template and configuration.
+
+ Args:
+ job_template_name: The name of the job template to use for running
the job.
+ job_conf: A dictionary containing the configuration for the job.
+
+ Returns:
+ A JobHandle representing the started job.
+
+ Raises:
+ NoSuchJobTemplateException: If no job template with the specified
name exists.
+ """
+ url =
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}"
+ request = JobRunRequest(job_template_name, job_conf)
+
+ response = self.rest_client.post(
+ url, json=request, error_handler=JOB_ERROR_HANDLER
+ )
+ resp = JobResponse.from_json(response.body, infer_missing=True)
+ resp.validate()
+
+ return GenericJobHandle(resp.job())
+
+ def cancel_job(self, job_id: str) -> JobHandle:
+ """Cancels a job by its ID.
+
+ Args:
+ job_id: The ID of the job to cancel.
+
+ Returns:
+ A JobHandle representing the cancelled job.
+
+ Raises:
+ NoSuchJobException: If no job with the specified ID exists.
+ """
+ if not job_id or not job_id.strip():
+ raise ValueError("Job ID cannot be null or empty")
+
+ url =
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+ response = self.rest_client.post(url, error_handler=JOB_ERROR_HANDLER)
+ resp = JobResponse.from_json(response.body, infer_missing=True)
+ resp.validate()
+
+ return GenericJobHandle(resp.job())
diff --git a/clients/client-python/gravitino/constants/error.py
b/clients/client-python/gravitino/constants/error.py
index 7ff56c6fee..27e6031451 100644
--- a/clients/client-python/gravitino/constants/error.py
+++ b/clients/client-python/gravitino/constants/error.py
@@ -26,6 +26,9 @@ from gravitino.exceptions.base import (
AlreadyExistsException,
NotEmptyException,
UnsupportedOperationException,
+ ForbiddenException,
+ NotInUseException,
+ InUseException,
)
@@ -56,6 +59,9 @@ class ErrorConstants(IntEnum):
# Error codes for connect to catalog failed.
CONNECTION_FAILED_CODE = 1007
+ # Error codes for forbidden operation.
+ FORBIDDEN_CODE = 1008
+
# Error codes for operation on a no in use entity.
NOT_IN_USE_CODE = 1009
@@ -75,6 +81,9 @@ EXCEPTION_MAPPING = {
NotEmptyException: ErrorConstants.NON_EMPTY_CODE,
UnsupportedOperationException: ErrorConstants.UNSUPPORTED_OPERATION_CODE,
ConnectionFailedException: ErrorConstants.CONNECTION_FAILED_CODE,
+ ForbiddenException: ErrorConstants.FORBIDDEN_CODE,
+ NotInUseException: ErrorConstants.NOT_IN_USE_CODE,
+ InUseException: ErrorConstants.IN_USE_CODE,
}
ERROR_CODE_MAPPING = {v: k for k, v in EXCEPTION_MAPPING.items()}
diff --git a/clients/client-python/gravitino/api/job/job_handle.py
b/clients/client-python/gravitino/dto/job/__init__.py
similarity index 63%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/gravitino/dto/job/__init__.py
index d1b0ba734a..13a83393a9 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/dto/job/__init__.py
@@ -14,27 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-from abc import ABC, abstractmethod
-from enum import Enum
-
-
-class JobHandle(ABC):
- class Status(Enum):
- QUEUED = "QUEUED"
- STARTED = "STARTED"
- FAILED = "FAILED"
- SUCCEEDED = "SUCCEEDED"
- CANCELLED = "CANCELLED"
-
- @abstractmethod
- def job_template_name(self) -> str:
- pass
-
- @abstractmethod
- def job_id(self) -> str:
- pass
-
- @abstractmethod
- def job_status(self) -> Status:
- pass
diff --git a/clients/client-python/gravitino/dto/job/job_dto.py
b/clients/client-python/gravitino/dto/job/job_dto.py
new file mode 100644
index 0000000000..eab42da57c
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/job_dto.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from dataclasses_json import config, DataClassJsonMixin
+
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.dto.audit_dto import AuditDTO
+
+
+@dataclass
+class JobDTO(DataClassJsonMixin):
+ """Data transfer object representing a Job."""
+
+ _job_id: str = field(metadata=config(field_name="jobId"))
+ _job_template_name: str =
field(metadata=config(field_name="jobTemplateName"))
+ _status: JobHandle.Status = field(
+ metadata=config(
+ field_name="status",
+ encoder=JobHandle.Status.job_status_serialize,
+ decoder=JobHandle.Status.job_status_deserialize,
+ )
+ )
+ _audit: AuditDTO = field(metadata=config(field_name="audit"))
+
+ def job_id(self) -> str:
+ """Returns the job ID."""
+ return self._job_id
+
+ def job_template_name(self) -> str:
+ """Returns the job template name."""
+ return self._job_template_name
+
+ def status(self) -> JobHandle.Status:
+ """Returns the status of the job."""
+ return self._status
+
+ def audit(self) -> AuditDTO:
+ """Returns the audit information of the job."""
+ return self._audit
+
+ def validate(self) -> None:
+ """Validates the JobDTO, ensuring required fields are present and
non-empty."""
+ if self._job_id is None or not self._job_id.strip():
+ raise ValueError('"jobId" is required and cannot be empty')
+ if self._job_template_name is None or not
self._job_template_name.strip():
+ raise ValueError('"jobTemplateName" is required and cannot be
empty')
+ if self._status is None:
+ raise ValueError('"status" must not be None')
+ if self._audit is None:
+ raise ValueError('"audit" must not be None')
diff --git a/clients/client-python/gravitino/dto/job/job_template_dto.py
b/clients/client-python/gravitino/dto/job/job_template_dto.py
new file mode 100644
index 0000000000..6ed6b1a6cc
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/job_template_dto.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from abc import ABC
+from typing import Dict, List, Optional, Type
+from dataclasses import dataclass, field
+
+from dataclasses_json import config, DataClassJsonMixin
+
+from gravitino.api.job.job_template import JobType
+from gravitino.dto.audit_dto import AuditDTO
+
+
+@dataclass
+class JobTemplateDTO(DataClassJsonMixin, ABC):
+ """Represents a Job Template Data Transfer Object (DTO)."""
+
+ # pylint: disable=R0902
+ _job_type: JobType = field(
+ metadata=config(
+ field_name="jobType",
+ encoder=JobType.job_type_serialize,
+ decoder=JobType.job_type_deserialize,
+ )
+ )
+ _name: str = field(metadata=config(field_name="name"))
+ _executable: str = field(metadata=config(field_name="executable"))
+ _comment: Optional[str] = field(default=None,
metadata=config(field_name="comment"))
+ _arguments: Optional[List[str]] = field(
+ default=None, metadata=config(field_name="arguments")
+ )
+ _environments: Optional[Dict[str, str]] = field(
+ default=None, metadata=config(field_name="environments")
+ )
+ _custom_fields: Optional[Dict[str, str]] = field(
+ default=None, metadata=config(field_name="customFields")
+ )
+ _audit: Optional[AuditDTO] = field(
+ default=None, metadata=config(field_name="audit")
+ )
+
+ def job_type(self) -> JobType:
+ """Returns the type of the job."""
+ return self._job_type
+
+ def name(self) -> str:
+ """Returns the name of the job template."""
+ return self._name
+
+ def comment(self) -> Optional[str]:
+ """Returns the comment associated with the job template."""
+ return self._comment
+
+ def executable(self) -> str:
+ """Returns the executable associated with the job template."""
+ return self._executable
+
+ def arguments(self) -> Optional[List[str]]:
+ """Returns the list of arguments for the job template."""
+ return self._arguments
+
+ def environments(self) -> Optional[Dict[str, str]]:
+ """Returns the environment variables for the job template."""
+ return self._environments
+
+ def custom_fields(self) -> Optional[Dict[str, str]]:
+ """Returns custom fields for the job template."""
+ return self._custom_fields
+
+ def audit_info(self) -> Optional[AuditDTO]:
+ """Returns the audit information for the job template."""
+ return self._audit
+
+ def validate(self) -> None:
+ """Validates the JobTemplateDTO. Ensures that required fields are not
null or empty."""
+ if self._job_type is None:
+ raise ValueError('"jobType" is required and cannot be None')
+
+ if self._name is None or not self._name.strip():
+ raise ValueError('"name" is required and cannot be empty')
+
+ if self._executable is None or not self._executable.strip():
+ raise ValueError('"executable" is required and cannot be empty')
+
+ @classmethod
+ def from_json(
+ cls, s: str, infer_missing: bool = False, **kwargs
+ ) -> "JobTemplateDTO":
+ """Creates a JobTemplateDTO from a JSON string."""
+ data = json.loads(s)
+ job_type = JobType.job_type_deserialize(data.get("jobType"))
+ subclass = JOB_TYPE_TEMPLATE_MAPPING.get(job_type)
+ if not subclass:
+ raise ValueError(f"Unsupported job type: {job_type}")
+ return subclass.from_dict(data, infer_missing=infer_missing)
+
+
+JOB_TYPE_TEMPLATE_MAPPING: Dict[JobType, Type["JobTemplateDTO"]] = {}
+
+
+def register_job_template(job_type: JobType):
+ """
+ Decorator to register a subclass of JobTemplateDTO for a specific job type.
+
+ Args:
+ job_type (JobType): The job type to register the subclass for.
+
+ Returns:
+ Callable: The decorator function.
+ """
+
+ def decorator(subclass: Type["JobTemplateDTO"]):
+ JOB_TYPE_TEMPLATE_MAPPING[job_type] = subclass
+ return subclass
+
+ return decorator
diff --git a/clients/client-python/gravitino/dto/job/shell_job_template_dto.py
b/clients/client-python/gravitino/dto/job/shell_job_template_dto.py
new file mode 100644
index 0000000000..40e9c911aa
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/shell_job_template_dto.py
@@ -0,0 +1,40 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+from typing import List, Optional
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+
+from gravitino.api.job.job_template import JobType
+from gravitino.dto.job.job_template_dto import JobTemplateDTO,
register_job_template
+
+
+@register_job_template(JobType.SHELL)
+@dataclass
+class ShellJobTemplateDTO(JobTemplateDTO):
+ """Represents a Shell Job Template Data Transfer Object (DTO)."""
+
+ _scripts: Optional[List[str]] = field(
+ default=None, metadata=config(field_name="scripts")
+ )
+
+ def scripts(self) -> Optional[List[str]]:
+ """Returns the scripts associated with the shell job template."""
+ return self._scripts
diff --git a/clients/client-python/gravitino/dto/job/spark_job_template_dto.py
b/clients/client-python/gravitino/dto/job/spark_job_template_dto.py
new file mode 100644
index 0000000000..8eb0a71c6c
--- /dev/null
+++ b/clients/client-python/gravitino/dto/job/spark_job_template_dto.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, List, Optional
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+
+from .job_template_dto import JobTemplateDTO, register_job_template
+from ...api.job.job_template import JobType
+
+
+@register_job_template(JobType.SPARK)
+@dataclass
+class SparkJobTemplateDTO(JobTemplateDTO):
+ """Represents a Spark Job Template Data Transfer Object (DTO)."""
+
+ _class_name: str = field(default=None,
metadata=config(field_name="className"))
+ _jars: Optional[List[str]] = field(default=None,
metadata=config(field_name="jars"))
+ _files: Optional[List[str]] = field(
+ default=None, metadata=config(field_name="files")
+ )
+ _archives: Optional[List[str]] = field(
+ default=None, metadata=config(field_name="archives")
+ )
+ _configs: Optional[Dict[str, str]] = field(
+ default=None, metadata=config(field_name="configs")
+ )
+
+ def class_name(self) -> str:
+ """Returns the class name of the Spark job."""
+ return self._class_name
+
+ def jars(self) -> Optional[List[str]]:
+ """Returns the list of JAR files associated with this Spark job
template."""
+ return self._jars
+
+ def files(self) -> Optional[List[str]]:
+ """Returns the list of files associated with this Spark job
template."""
+ return self._files
+
+ def archives(self) -> Optional[List[str]]:
+ """Returns the list of archives associated with this Spark job
template."""
+ return self._archives
+
+ def configs(self) -> Optional[Dict[str, str]]:
+ """Returns the configuration properties for the Spark job."""
+ return self._configs
+
+ def validate(self) -> None:
+ """Validates the SparkJobTemplateDTO. Ensures that required fields are
not null or empty."""
+ super().validate()
+
+ if self._class_name is None or not self._class_name.strip():
+ raise ValueError('"className" is required and cannot be empty')
diff --git a/clients/client-python/gravitino/dto/requests/job_run_request.py
b/clients/client-python/gravitino/dto/requests/job_run_request.py
new file mode 100644
index 0000000000..659b9e940a
--- /dev/null
+++ b/clients/client-python/gravitino/dto/requests/job_run_request.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Optional, Dict
+
+from dataclasses_json import config
+
+from gravitino.rest.rest_message import RESTRequest
+
+
+@dataclass
+class JobRunRequest(RESTRequest):
+ """Represents a request to run a job using a specified job template."""
+
+ _job_template_name: str =
field(metadata=config(field_name="jobTemplateName"))
+ _job_conf: Optional[Dict[str, str]] = field(
+ default=None, metadata=config(field_name="jobConf")
+ )
+
+ def job_template_name(self) -> str:
+ """Returns the job template name to use for running the job."""
+ return self._job_template_name
+
+ def job_conf(self) -> Optional[Dict[str, str]]:
+ """Returns the job configuration parameters."""
+ return self._job_conf
+
+ def validate(self) -> None:
+ """Validates the request. Raises ValueError if invalid."""
+ if self._job_template_name is None or not
self._job_template_name.strip():
+ raise ValueError('"jobTemplateName" is required and cannot be
empty')
diff --git
a/clients/client-python/gravitino/dto/requests/job_template_register_request.py
b/clients/client-python/gravitino/dto/requests/job_template_register_request.py
new file mode 100644
index 0000000000..6bf76e99b9
--- /dev/null
+++
b/clients/client-python/gravitino/dto/requests/job_template_register_request.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Optional
+
+from dataclasses_json import config
+
+from gravitino.dto.job.job_template_dto import JobTemplateDTO
+from gravitino.rest.rest_message import RESTRequest
+
+
+@dataclass
+class JobTemplateRegisterRequest(RESTRequest):
+ """Represents a request to register a job template."""
+
+ _job_template: JobTemplateDTO =
field(metadata=config(field_name="jobTemplate"))
+
+ def job_template(self) -> Optional[JobTemplateDTO]:
+ """Returns the job template to register."""
+ return self._job_template
+
+ def validate(self):
+ """Validates the request.
+
+ Raises:
+ ValueError: If the request is invalid.
+ """
+ if self._job_template is None:
+ raise ValueError('"jobTemplate" is required and cannot be None')
+
+ self._job_template.validate()
diff --git a/clients/client-python/gravitino/dto/responses/job_list_response.py
b/clients/client-python/gravitino/dto/responses/job_list_response.py
new file mode 100644
index 0000000000..3f42e7c46a
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/job_list_response.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import List
+
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
+
+from .base_response import BaseResponse
+from ..job.job_dto import JobDTO
+
+
+@dataclass
+class JobListResponse(BaseResponse):
+ """Represents a response containing a list of jobs."""
+
+ _jobs: List[JobDTO] = field(metadata=config(field_name="jobs"))
+
+ def validate(self):
+ """Validates the response data and contained jobs."""
+ super().validate()
+
+ if self._jobs is None:
+ raise IllegalArgumentException("jobs must not be None")
+
+ for job in self._jobs:
+ if job is not None:
+ job.validate()
+
+ def jobs(self) -> List[JobDTO]:
+ """Returns the list of jobs from the response."""
+ return self._jobs
diff --git a/clients/client-python/gravitino/api/job/job_handle.py
b/clients/client-python/gravitino/dto/responses/job_response.py
similarity index 52%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/gravitino/dto/responses/job_response.py
index d1b0ba734a..7beb06fe13 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/gravitino/dto/responses/job_response.py
@@ -15,26 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-from abc import ABC, abstractmethod
-from enum import Enum
+from dataclasses import dataclass, field
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
-class JobHandle(ABC):
- class Status(Enum):
- QUEUED = "QUEUED"
- STARTED = "STARTED"
- FAILED = "FAILED"
- SUCCEEDED = "SUCCEEDED"
- CANCELLED = "CANCELLED"
+from .base_response import BaseResponse
+from ..job.job_dto import JobDTO
- @abstractmethod
- def job_template_name(self) -> str:
- pass
- @abstractmethod
- def job_id(self) -> str:
- pass
+@dataclass
+class JobResponse(BaseResponse):
+ """Represents a response containing a single job."""
- @abstractmethod
- def job_status(self) -> Status:
- pass
+ _job: JobDTO = field(metadata=config(field_name="job"))
+
+ def validate(self):
+ """Validates the response data and contained job."""
+ super().validate()
+
+ if self._job is None:
+ raise IllegalArgumentException('"job" must not be None')
+
+ self._job.validate()
+
+ def job(self) -> JobDTO:
+ """Returns the job from the response."""
+ return self._job
diff --git
a/clients/client-python/gravitino/dto/responses/job_template_list_response.py
b/clients/client-python/gravitino/dto/responses/job_template_list_response.py
new file mode 100644
index 0000000000..bf6973c920
--- /dev/null
+++
b/clients/client-python/gravitino/dto/responses/job_template_list_response.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from dataclasses import dataclass, field
+from typing import List
+
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
+
+from .base_response import BaseResponse
+from ..job.job_template_dto import JobTemplateDTO
+
+
+@dataclass
+class JobTemplateListResponse(BaseResponse):
+ """Represents a response containing a list of job templates."""
+
+ _job_templates: List[JobTemplateDTO] = field(
+ metadata=config(field_name="jobTemplates")
+ )
+
+ def validate(self):
+ """Validates the response data.
+
+ Raises:
+ IllegalArgumentException: If the job templates list is not set or
contains invalid items.
+ """
+ super().validate()
+
+ if self._job_templates is None:
+ raise IllegalArgumentException("jobTemplates must not be None")
+
+ for job_template in self._job_templates:
+ if job_template is not None:
+ job_template.validate()
+
+ def job_templates(self) -> List[JobTemplateDTO]:
+ """Returns the list of job templates from the response."""
+ return self._job_templates
+
+ @classmethod
+ def from_json(
+ cls, s: str, infer_missing: bool = False, **kwargs
+ ) -> "JobTemplateListResponse":
+ """Deserialize JSON string into a JobTemplateListResponse object."""
+
+ data = json.loads(s)
+
+ # Deserialize each job template using JobTemplateDTO.from_json
+ job_templates = [
+ JobTemplateDTO.from_json(
+ json.dumps(item), infer_missing=infer_missing, **kwargs
+ )
+ for item in data.get("jobTemplates", [])
+ ]
+
+ return cls(_job_templates=job_templates, _code=data["code"])
diff --git
a/clients/client-python/gravitino/dto/responses/job_template_response.py
b/clients/client-python/gravitino/dto/responses/job_template_response.py
new file mode 100644
index 0000000000..ebeb560e2b
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/job_template_response.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+from gravitino.exceptions.base import IllegalArgumentException
+
+from .base_response import BaseResponse
+from ..job.job_template_dto import JobTemplateDTO
+
+
+@dataclass
+class JobTemplateResponse(BaseResponse):
+ """Represents a response containing a single job template."""
+
+ _job_template: JobTemplateDTO =
field(metadata=config(field_name="jobTemplate"))
+
+ def validate(self):
+ """Validates the response data.
+
+ Raises:
+ IllegalArgumentException: If the job template is not set or
invalid.
+ """
+ super().validate()
+
+ if self._job_template is None:
+ raise IllegalArgumentException('"jobTemplate" must not be None')
+
+ self._job_template.validate()
+
+ def job_template(self) -> JobTemplateDTO:
+ """Returns the job template from the response."""
+ return self._job_template
+
+ @classmethod
+ def from_json(
+ cls, s: str, infer_missing: bool = False, **kwargs
+ ) -> "JobTemplateResponse":
+ """Deserialize JSON string into a JobTemplateResponse object."""
+ data = json.loads(s)
+ job_template_data = JobTemplateDTO.from_json(
+ json.dumps(data["jobTemplate"]), infer_missing=infer_missing,
**kwargs
+ )
+ return cls(_job_template=job_template_data, _code=0)
diff --git a/clients/client-python/gravitino/exceptions/base.py
b/clients/client-python/gravitino/exceptions/base.py
index ce2f0d536a..0193ad78fe 100644
--- a/clients/client-python/gravitino/exceptions/base.py
+++ b/clients/client-python/gravitino/exceptions/base.py
@@ -187,3 +187,7 @@ class NoSuchJobTemplateException(NotFoundException):
class NoSuchJobException(NotFoundException):
"""An exception thrown when a job with specified name is not found."""
+
+
+class ForbiddenException(GravitinoRuntimeException):
+ """An exception thrown when a user is forbidden to perform an action."""
diff --git
a/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
new file mode 100644
index 0000000000..fc96e4c2b8
--- /dev/null
+++ b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from gravitino.constants.error import ErrorConstants
+from gravitino.dto.responses.error_response import ErrorResponse
+from gravitino.exceptions.handlers.rest_error_handler import RestErrorHandler
+from gravitino.exceptions.base import (
+ NoSuchMetalakeException,
+ JobTemplateAlreadyExistsException,
+ NoSuchJobTemplateException,
+ NoSuchJobException,
+ InUseException,
+ MetalakeNotInUseException,
+)
+
+
+class JobErrorHandler(RestErrorHandler):
+ def handle(self, error_response: ErrorResponse):
+ error_message = error_response.format_error_message()
+ code = error_response.code()
+ exception_type = error_response.type()
+
+ if code == ErrorConstants.NOT_FOUND_CODE:
+ if exception_type == NoSuchMetalakeException.__name__:
+ raise NoSuchMetalakeException(error_message)
+ if exception_type == NoSuchJobTemplateException.__name__:
+ raise NoSuchJobTemplateException(error_message)
+ if exception_type == NoSuchJobException.__name__:
+ raise NoSuchJobException(error_message)
+
+ if code == ErrorConstants.ALREADY_EXISTS_CODE:
+ raise JobTemplateAlreadyExistsException(error_message)
+
+ if code == ErrorConstants.IN_USE_CODE:
+ raise InUseException(error_message)
+
+ if code == ErrorConstants.NOT_IN_USE_CODE:
+ if exception_type == MetalakeNotInUseException.__name__:
+ raise MetalakeNotInUseException(error_message)
+
+ super().handle(error_response)
+
+
+JOB_ERROR_HANDLER = JobErrorHandler()
diff --git a/clients/client-python/gravitino/api/job/job_handle.py
b/clients/client-python/tests/unittests/dto/job/__init__.py
similarity index 63%
copy from clients/client-python/gravitino/api/job/job_handle.py
copy to clients/client-python/tests/unittests/dto/job/__init__.py
index d1b0ba734a..13a83393a9 100644
--- a/clients/client-python/gravitino/api/job/job_handle.py
+++ b/clients/client-python/tests/unittests/dto/job/__init__.py
@@ -14,27 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-from abc import ABC, abstractmethod
-from enum import Enum
-
-
-class JobHandle(ABC):
- class Status(Enum):
- QUEUED = "QUEUED"
- STARTED = "STARTED"
- FAILED = "FAILED"
- SUCCEEDED = "SUCCEEDED"
- CANCELLED = "CANCELLED"
-
- @abstractmethod
- def job_template_name(self) -> str:
- pass
-
- @abstractmethod
- def job_id(self) -> str:
- pass
-
- @abstractmethod
- def job_status(self) -> Status:
- pass
diff --git
a/clients/client-python/tests/unittests/dto/job/test_job_template_dto_serde.py
b/clients/client-python/tests/unittests/dto/job/test_job_template_dto_serde.py
new file mode 100644
index 0000000000..9f05e45692
--- /dev/null
+++
b/clients/client-python/tests/unittests/dto/job/test_job_template_dto_serde.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from gravitino.api.job.job_template import JobType
+from gravitino.dto.job.job_template_dto import JobTemplateDTO
+from gravitino.dto.job.shell_job_template_dto import ShellJobTemplateDTO
+from gravitino.dto.job.spark_job_template_dto import SparkJobTemplateDTO
+
+
+class TestJobTemplateDTOSerDe(unittest.TestCase):
+
+ def test_shell_job_template_dto(self):
+ shell_job = ShellJobTemplateDTO(
+ _job_type=JobType.SHELL,
+ _name="test_shell_job",
+ _comment="Test shell job",
+ _executable="/path/to/script.sh",
+ _arguments=["arg1", "arg2"],
+ _environments={"ENV_VAR": "value"},
+ _custom_fields={"custom_field1": "value1"},
+ _scripts=["/path/to/script1.sh", "/path/to/script2.sh"],
+ _audit=None,
+ )
+ shell_job.validate()
+
+ json_str = shell_job.to_json()
+ shell_job_template_dto = JobTemplateDTO.from_json(json_str,
infer_missing=True)
+ self.assertIsInstance(shell_job_template_dto, ShellJobTemplateDTO)
+ self.assertEqual(shell_job, shell_job_template_dto)
+
+ shell_job = ShellJobTemplateDTO(
+ _job_type=JobType.SHELL,
+ _name="test_shell_job_1",
+ _executable="/path/to/script1.sh",
+ _audit=None,
+ )
+ shell_job.validate()
+
+ json_str = shell_job.to_json()
+ shell_job_template_dto = JobTemplateDTO.from_json(json_str,
infer_missing=True)
+ self.assertIsInstance(shell_job_template_dto, ShellJobTemplateDTO)
+ self.assertEqual(shell_job, shell_job_template_dto)
+
+ def test_spark_job_template_dto(self):
+ spark_job = SparkJobTemplateDTO(
+ _job_type=JobType.SPARK,
+ _name="test_spark_job",
+ _comment="Test spark job",
+ _executable="/path/to/spark-demo.jar",
+ _arguments=["input", "output"],
+ _environments={"ENV_VAR": "value"},
+ _custom_fields={"custom_field1": "value1"},
+ _class_name="com.example.SparkDemo",
+ _jars=["/path/to/jar1", "/path/to/jar2"],
+ _files=["/path/to/file1", "/path/to/file2"],
+ _archives=["/path/to/a.zip", "/path/to/b.zip"],
+ _configs={"k1": "v1"},
+ _audit=None,
+ )
+ spark_job.validate()
+
+ json_str = spark_job.to_json()
+ spark_job_template_dto = JobTemplateDTO.from_json(json_str,
infer_missing=True)
+ self.assertIsInstance(spark_job_template_dto, SparkJobTemplateDTO)
+ self.assertEqual(spark_job, spark_job_template_dto)
+
+ spark_job = SparkJobTemplateDTO(
+ _job_type=JobType.SPARK,
+ _name="test_spark_job",
+ _comment="Test spark job",
+ _executable="/path/to/spark-demo.jar",
+ _class_name="com.example.SparkDemo",
+ _audit=None,
+ )
+ spark_job.validate()
+
+ json_str = spark_job.to_json()
+ spark_job_template_dto = JobTemplateDTO.from_json(json_str,
infer_missing=True)
+ self.assertIsInstance(spark_job_template_dto, SparkJobTemplateDTO)
+ self.assertEqual(spark_job, spark_job_template_dto)
diff --git a/clients/client-python/tests/unittests/mock_base.py
b/clients/client-python/tests/unittests/mock_base.py
index fb30715441..9421d9d1fd 100644
--- a/clients/client-python/tests/unittests/mock_base.py
+++ b/clients/client-python/tests/unittests/mock_base.py
@@ -42,7 +42,9 @@ def mock_load_metalake():
_properties={"k": "v"},
_audit=audit_dto,
)
- return GravitinoMetalake(metalake_dto)
+ return GravitinoMetalake(
+ metalake_dto, HTTPClient("http://localhost:9090", is_debug=True)
+ )
def mock_load_catalog(name: str):
diff --git a/clients/client-python/tests/unittests/test_supports_jobs.py
b/clients/client-python/tests/unittests/test_supports_jobs.py
new file mode 100644
index 0000000000..697c2a54b7
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_supports_jobs.py
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from http.client import HTTPResponse
+from unittest.mock import Mock, patch
+
+from gravitino import GravitinoClient
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.job_template import JobType
+from gravitino.api.job.shell_job_template import ShellJobTemplate
+from gravitino.api.job.spark_job_template import SparkJobTemplate
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.job.job_dto import JobDTO
+from gravitino.dto.job.shell_job_template_dto import ShellJobTemplateDTO
+from gravitino.dto.job.spark_job_template_dto import SparkJobTemplateDTO
+from gravitino.dto.responses.base_response import BaseResponse
+from gravitino.dto.responses.drop_response import DropResponse
+from gravitino.dto.responses.job_list_response import JobListResponse
+from gravitino.dto.responses.job_response import JobResponse
+from gravitino.dto.responses.job_template_list_response import
JobTemplateListResponse
+from gravitino.dto.responses.job_template_response import JobTemplateResponse
+from gravitino.utils import Response
+from tests.unittests import mock_base
+
+
+@mock_base.mock_data
+class TestSupportsJobs(unittest.TestCase):
+ _metalake_name = "metalake_demo"
+
+ def test_list_job_templates(self, *mock_methods):
+
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ templates = [self._new_shell_job_template(),
self._new_spark_job_template()]
+ template_dtos = [
+ self._new_shell_job_template_dto(templates[0]),
+ self._new_spark_job_template_dto(templates[1]),
+ ]
+ job_template_list_resp = JobTemplateListResponse(
+ _job_templates=template_dtos, _code=0
+ )
+ json_str = job_template_list_resp.to_json()
+ mock_resp = self._mock_http_response(json_str)
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ result_templates = gravitino_client.list_job_templates()
+ self.assertEqual(templates, result_templates)
+
+ # test with empty response
+ job_template_list_resp = JobTemplateListResponse(_job_templates=[],
_code=0)
+ json_str = job_template_list_resp.to_json()
+ mock_resp = self._mock_http_response(json_str)
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ result_templates = gravitino_client.list_job_templates()
+ self.assertEqual([], result_templates)
+
+ def test_register_job_template(self, *mock_methods):
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ shell_template = self._new_shell_job_template()
+ resp = BaseResponse(_code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.post",
return_value=mock_resp
+ ):
+ gravitino_client.register_job_template(shell_template)
+
+ def test_get_job_template(self, *mock_methods):
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ shell_template = self._new_shell_job_template()
+ shell_template_dto = self._new_shell_job_template_dto(shell_template)
+ resp = JobTemplateResponse(_job_template=shell_template_dto, _code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ result_template =
gravitino_client.get_job_template(shell_template.name)
+ self.assertEqual(shell_template, result_template)
+
+ def test_delete_job_template(self, *mock_methods):
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ shell_template = self._new_shell_job_template()
+ resp = DropResponse(_dropped=True, _code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.delete",
return_value=mock_resp
+ ):
+ result = gravitino_client.delete_job_template(shell_template.name)
+ self.assertTrue(result)
+
+ resp = DropResponse(_dropped=False, _code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.delete",
return_value=mock_resp
+ ):
+ result = gravitino_client.delete_job_template(shell_template.name)
+ self.assertFalse(result)
+
+ def test_list_jobs(self, *mock_methods):
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ job_template_name = "test_shell_job"
+ job_dtos = [
+ self._new_job_dto(job_template_name),
+ self._new_job_dto(job_template_name),
+ ]
+ resp = JobListResponse(_jobs=job_dtos, _code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ result_jobs = gravitino_client.list_jobs(job_template_name)
+ self.assertEqual(len(job_dtos), len(result_jobs))
+ for i, job_dto in enumerate(job_dtos):
+ self._compare_job_handle(result_jobs[i], job_dto)
+
+ def test_run_job(self, *mock_methods):
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ job_template_name = "test_shell_job"
+ job_dto = self._new_job_dto(job_template_name)
+ resp = JobResponse(_job=job_dto, _code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.post",
return_value=mock_resp
+ ):
+ job_handle = gravitino_client.run_job(job_template_name, {})
+ self._compare_job_handle(job_handle, job_dto)
+
+ def test_get_job(self, *mock_methods):
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ job_template_name = "test_shell_job"
+ job_dto = self._new_job_dto(job_template_name)
+ resp = JobResponse(_job=job_dto, _code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
return_value=mock_resp
+ ):
+ job_handle = gravitino_client.get_job(job_dto.job_id())
+ self._compare_job_handle(job_handle, job_dto)
+
+ # test with invalid input
+ with self.assertRaises(ValueError):
+ gravitino_client.get_job("")
+
+ with self.assertRaises(ValueError):
+ gravitino_client.get_job(None)
+
+ def test_cancel_job(self, *mock_methods):
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090",
+ metalake_name=self._metalake_name,
+ )
+
+ job_template_name = "test_shell_job"
+ job_dto = self._new_job_dto(job_template_name)
+ resp = JobResponse(_job=job_dto, _code=0)
+ mock_resp = self._mock_http_response(resp.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.post",
return_value=mock_resp
+ ):
+ job_handle = gravitino_client.cancel_job(job_dto.job_id())
+ self._compare_job_handle(job_handle, job_dto)
+
+ # test with invalid input
+ with self.assertRaises(ValueError):
+ gravitino_client.cancel_job("")
+
+ with self.assertRaises(ValueError):
+ gravitino_client.cancel_job(None)
+
+ def _new_shell_job_template(self):
+ return (
+ ShellJobTemplate.builder()
+ .with_name("test_shell_job")
+ .with_comment("This is a test shell job template")
+ .with_executable("/path/to/executable.sh")
+ .with_arguments(["arg1", "arg2"])
+ .with_environments({"ENV_VAR": "value"})
+ .with_custom_fields({"custom_field": "custom_value"})
+ .with_scripts(["/path/to/script1.sh", "/path/to/script2.sh"])
+ .build()
+ )
+
+ def _new_spark_job_template(self):
+ return (
+ SparkJobTemplate.builder()
+ .with_name("test_spark_job")
+ .with_comment("This is a test spark job template")
+ .with_executable("/path/to/spark-demo.jar")
+ .with_class_name("com.example.SparkJob")
+ .with_jars(["/path/to/jar1.jar", "/path/to/jar2.jar"])
+ .with_files(["/path/to/file1.txt", "/path/to/file2.txt"])
+ .with_archives(["/path/to/archive1.zip", "/path/to/archive2.zip"])
+ .with_configs({"spark.executor.memory": "2g",
"spark.driver.memory": "1g"})
+ .build()
+ )
+
+ def _new_shell_job_template_dto(self, template: ShellJobTemplate):
+ return ShellJobTemplateDTO(
+ _job_type=JobType.SHELL,
+ _name=template.name,
+ _comment=template.comment,
+ _executable=template.executable,
+ _arguments=template.arguments,
+ _environments=template.environments,
+ _custom_fields=template.custom_fields,
+ _scripts=template.scripts,
+ )
+
+ def _new_spark_job_template_dto(self, template: SparkJobTemplate):
+ return SparkJobTemplateDTO(
+ _job_type=JobType.SPARK,
+ _name=template.name,
+ _comment=template.comment,
+ _executable=template.executable,
+ _arguments=template.arguments,
+ _environments=template.environments,
+ _custom_fields=template.custom_fields,
+ _class_name=template.class_name,
+ _jars=template.jars,
+ _files=template.files,
+ _archives=template.archives,
+ _configs=template.configs,
+ )
+
+ def _mock_http_response(self, json_str: str):
+ mock_http_resp = Mock(HTTPResponse)
+ mock_http_resp.getcode.return_value = 200
+ mock_http_resp.read.return_value = json_str
+ mock_http_resp.info.return_value = None
+ mock_http_resp.url = None
+ mock_resp = Response(mock_http_resp)
+ return mock_resp
+
+ def _new_job_dto(self, job_template_name: str) -> JobDTO:
+ return JobDTO(
+ _job_id="job-123",
+ _job_template_name=job_template_name,
+ _status=JobHandle.Status.QUEUED,
+ _audit=AuditDTO(_creator="test",
_create_time="2023-10-01T00:00:00Z"),
+ )
+
+ def _compare_job_handle(self, job_handle: JobHandle, job_dto: JobDTO):
+ self.assertEqual(job_handle.job_id(), job_dto.job_id())
+ self.assertEqual(job_handle.job_template_name(),
job_dto.job_template_name())
+ self.assertEqual(job_handle.job_status(), job_dto.status())