This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ac29c0557 [#7759]feat(client-java): Add Java client for Job System (#7944)
0ac29c0557 is described below

commit 0ac29c055727a60936347aa344faf545c1c8f03d
Author: Jerry Shao <[email protected]>
AuthorDate: Fri Aug 8 16:28:26 2025 +0800

    [#7759]feat(client-java): Add Java client for Job System (#7944)
    
    ### What changes were proposed in this pull request?
    
    This PR adds Java client support for the job system.
    
    ### Why are the changes needed?
    
    Fix: #7759
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO.
    
    ### How was this patch tested?
    
    Added unit tests (UTs) to cover the new client code.
---
 .../org/apache/gravitino/client/DTOConverters.java |  41 +++
 .../org/apache/gravitino/client/ErrorHandlers.java |  65 ++++
 .../apache/gravitino/client/GenericJobHandle.java  |  47 +++
 .../apache/gravitino/client/GravitinoClient.java   |  56 +++-
 .../apache/gravitino/client/GravitinoMetalake.java | 173 ++++++++++-
 .../apache/gravitino/client/TestSupportsJobs.java  | 333 +++++++++++++++++++++
 6 files changed, 712 insertions(+), 3 deletions(-)

diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
index b33c27a9e6..9721dabba2 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
@@ -35,6 +35,9 @@ import org.apache.gravitino.dto.CatalogDTO;
 import org.apache.gravitino.dto.MetalakeDTO;
 import org.apache.gravitino.dto.authorization.PrivilegeDTO;
 import org.apache.gravitino.dto.authorization.SecurableObjectDTO;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
+import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
 import org.apache.gravitino.dto.requests.CatalogUpdateRequest;
 import org.apache.gravitino.dto.requests.FilesetUpdateRequest;
 import org.apache.gravitino.dto.requests.MetalakeUpdateRequest;
@@ -45,6 +48,9 @@ import org.apache.gravitino.dto.requests.TableUpdateRequest;
 import org.apache.gravitino.dto.requests.TagUpdateRequest;
 import org.apache.gravitino.dto.requests.TopicUpdateRequest;
 import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
 import org.apache.gravitino.messaging.TopicChange;
 import org.apache.gravitino.model.ModelChange;
 import org.apache.gravitino.model.ModelVersionChange;
@@ -419,4 +425,39 @@ class DTOConverters {
           "Unknown model version change type: " + 
change.getClass().getSimpleName());
     }
   }
+
+  static JobTemplateDTO toJobTemplateDTO(JobTemplate jobTemplate) {
+    switch (jobTemplate.jobType()) {
+      case SHELL:
+        return ShellJobTemplateDTO.builder()
+            .withJobType(jobTemplate.jobType())
+            .withName(jobTemplate.name())
+            .withComment(jobTemplate.comment())
+            .withExecutable(jobTemplate.executable())
+            .withArguments(jobTemplate.arguments())
+            .withEnvironments(jobTemplate.environments())
+            .withCustomFields(jobTemplate.customFields())
+            .withScripts(((ShellJobTemplate) jobTemplate).scripts())
+            .build();
+
+      case SPARK:
+        return SparkJobTemplateDTO.builder()
+            .withJobType(jobTemplate.jobType())
+            .withName(jobTemplate.name())
+            .withComment(jobTemplate.comment())
+            .withExecutable(jobTemplate.executable())
+            .withArguments(jobTemplate.arguments())
+            .withEnvironments(jobTemplate.environments())
+            .withCustomFields(jobTemplate.customFields())
+            .withClassName(((SparkJobTemplate) jobTemplate).className())
+            .withJars(((SparkJobTemplate) jobTemplate).jars())
+            .withFiles(((SparkJobTemplate) jobTemplate).files())
+            .withArchives(((SparkJobTemplate) jobTemplate).archives())
+            .withConfigs(((SparkJobTemplate) jobTemplate).configs())
+            .build();
+
+      default:
+        throw new IllegalArgumentException("Unsupported job type: " + 
jobTemplate.jobType());
+    }
+  }
 }
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index b7e47d239c..b9d5627e7e 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -38,6 +38,7 @@ import 
org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.IllegalRoleException;
 import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeInUseException;
 import org.apache.gravitino.exceptions.MetalakeNotInUseException;
@@ -46,6 +47,8 @@ import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -233,6 +236,15 @@ public class ErrorHandlers {
     return ModelErrorHandler.INSTANCE;
   }
 
+  /**
+   * Creates an error handler specific to job and job template operations.
+   *
+   * @return A Consumer representing the job error handler.
+   */
+  public static Consumer<ErrorResponse> jobErrorHandler() {
+    return JobErrorHandler.INSTANCE;
+  }
+
   private ErrorHandlers() {}
 
   /**
@@ -1067,6 +1079,59 @@ public class ErrorHandlers {
     }
   }
 
+  /** Error handler specific to job and job template operations. */
+  @SuppressWarnings("FormatStringAnnotation")
+  private static class JobErrorHandler extends RestErrorHandler {
+
+    private static final JobErrorHandler INSTANCE = new JobErrorHandler();
+
+    @Override
+    public void accept(ErrorResponse errorResponse) {
+      String errorMsg = formatErrorMessage(errorResponse);
+
+      switch (errorResponse.getCode()) {
+        case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+          throw new IllegalArgumentException(errorMsg);
+
+        case ErrorConstants.NOT_FOUND_CODE:
+          if 
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) 
{
+            throw new NoSuchMetalakeException(errorMsg);
+          } else if (errorResponse
+              .getType()
+              .equals(NoSuchJobTemplateException.class.getSimpleName())) {
+            throw new NoSuchJobTemplateException(errorMsg);
+          } else if 
(errorResponse.getType().equals(NoSuchJobException.class.getSimpleName())) {
+            throw new NoSuchJobException(errorMsg);
+          } else {
+            throw new NotFoundException(errorMsg);
+          }
+
+        case ErrorConstants.ALREADY_EXISTS_CODE:
+          throw new JobTemplateAlreadyExistsException(errorMsg);
+
+        case ErrorConstants.FORBIDDEN_CODE:
+          throw new ForbiddenException(errorMsg);
+
+        case ErrorConstants.INTERNAL_ERROR_CODE:
+          throw new RuntimeException(errorMsg);
+
+        case ErrorConstants.NOT_IN_USE_CODE:
+          if 
(errorResponse.getType().equals(MetalakeNotInUseException.class.getSimpleName()))
 {
+            throw new MetalakeNotInUseException(errorMsg);
+
+          } else {
+            throw new NotInUseException(errorMsg);
+          }
+
+        case ErrorConstants.IN_USE_CODE:
+          throw new InUseException(errorMsg);
+
+        default:
+          super.accept(errorResponse);
+      }
+    }
+  }
+
   /** Generic error handler for REST requests. */
   private static class RestErrorHandler extends ErrorHandler {
     private static final ErrorHandler INSTANCE = new RestErrorHandler();
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericJobHandle.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericJobHandle.java
new file mode 100644
index 0000000000..0c270cc429
--- /dev/null
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericJobHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import org.apache.gravitino.dto.job.JobDTO;
+import org.apache.gravitino.job.JobHandle;
+
+/** Represents a generic job handle. */
+public class GenericJobHandle implements JobHandle {
+
+  private final JobDTO jobDTO;
+
+  GenericJobHandle(JobDTO jobDTO) {
+    this.jobDTO = jobDTO;
+  }
+
+  @Override
+  public String jobTemplateName() {
+    return jobDTO.jobTemplateName();
+  }
+
+  @Override
+  public String jobId() {
+    return jobDTO.jobId();
+  }
+
+  @Override
+  public Status jobStatus() {
+    return jobDTO.status();
+  }
+}
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
index 316192d7f2..c913ca071c 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
@@ -41,8 +41,12 @@ import 
org.apache.gravitino.exceptions.GroupAlreadyExistsException;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.IllegalRoleException;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
@@ -53,6 +57,9 @@ import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.exceptions.RoleAlreadyExistsException;
 import org.apache.gravitino.exceptions.TagAlreadyExistsException;
 import org.apache.gravitino.exceptions.UserAlreadyExistsException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.SupportsJobs;
 import org.apache.gravitino.tag.Tag;
 import org.apache.gravitino.tag.TagChange;
 import org.apache.gravitino.tag.TagOperations;
@@ -65,7 +72,7 @@ import org.apache.gravitino.tag.TagOperations;
  * API.
  */
 public class GravitinoClient extends GravitinoClientBase
-    implements SupportsCatalogs, TagOperations {
+    implements SupportsCatalogs, TagOperations, SupportsJobs {
 
   private final GravitinoMetalake metalake;
 
@@ -560,6 +567,53 @@ public class GravitinoClient extends GravitinoClientBase
     return getMetalake().deleteTag(name);
   }
 
+  @Override
+  public List<JobTemplate> listJobTemplates() {
+    return getMetalake().listJobTemplates();
+  }
+
+  @Override
+  public void registerJobTemplate(JobTemplate jobTemplate)
+      throws JobTemplateAlreadyExistsException {
+    getMetalake().registerJobTemplate(jobTemplate);
+  }
+
+  @Override
+  public JobTemplate getJobTemplate(String jobTemplateName) throws 
NoSuchJobTemplateException {
+    return getMetalake().getJobTemplate(jobTemplateName);
+  }
+
+  @Override
+  public boolean deleteJobTemplate(String jobTemplateName) throws 
InUseException {
+    return getMetalake().deleteJobTemplate(jobTemplateName);
+  }
+
+  @Override
+  public List<JobHandle> listJobs(String jobTemplateName) throws 
NoSuchJobTemplateException {
+    return getMetalake().listJobs(jobTemplateName);
+  }
+
+  @Override
+  public List<JobHandle> listJobs() {
+    return getMetalake().listJobs();
+  }
+
+  @Override
+  public JobHandle runJob(String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    return getMetalake().runJob(jobTemplateName, jobConf);
+  }
+
+  @Override
+  public JobHandle getJob(String jobId) throws NoSuchJobException {
+    return getMetalake().getJob(jobId);
+  }
+
+  @Override
+  public JobHandle cancelJob(String jobId) throws NoSuchJobException {
+    return getMetalake().cancelJob(jobId);
+  }
+
   /** Builder class for constructing a GravitinoClient. */
   public static class ClientBuilder extends 
GravitinoClientBase.Builder<GravitinoClient> {
 
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
index 50d03373a4..efdb3fb627 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
@@ -52,6 +52,8 @@ import org.apache.gravitino.dto.requests.CatalogSetRequest;
 import org.apache.gravitino.dto.requests.CatalogUpdateRequest;
 import org.apache.gravitino.dto.requests.CatalogUpdatesRequest;
 import org.apache.gravitino.dto.requests.GroupAddRequest;
+import org.apache.gravitino.dto.requests.JobRunRequest;
+import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
 import org.apache.gravitino.dto.requests.OwnerSetRequest;
 import org.apache.gravitino.dto.requests.PrivilegeGrantRequest;
 import org.apache.gravitino.dto.requests.PrivilegeRevokeRequest;
@@ -62,6 +64,7 @@ import org.apache.gravitino.dto.requests.TagCreateRequest;
 import org.apache.gravitino.dto.requests.TagUpdateRequest;
 import org.apache.gravitino.dto.requests.TagUpdatesRequest;
 import org.apache.gravitino.dto.requests.UserAddRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
 import org.apache.gravitino.dto.responses.CatalogListResponse;
 import org.apache.gravitino.dto.responses.CatalogResponse;
 import org.apache.gravitino.dto.responses.DeleteResponse;
@@ -70,6 +73,10 @@ import org.apache.gravitino.dto.responses.EntityListResponse;
 import org.apache.gravitino.dto.responses.ErrorResponse;
 import org.apache.gravitino.dto.responses.GroupListResponse;
 import org.apache.gravitino.dto.responses.GroupResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
+import org.apache.gravitino.dto.responses.JobTemplateListResponse;
+import org.apache.gravitino.dto.responses.JobTemplateResponse;
 import org.apache.gravitino.dto.responses.NameListResponse;
 import org.apache.gravitino.dto.responses.OwnerResponse;
 import org.apache.gravitino.dto.responses.RemoveResponse;
@@ -85,8 +92,12 @@ import 
org.apache.gravitino.exceptions.GroupAlreadyExistsException;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.IllegalRoleException;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
@@ -97,6 +108,9 @@ import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.exceptions.RoleAlreadyExistsException;
 import org.apache.gravitino.exceptions.TagAlreadyExistsException;
 import org.apache.gravitino.exceptions.UserAlreadyExistsException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.SupportsJobs;
 import org.apache.gravitino.rest.RESTUtils;
 import org.apache.gravitino.tag.Tag;
 import org.apache.gravitino.tag.TagChange;
@@ -108,15 +122,16 @@ import org.apache.gravitino.tag.TagOperations;
  * create, load, alter and drop a catalog with specified identifier.
  */
 public class GravitinoMetalake extends MetalakeDTO
-    implements SupportsCatalogs, TagOperations, SupportsRoles {
+    implements SupportsCatalogs, TagOperations, SupportsRoles, SupportsJobs {
   private static final String API_METALAKES_CATALOGS_PATH = 
"api/metalakes/%s/catalogs/%s";
   private static final String API_PERMISSION_PATH = 
"api/metalakes/%s/permissions/%s";
   private static final String API_METALAKES_USERS_PATH = 
"api/metalakes/%s/users/%s";
   private static final String API_METALAKES_GROUPS_PATH = 
"api/metalakes/%s/groups/%s";
   private static final String API_METALAKES_ROLES_PATH = 
"api/metalakes/%s/roles/%s";
   private static final String API_METALAKES_OWNERS_PATH = 
"api/metalakes/%s/owners/%s";
-
   private static final String API_METALAKES_TAGS_PATH = 
"api/metalakes/%s/tags";
+  private static final String API_METALAKES_JOB_TEMPLATES_PATH = 
"api/metalakes/%s/jobs/templates";
+  private static final String API_METALAKES_JOB_PATH = 
"api/metalakes/%s/jobs/runs";
   private static final String BLANK_PLACEHOLDER = "";
 
   private final RESTClient restClient;
@@ -1189,6 +1204,160 @@ public class GravitinoMetalake extends MetalakeDTO
     return metadataObjectRoleOperations.listBindingRoleNames();
   }
 
+  @Override
+  public List<JobTemplate> listJobTemplates() {
+    JobTemplateListResponse resp =
+        restClient.get(
+            String.format(API_METALAKES_JOB_TEMPLATES_PATH, 
RESTUtils.encodeString(this.name())),
+            ImmutableMap.of("details", "true"),
+            JobTemplateListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+
+    return resp.getJobTemplates().stream()
+        .map(org.apache.gravitino.dto.util.DTOConverters::fromDTO)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void registerJobTemplate(JobTemplate jobTemplate)
+      throws JobTemplateAlreadyExistsException {
+    JobTemplateRegisterRequest req =
+        new 
JobTemplateRegisterRequest(DTOConverters.toJobTemplateDTO(jobTemplate));
+
+    BaseResponse resp =
+        restClient.post(
+            String.format(API_METALAKES_JOB_TEMPLATES_PATH, 
RESTUtils.encodeString(this.name())),
+            req,
+            BaseResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+  }
+
+  @Override
+  public JobTemplate getJobTemplate(String jobTemplateName) throws 
NoSuchJobTemplateException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not 
be null or empty");
+
+    JobTemplateResponse resp =
+        restClient.get(
+            String.format(API_METALAKES_JOB_TEMPLATES_PATH, 
RESTUtils.encodeString(this.name()))
+                + "/"
+                + RESTUtils.encodeString(jobTemplateName),
+            JobTemplateResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+
+    return 
org.apache.gravitino.dto.util.DTOConverters.fromDTO(resp.getJobTemplate());
+  }
+
+  @Override
+  public boolean deleteJobTemplate(String jobTemplateName) throws 
InUseException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not 
be null or empty");
+
+    DropResponse resp =
+        restClient.delete(
+            String.format(API_METALAKES_JOB_TEMPLATES_PATH, 
RESTUtils.encodeString(this.name()))
+                + "/"
+                + RESTUtils.encodeString(jobTemplateName),
+            DropResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+
+    return resp.dropped();
+  }
+
+  @Override
+  public List<JobHandle> listJobs(String jobTemplateName) throws 
NoSuchJobTemplateException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not 
be null or empty");
+
+    JobListResponse resp =
+        restClient.get(
+            String.format(API_METALAKES_JOB_PATH, 
RESTUtils.encodeString(this.name())),
+            ImmutableMap.of("jobTemplateName", 
RESTUtils.encodeString(jobTemplateName)),
+            JobListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+
+    return 
resp.getJobs().stream().map(GenericJobHandle::new).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<JobHandle> listJobs() {
+    JobListResponse resp =
+        restClient.get(
+            String.format(API_METALAKES_JOB_PATH, 
RESTUtils.encodeString(this.name())),
+            Collections.emptyMap(),
+            JobListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+
+    return 
resp.getJobs().stream().map(GenericJobHandle::new).collect(Collectors.toList());
+  }
+
+  @Override
+  public JobHandle runJob(String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not 
be null or empty");
+
+    JobRunRequest req = new JobRunRequest(jobTemplateName, jobConf);
+
+    JobResponse resp =
+        restClient.post(
+            String.format(API_METALAKES_JOB_PATH, 
RESTUtils.encodeString(this.name())),
+            req,
+            JobResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+
+    resp.validate();
+    return new GenericJobHandle(resp.getJob());
+  }
+
+  @Override
+  public JobHandle getJob(String jobId) throws NoSuchJobException {
+    Preconditions.checkArgument(StringUtils.isNotBlank(jobId), "job id must 
not be null or empty");
+
+    JobResponse resp =
+        restClient.get(
+            String.format(API_METALAKES_JOB_PATH, 
RESTUtils.encodeString(this.name()))
+                + "/"
+                + RESTUtils.encodeString(jobId),
+            JobResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+
+    return new GenericJobHandle(resp.getJob());
+  }
+
+  @Override
+  public JobHandle cancelJob(String jobId) throws NoSuchJobException {
+    Preconditions.checkArgument(StringUtils.isNotBlank(jobId), "job id must 
not be null or empty");
+
+    JobResponse resp =
+        restClient.post(
+            String.format(API_METALAKES_JOB_PATH, 
RESTUtils.encodeString(this.name()))
+                + "/"
+                + RESTUtils.encodeString(jobId),
+            null,
+            JobResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    resp.validate();
+
+    return new GenericJobHandle(resp.getJob());
+  }
+
   static class Builder extends MetalakeDTO.Builder<Builder> {
     private RESTClient restClient;
 
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsJobs.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsJobs.java
new file mode 100644
index 0000000000..f51874f2f8
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsJobs.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.job.JobDTO;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
+import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobRunRequest;
+import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
+import org.apache.gravitino.dto.responses.JobTemplateListResponse;
+import org.apache.gravitino.dto.responses.JobTemplateResponse;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.MetalakeNotInUseException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Method;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestSupportsJobs extends TestBase {
+
+  private static final String METALAKE_FOR_JOB_TEST = "metalake_for_job_test";
+
+  private static GravitinoMetalake metalake;
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    TestBase.setUp();
+    metalake = TestGravitinoMetalake.createMetalake(client, 
METALAKE_FOR_JOB_TEST);
+  }
+
+  @Test
+  public void testListJobTemplates() throws JsonProcessingException {
+    JobTemplateDTO template1 = newShellJobTemplateDTO("shell-job-template");
+    JobTemplateDTO template2 = newSparkJobTemplateDTO("spark-job-template");
+    List<JobTemplateDTO> templates = Lists.newArrayList(template1, template2);
+    JobTemplateListResponse resp = new JobTemplateListResponse(templates);
+
+    buildMockResource(Method.GET, jobTemplatesPath(), null, resp, 
HttpStatus.SC_OK);
+
+    List<JobTemplate> jobTemplates = metalake.listJobTemplates();
+    Assertions.assertEquals(2, jobTemplates.size());
+
+    List<JobTemplate> expected =
+        templates.stream()
+            .map(org.apache.gravitino.dto.util.DTOConverters::fromDTO)
+            .collect(Collectors.toList());
+
+    Assertions.assertEquals(expected, jobTemplates);
+
+    // Test throw NoSuchMetalakeException
+    ErrorResponse errorResp =
+        ErrorResponse.notFound(NoSuchMetalakeException.class.getSimpleName(), 
"mock error");
+    buildMockResource(Method.GET, jobTemplatesPath(), null, errorResp, 
HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(NoSuchMetalakeException.class, () -> 
metalake.listJobTemplates());
+
+    // Test throw MetalakeNotInUseException
+    ErrorResponse errorResp2 =
+        ErrorResponse.notInUse(
+            MetalakeNotInUseException.class.getSimpleName(),
+            "mock error",
+            new MetalakeNotInUseException("mock error"));
+    buildMockResource(Method.GET, jobTemplatesPath(), null, errorResp2, 
HttpStatus.SC_CONFLICT);
+    Assertions.assertThrows(MetalakeNotInUseException.class, () -> 
metalake.listJobTemplates());
+
+    // Test throw RuntimeException
+    ErrorResponse errorResp3 = ErrorResponse.internalError("mock error");
+    buildMockResource(
+        Method.GET, jobTemplatesPath(), null, errorResp3, 
HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    Assertions.assertThrows(RuntimeException.class, () -> 
metalake.listJobTemplates());
+  }
+
+  @Test
+  public void testRegisterJobTemplate() throws JsonProcessingException {
+    JobTemplateDTO templateDTO = newShellJobTemplateDTO("shell-job-template");
+    JobTemplate template = 
org.apache.gravitino.dto.util.DTOConverters.fromDTO(templateDTO);
+    JobTemplateRegisterRequest req = new 
JobTemplateRegisterRequest(templateDTO);
+
+    BaseResponse resp = new BaseResponse();
+    buildMockResource(Method.POST, jobTemplatesPath(), req, resp, 
HttpStatus.SC_OK);
+
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template));
+
+    // Test throw JobTemplateAlreadyExistsException
+    ErrorResponse errorResp =
+        ErrorResponse.alreadyExists(
+            JobTemplateAlreadyExistsException.class.getSimpleName(), "mock 
error");
+    buildMockResource(Method.POST, jobTemplatesPath(), req, errorResp, 
HttpStatus.SC_CONFLICT);
+
+    Assertions.assertThrows(
+        JobTemplateAlreadyExistsException.class, () -> 
metalake.registerJobTemplate(template));
+  }
+
+  @Test
+  public void testGetJobTemplate() throws JsonProcessingException {
+    String jobTemplateName = "shell-job-template";
+    JobTemplateDTO templateDTO = newShellJobTemplateDTO(jobTemplateName);
+    JobTemplate expected = 
org.apache.gravitino.dto.util.DTOConverters.fromDTO(templateDTO);
+    JobTemplateResponse resp = new JobTemplateResponse(templateDTO);
+
+    buildMockResource(
+        Method.GET, jobTemplatesPath() + "/" + jobTemplateName, null, resp, 
HttpStatus.SC_OK);
+
+    JobTemplate actual = metalake.getJobTemplate(jobTemplateName);
+    Assertions.assertEquals(expected, actual);
+
+    // Test throw NoSuchJobTemplateException
+    ErrorResponse errorResp =
+        
ErrorResponse.notFound(NoSuchJobTemplateException.class.getSimpleName(), "mock 
error");
+    buildMockResource(
+        Method.GET,
+        jobTemplatesPath() + "/" + jobTemplateName,
+        null,
+        errorResp,
+        HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(
+        NoSuchJobTemplateException.class, () -> 
metalake.getJobTemplate(jobTemplateName));
+  }
+
+  @Test
+  public void testDeleteJobTemplate() throws JsonProcessingException {
+    String jobTemplateName = "shell-job-template";
+    DropResponse resp = new DropResponse(true);
+
+    buildMockResource(
+        Method.DELETE, jobTemplatesPath() + "/" + jobTemplateName, null, resp, 
HttpStatus.SC_OK);
+
+    Assertions.assertTrue(metalake.deleteJobTemplate(jobTemplateName));
+
+    // Test throw InUseException
+    ErrorResponse errorResp2 =
+        ErrorResponse.inUse(
+            InUseException.class.getSimpleName(), "mock error", new 
InUseException("mock error"));
+    buildMockResource(
+        Method.DELETE,
+        jobTemplatesPath() + "/" + jobTemplateName,
+        null,
+        errorResp2,
+        HttpStatus.SC_CONFLICT);
+    Assertions.assertThrows(
+        InUseException.class, () -> 
metalake.deleteJobTemplate(jobTemplateName));
+  }
+
+  @Test
+  public void testListJobs() throws JsonProcessingException {
+    String jobTemplateName = "shell-job-template";
+    String jobId1 = "job-1";
+    String jobId2 = "job-2";
+
+    List<JobDTO> jobs =
+        Lists.newArrayList(newJobDTO(jobId1, jobTemplateName), 
newJobDTO(jobId2, jobTemplateName));
+
+    JobListResponse resp = new JobListResponse(jobs);
+
+    buildMockResource(Method.GET, jobRunsPath(), null, resp, HttpStatus.SC_OK);
+
+    List<JobHandle> actualJobs = metalake.listJobs();
+    Assertions.assertEquals(2, actualJobs.size());
+    compare(jobs.get(0), actualJobs.get(0));
+    compare(jobs.get(1), actualJobs.get(1));
+
+    // Test throw NoSuchJobTemplateException
+    ErrorResponse errorResp =
+        
ErrorResponse.notFound(NoSuchJobTemplateException.class.getSimpleName(), "mock 
error");
+    buildMockResource(Method.GET, jobRunsPath(), null, errorResp, 
HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(
+        NoSuchJobTemplateException.class, () -> 
metalake.listJobs(jobTemplateName));
+
+    // Test list jobs by job template name
+    buildMockResource(
+        Method.GET,
+        jobRunsPath(),
+        ImmutableMap.of("jobTemplateName", jobTemplateName),
+        null,
+        resp,
+        HttpStatus.SC_OK);
+
+    List<JobHandle> jobsByTemplate = metalake.listJobs(jobTemplateName);
+    Assertions.assertEquals(2, jobsByTemplate.size());
+    compare(jobs.get(0), jobsByTemplate.get(0));
+    compare(jobs.get(1), jobsByTemplate.get(1));
+  }
+
+  @Test
+  public void testGetJob() throws JsonProcessingException {
+    String jobId = "job-1";
+    String jobTemplateName = "shell-job-template";
+    JobDTO expectedJob = newJobDTO(jobId, jobTemplateName);
+    JobResponse resp = new JobResponse(expectedJob);
+
+    buildMockResource(Method.GET, jobRunsPath() + "/" + jobId, null, resp, 
HttpStatus.SC_OK);
+
+    JobHandle actualHandle = metalake.getJob(jobId);
+    compare(expectedJob, actualHandle);
+
+    // Test throw NoSuchJobException
+    ErrorResponse errorResp =
+        ErrorResponse.notFound(NoSuchJobException.class.getSimpleName(), "mock 
error");
+    buildMockResource(
+        Method.GET, jobRunsPath() + "/" + jobId, null, errorResp, 
HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(NoSuchJobException.class, () -> 
metalake.getJob(jobId));
+  }
+
+  @Test
+  public void testRunJob() throws JsonProcessingException {
+    String jobTemplateName = "shell-job-template";
+    String jobId = "job-1";
+    JobDTO expectedJob = newJobDTO(jobId, jobTemplateName);
+    JobResponse resp = new JobResponse(expectedJob);
+    JobRunRequest req = new JobRunRequest(jobTemplateName, ImmutableMap.of());
+
+    buildMockResource(Method.POST, jobRunsPath(), req, resp, HttpStatus.SC_OK);
+
+    JobHandle actualHandle = metalake.runJob(jobTemplateName, 
ImmutableMap.of());
+    compare(expectedJob, actualHandle);
+
+    // Test throw NoSuchJobTemplateException
+    ErrorResponse errorResp =
+        
ErrorResponse.notFound(NoSuchJobTemplateException.class.getSimpleName(), "mock 
error");
+    buildMockResource(Method.POST, jobRunsPath(), req, errorResp, 
HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(
+        NoSuchJobTemplateException.class,
+        () -> metalake.runJob(jobTemplateName, ImmutableMap.of()));
+  }
+
+  @Test
+  public void testCancelJob() throws JsonProcessingException {
+    String jobId = "job-1";
+    String jobTemplateName = "shell-job-template";
+    JobDTO expectedJob = newJobDTO(jobId, jobTemplateName);
+    JobResponse resp = new JobResponse(expectedJob);
+
+    buildMockResource(Method.POST, jobRunsPath() + "/" + jobId, null, resp, 
HttpStatus.SC_OK);
+
+    JobHandle actualHandle = metalake.cancelJob(jobId);
+    compare(expectedJob, actualHandle);
+
+    // Test throw NoSuchJobException
+    ErrorResponse errorResp =
+        ErrorResponse.notFound(NoSuchJobException.class.getSimpleName(), "mock 
error");
+    buildMockResource(
+        Method.POST, jobRunsPath() + "/" + jobId, null, errorResp, 
HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(NoSuchJobException.class, () -> 
metalake.cancelJob(jobId));
+  }
+
+  private void compare(JobDTO expected, JobHandle actual) {
+    Assertions.assertEquals(expected.jobId(), actual.jobId());
+    Assertions.assertEquals(expected.jobTemplateName(), 
actual.jobTemplateName());
+    Assertions.assertEquals(expected.status(), actual.jobStatus());
+  }
+
+  private String jobTemplatesPath() {
+    return "/api/metalakes/" + METALAKE_FOR_JOB_TEST + "/jobs/templates";
+  }
+
+  private String jobRunsPath() {
+    return "/api/metalakes/" + METALAKE_FOR_JOB_TEST + "/jobs/runs";
+  }
+
+  private JobTemplateDTO newShellJobTemplateDTO(String name) {
+    return ShellJobTemplateDTO.builder()
+        .withJobType(JobTemplate.JobType.SHELL)
+        .withName(name)
+        .withComment("This is a shell job template")
+        .withExecutable("/path/to/script.sh")
+        .withArguments(Collections.emptyList())
+        .withEnvironments(Collections.emptyMap())
+        .withCustomFields(Collections.emptyMap())
+        .withScripts(Collections.emptyList())
+        .build();
+  }
+
+  private JobTemplateDTO newSparkJobTemplateDTO(String name) {
+    return SparkJobTemplateDTO.builder()
+        .withJobType(JobTemplate.JobType.SPARK)
+        .withName(name)
+        .withComment("This is a spark job template")
+        .withExecutable("/path/to/spark-job.jar")
+        .withArguments(Collections.emptyList())
+        .withEnvironments(Collections.emptyMap())
+        .withCustomFields(Collections.emptyMap())
+        .withClassName("org.example.SparkJob")
+        .withJars(Collections.emptyList())
+        .withFiles(Collections.emptyList())
+        .withArchives(Collections.emptyList())
+        .withConfigs(Collections.emptyMap())
+        .build();
+  }
+
+  private JobDTO newJobDTO(String jobId, String templateName) {
+    return new JobDTO(
+        jobId,
+        templateName,
+        JobHandle.Status.QUEUED,
+        
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build());
+  }
+}


Reply via email to