This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 0ac29c0557 [#7759]feat(client-java): Add Java client for Job System
(#7944)
0ac29c0557 is described below
commit 0ac29c055727a60936347aa344faf545c1c8f03d
Author: Jerry Shao <[email protected]>
AuthorDate: Fri Aug 8 16:28:26 2025 +0800
[#7759]feat(client-java): Add Java client for Job System (#7944)
### What changes were proposed in this pull request?
This PR adds Java client support for the job system.
### Why are the changes needed?
Fix: #7759
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Added unit tests (UTs) to cover the new code.
---
.../org/apache/gravitino/client/DTOConverters.java | 41 +++
.../org/apache/gravitino/client/ErrorHandlers.java | 65 ++++
.../apache/gravitino/client/GenericJobHandle.java | 47 +++
.../apache/gravitino/client/GravitinoClient.java | 56 +++-
.../apache/gravitino/client/GravitinoMetalake.java | 173 ++++++++++-
.../apache/gravitino/client/TestSupportsJobs.java | 333 +++++++++++++++++++++
6 files changed, 712 insertions(+), 3 deletions(-)
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
index b33c27a9e6..9721dabba2 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/DTOConverters.java
@@ -35,6 +35,9 @@ import org.apache.gravitino.dto.CatalogDTO;
import org.apache.gravitino.dto.MetalakeDTO;
import org.apache.gravitino.dto.authorization.PrivilegeDTO;
import org.apache.gravitino.dto.authorization.SecurableObjectDTO;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
+import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
import org.apache.gravitino.dto.requests.CatalogUpdateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdateRequest;
import org.apache.gravitino.dto.requests.MetalakeUpdateRequest;
@@ -45,6 +48,9 @@ import org.apache.gravitino.dto.requests.TableUpdateRequest;
import org.apache.gravitino.dto.requests.TagUpdateRequest;
import org.apache.gravitino.dto.requests.TopicUpdateRequest;
import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
import org.apache.gravitino.messaging.TopicChange;
import org.apache.gravitino.model.ModelChange;
import org.apache.gravitino.model.ModelVersionChange;
@@ -419,4 +425,39 @@ class DTOConverters {
"Unknown model version change type: " +
change.getClass().getSimpleName());
}
}
+
+  /**
+   * Converts a {@link JobTemplate} into the {@link JobTemplateDTO} subtype that matches its job
+   * type.
+   *
+   * @param jobTemplate The job template to convert.
+   * @return A {@link ShellJobTemplateDTO} for {@code SHELL} templates or a {@link
+   *     SparkJobTemplateDTO} for {@code SPARK} templates, populated from the given template.
+   * @throws IllegalArgumentException If the job type is neither {@code SHELL} nor {@code SPARK}.
+   */
+  static JobTemplateDTO toJobTemplateDTO(JobTemplate jobTemplate) {
+    switch (jobTemplate.jobType()) {
+      case SHELL:
+        {
+          // Shell templates add the script list on top of the common fields.
+          ShellJobTemplate shell = (ShellJobTemplate) jobTemplate;
+          return ShellJobTemplateDTO.builder()
+              .withJobType(jobTemplate.jobType())
+              .withName(jobTemplate.name())
+              .withComment(jobTemplate.comment())
+              .withExecutable(jobTemplate.executable())
+              .withArguments(jobTemplate.arguments())
+              .withEnvironments(jobTemplate.environments())
+              .withCustomFields(jobTemplate.customFields())
+              .withScripts(shell.scripts())
+              .build();
+        }
+
+      case SPARK:
+        {
+          // Spark templates add class name, jars, files, archives and configs.
+          SparkJobTemplate spark = (SparkJobTemplate) jobTemplate;
+          return SparkJobTemplateDTO.builder()
+              .withJobType(jobTemplate.jobType())
+              .withName(jobTemplate.name())
+              .withComment(jobTemplate.comment())
+              .withExecutable(jobTemplate.executable())
+              .withArguments(jobTemplate.arguments())
+              .withEnvironments(jobTemplate.environments())
+              .withCustomFields(jobTemplate.customFields())
+              .withClassName(spark.className())
+              .withJars(spark.jars())
+              .withFiles(spark.files())
+              .withArchives(spark.archives())
+              .withConfigs(spark.configs())
+              .build();
+        }
+
+      default:
+        throw new IllegalArgumentException("Unsupported job type: " + jobTemplate.jobType());
+    }
+  }
}
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index b7e47d239c..b9d5627e7e 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -38,6 +38,7 @@ import
org.apache.gravitino.exceptions.IllegalMetadataObjectException;
import org.apache.gravitino.exceptions.IllegalPrivilegeException;
import org.apache.gravitino.exceptions.IllegalRoleException;
import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
import org.apache.gravitino.exceptions.MetalakeInUseException;
import org.apache.gravitino.exceptions.MetalakeNotInUseException;
@@ -46,6 +47,8 @@ import
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -233,6 +236,15 @@ public class ErrorHandlers {
return ModelErrorHandler.INSTANCE;
}
+ /**
+ * Creates an error handler specific to job and job template operations.
+ *
+ * <p>The returned handler is the shared {@code JobErrorHandler.INSTANCE} singleton; no new
+ * object is allocated per call.
+ *
+ * @return A Consumer representing the job error handler.
+ */
+ public static Consumer<ErrorResponse> jobErrorHandler() {
+ return JobErrorHandler.INSTANCE;
+ }
+
private ErrorHandlers() {}
/**
@@ -1067,6 +1079,59 @@ public class ErrorHandlers {
}
}
+  /**
+   * Error handler specific to job and job template operations.
+   *
+   * <p>Maps the code (and, for not-found and not-in-use errors, the type) of an {@link
+   * ErrorResponse} to the matching exception. Codes that are not listed here are passed on to
+   * {@link RestErrorHandler#accept(ErrorResponse)}.
+   */
+  @SuppressWarnings("FormatStringAnnotation")
+  private static class JobErrorHandler extends RestErrorHandler {
+
+    private static final JobErrorHandler INSTANCE = new JobErrorHandler();
+
+    @Override
+    public void accept(ErrorResponse errorResponse) {
+      String message = formatErrorMessage(errorResponse);
+
+      switch (errorResponse.getCode()) {
+        case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+          throw new IllegalArgumentException(message);
+
+        case ErrorConstants.NOT_FOUND_CODE:
+          // The response type names the entity that is missing; unknown types fall back to the
+          // generic NotFoundException.
+          if (errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) {
+            throw new NoSuchMetalakeException(message);
+          }
+          if (errorResponse.getType().equals(NoSuchJobTemplateException.class.getSimpleName())) {
+            throw new NoSuchJobTemplateException(message);
+          }
+          if (errorResponse.getType().equals(NoSuchJobException.class.getSimpleName())) {
+            throw new NoSuchJobException(message);
+          }
+          throw new NotFoundException(message);
+
+        case ErrorConstants.ALREADY_EXISTS_CODE:
+          throw new JobTemplateAlreadyExistsException(message);
+
+        case ErrorConstants.FORBIDDEN_CODE:
+          throw new ForbiddenException(message);
+
+        case ErrorConstants.INTERNAL_ERROR_CODE:
+          throw new RuntimeException(message);
+
+        case ErrorConstants.NOT_IN_USE_CODE:
+          if (errorResponse.getType().equals(MetalakeNotInUseException.class.getSimpleName())) {
+            throw new MetalakeNotInUseException(message);
+          }
+          throw new NotInUseException(message);
+
+        case ErrorConstants.IN_USE_CODE:
+          throw new InUseException(message);
+
+        default:
+          super.accept(errorResponse);
+      }
+    }
+  }
+
/** Generic error handler for REST requests. */
private static class RestErrorHandler extends ErrorHandler {
private static final ErrorHandler INSTANCE = new RestErrorHandler();
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericJobHandle.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericJobHandle.java
new file mode 100644
index 0000000000..0c270cc429
--- /dev/null
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericJobHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import org.apache.gravitino.dto.job.JobDTO;
+import org.apache.gravitino.job.JobHandle;
+
+/**
+ * A generic {@link JobHandle} backed by a {@link JobDTO}.
+ *
+ * <p>Every accessor returns the corresponding value of the wrapped DTO.
+ */
+public class GenericJobHandle implements JobHandle {
+
+  private final JobDTO job;
+
+  /**
+   * Creates a handle over the given job DTO.
+   *
+   * @param jobDTO The job DTO whose values this handle exposes.
+   */
+  GenericJobHandle(JobDTO jobDTO) {
+    this.job = jobDTO;
+  }
+
+  /** @return The job template name of the wrapped DTO. */
+  @Override
+  public String jobTemplateName() {
+    return job.jobTemplateName();
+  }
+
+  /** @return The job id of the wrapped DTO. */
+  @Override
+  public String jobId() {
+    return job.jobId();
+  }
+
+  /** @return The status of the wrapped DTO. */
+  @Override
+  public Status jobStatus() {
+    return job.status();
+  }
+}
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
index 316192d7f2..c913ca071c 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoClient.java
@@ -41,8 +41,12 @@ import
org.apache.gravitino.exceptions.GroupAlreadyExistsException;
import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
import org.apache.gravitino.exceptions.IllegalPrivilegeException;
import org.apache.gravitino.exceptions.IllegalRoleException;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.exceptions.NoSuchRoleException;
@@ -53,6 +57,9 @@ import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.exceptions.RoleAlreadyExistsException;
import org.apache.gravitino.exceptions.TagAlreadyExistsException;
import org.apache.gravitino.exceptions.UserAlreadyExistsException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.SupportsJobs;
import org.apache.gravitino.tag.Tag;
import org.apache.gravitino.tag.TagChange;
import org.apache.gravitino.tag.TagOperations;
@@ -65,7 +72,7 @@ import org.apache.gravitino.tag.TagOperations;
* API.
*/
public class GravitinoClient extends GravitinoClientBase
- implements SupportsCatalogs, TagOperations {
+ implements SupportsCatalogs, TagOperations, SupportsJobs {
private final GravitinoMetalake metalake;
@@ -560,6 +567,53 @@ public class GravitinoClient extends GravitinoClientBase
return getMetalake().deleteTag(name);
}
+ /**
+ * Lists the job templates by delegating to the metalake returned by {@code getMetalake()}.
+ *
+ * @return The job templates returned by the metalake.
+ */
+ @Override
+ public List<JobTemplate> listJobTemplates() {
+ return getMetalake().listJobTemplates();
+ }
+
+ /**
+ * Registers a job template by delegating to the metalake returned by {@code getMetalake()}.
+ *
+ * @param jobTemplate The job template to register.
+ * @throws JobTemplateAlreadyExistsException Propagated from the metalake's {@code
+ * registerJobTemplate}.
+ */
+ @Override
+ public void registerJobTemplate(JobTemplate jobTemplate)
+ throws JobTemplateAlreadyExistsException {
+ getMetalake().registerJobTemplate(jobTemplate);
+ }
+
+ /**
+ * Loads a job template by name by delegating to the metalake returned by {@code getMetalake()}.
+ *
+ * @param jobTemplateName The name of the job template to load.
+ * @return The job template returned by the metalake.
+ * @throws NoSuchJobTemplateException Propagated from the metalake's {@code getJobTemplate}.
+ */
+ @Override
+ public JobTemplate getJobTemplate(String jobTemplateName) throws
NoSuchJobTemplateException {
+ return getMetalake().getJobTemplate(jobTemplateName);
+ }
+
+ /**
+ * Deletes a job template by name by delegating to the metalake returned by {@code
+ * getMetalake()}.
+ *
+ * @param jobTemplateName The name of the job template to delete.
+ * @return The boolean result returned by the metalake's {@code deleteJobTemplate}.
+ * @throws InUseException Propagated from the metalake's {@code deleteJobTemplate}.
+ */
+ @Override
+ public boolean deleteJobTemplate(String jobTemplateName) throws
InUseException {
+ return getMetalake().deleteJobTemplate(jobTemplateName);
+ }
+
+ /**
+ * Lists the jobs of one job template by delegating to the metalake returned by {@code
+ * getMetalake()}.
+ *
+ * @param jobTemplateName The name of the job template whose jobs are listed.
+ * @return The job handles returned by the metalake.
+ * @throws NoSuchJobTemplateException Propagated from the metalake's {@code listJobs}.
+ */
+ @Override
+ public List<JobHandle> listJobs(String jobTemplateName) throws
NoSuchJobTemplateException {
+ return getMetalake().listJobs(jobTemplateName);
+ }
+
+ /**
+ * Lists jobs without a template filter by delegating to the metalake returned by {@code
+ * getMetalake()}.
+ *
+ * @return The job handles returned by the metalake.
+ */
+ @Override
+ public List<JobHandle> listJobs() {
+ return getMetalake().listJobs();
+ }
+
+ /**
+ * Runs a job from a job template by delegating to the metalake returned by {@code
+ * getMetalake()}.
+ *
+ * @param jobTemplateName The name of the job template to run.
+ * @param jobConf The job configuration passed through to the metalake unchanged.
+ * @return The job handle returned by the metalake.
+ * @throws NoSuchJobTemplateException Propagated from the metalake's {@code runJob}.
+ */
+ @Override
+ public JobHandle runJob(String jobTemplateName, Map<String, String> jobConf)
+ throws NoSuchJobTemplateException {
+ return getMetalake().runJob(jobTemplateName, jobConf);
+ }
+
+ /**
+ * Loads a job by id by delegating to the metalake returned by {@code getMetalake()}.
+ *
+ * @param jobId The id of the job to load.
+ * @return The job handle returned by the metalake.
+ * @throws NoSuchJobException Propagated from the metalake's {@code getJob}.
+ */
+ @Override
+ public JobHandle getJob(String jobId) throws NoSuchJobException {
+ return getMetalake().getJob(jobId);
+ }
+
+ /**
+ * Cancels a job by id by delegating to the metalake returned by {@code getMetalake()}.
+ *
+ * @param jobId The id of the job to cancel.
+ * @return The job handle returned by the metalake.
+ * @throws NoSuchJobException Propagated from the metalake's {@code cancelJob}.
+ */
+ @Override
+ public JobHandle cancelJob(String jobId) throws NoSuchJobException {
+ return getMetalake().cancelJob(jobId);
+ }
+
/** Builder class for constructing a GravitinoClient. */
public static class ClientBuilder extends
GravitinoClientBase.Builder<GravitinoClient> {
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
index 50d03373a4..efdb3fb627 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/GravitinoMetalake.java
@@ -52,6 +52,8 @@ import org.apache.gravitino.dto.requests.CatalogSetRequest;
import org.apache.gravitino.dto.requests.CatalogUpdateRequest;
import org.apache.gravitino.dto.requests.CatalogUpdatesRequest;
import org.apache.gravitino.dto.requests.GroupAddRequest;
+import org.apache.gravitino.dto.requests.JobRunRequest;
+import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
import org.apache.gravitino.dto.requests.OwnerSetRequest;
import org.apache.gravitino.dto.requests.PrivilegeGrantRequest;
import org.apache.gravitino.dto.requests.PrivilegeRevokeRequest;
@@ -62,6 +64,7 @@ import org.apache.gravitino.dto.requests.TagCreateRequest;
import org.apache.gravitino.dto.requests.TagUpdateRequest;
import org.apache.gravitino.dto.requests.TagUpdatesRequest;
import org.apache.gravitino.dto.requests.UserAddRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
import org.apache.gravitino.dto.responses.CatalogListResponse;
import org.apache.gravitino.dto.responses.CatalogResponse;
import org.apache.gravitino.dto.responses.DeleteResponse;
@@ -70,6 +73,10 @@ import org.apache.gravitino.dto.responses.EntityListResponse;
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.dto.responses.GroupListResponse;
import org.apache.gravitino.dto.responses.GroupResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
+import org.apache.gravitino.dto.responses.JobTemplateListResponse;
+import org.apache.gravitino.dto.responses.JobTemplateResponse;
import org.apache.gravitino.dto.responses.NameListResponse;
import org.apache.gravitino.dto.responses.OwnerResponse;
import org.apache.gravitino.dto.responses.RemoveResponse;
@@ -85,8 +92,12 @@ import
org.apache.gravitino.exceptions.GroupAlreadyExistsException;
import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
import org.apache.gravitino.exceptions.IllegalPrivilegeException;
import org.apache.gravitino.exceptions.IllegalRoleException;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.exceptions.NoSuchRoleException;
@@ -97,6 +108,9 @@ import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.exceptions.RoleAlreadyExistsException;
import org.apache.gravitino.exceptions.TagAlreadyExistsException;
import org.apache.gravitino.exceptions.UserAlreadyExistsException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.SupportsJobs;
import org.apache.gravitino.rest.RESTUtils;
import org.apache.gravitino.tag.Tag;
import org.apache.gravitino.tag.TagChange;
@@ -108,15 +122,16 @@ import org.apache.gravitino.tag.TagOperations;
* create, load, alter and drop a catalog with specified identifier.
*/
public class GravitinoMetalake extends MetalakeDTO
- implements SupportsCatalogs, TagOperations, SupportsRoles {
+ implements SupportsCatalogs, TagOperations, SupportsRoles, SupportsJobs {
private static final String API_METALAKES_CATALOGS_PATH =
"api/metalakes/%s/catalogs/%s";
private static final String API_PERMISSION_PATH =
"api/metalakes/%s/permissions/%s";
private static final String API_METALAKES_USERS_PATH =
"api/metalakes/%s/users/%s";
private static final String API_METALAKES_GROUPS_PATH =
"api/metalakes/%s/groups/%s";
private static final String API_METALAKES_ROLES_PATH =
"api/metalakes/%s/roles/%s";
private static final String API_METALAKES_OWNERS_PATH =
"api/metalakes/%s/owners/%s";
-
private static final String API_METALAKES_TAGS_PATH =
"api/metalakes/%s/tags";
+ // Path template of the job template endpoints; %s is filled with the URL-encoded metalake name.
+ private static final String API_METALAKES_JOB_TEMPLATES_PATH =
"api/metalakes/%s/jobs/templates";
+ // Path template of the job run endpoints; %s is filled with the URL-encoded metalake name.
+ private static final String API_METALAKES_JOB_PATH =
"api/metalakes/%s/jobs/runs";
private static final String BLANK_PLACEHOLDER = "";
private final RESTClient restClient;
@@ -1189,6 +1204,160 @@ public class GravitinoMetalake extends MetalakeDTO
return metadataObjectRoleOperations.listBindingRoleNames();
}
+  /**
+   * Lists the job templates of this metalake.
+   *
+   * <p>The request carries the {@code details=true} query parameter, and every returned DTO is
+   * converted with {@code org.apache.gravitino.dto.util.DTOConverters.fromDTO}.
+   *
+   * @return The job templates contained in the server response.
+   */
+  @Override
+  public List<JobTemplate> listJobTemplates() {
+    String templatesPath =
+        String.format(API_METALAKES_JOB_TEMPLATES_PATH, RESTUtils.encodeString(this.name()));
+
+    JobTemplateListResponse response =
+        restClient.get(
+            templatesPath,
+            ImmutableMap.of("details", "true"),
+            JobTemplateListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return response.getJobTemplates().stream()
+        .map(dto -> org.apache.gravitino.dto.util.DTOConverters.fromDTO(dto))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Registers a job template in this metalake.
+   *
+   * <p>The template is converted with {@code DTOConverters.toJobTemplateDTO} and POSTed to the
+   * job template endpoint.
+   *
+   * @param jobTemplate The job template to register.
+   * @throws JobTemplateAlreadyExistsException Raised by the job error handler when the server
+   *     reports an already-exists error.
+   */
+  @Override
+  public void registerJobTemplate(JobTemplate jobTemplate)
+      throws JobTemplateAlreadyExistsException {
+    JobTemplateRegisterRequest request =
+        new JobTemplateRegisterRequest(DTOConverters.toJobTemplateDTO(jobTemplate));
+    String templatesPath =
+        String.format(API_METALAKES_JOB_TEMPLATES_PATH, RESTUtils.encodeString(this.name()));
+
+    BaseResponse response =
+        restClient.post(
+            templatesPath,
+            request,
+            BaseResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+  }
+
+  /**
+   * Loads the job template with the given name from this metalake.
+   *
+   * @param jobTemplateName The name of the job template; must not be blank.
+   * @return The job template converted from the DTO in the server response.
+   * @throws NoSuchJobTemplateException Raised by the job error handler when the server reports
+   *     that the job template does not exist.
+   */
+  @Override
+  public JobTemplate getJobTemplate(String jobTemplateName) throws NoSuchJobTemplateException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not be null or empty");
+
+    // Both the metalake name and the template name are encoded as path segments.
+    String templatePath =
+        String.format(API_METALAKES_JOB_TEMPLATES_PATH, RESTUtils.encodeString(this.name()))
+            + "/"
+            + RESTUtils.encodeString(jobTemplateName);
+
+    JobTemplateResponse response =
+        restClient.get(
+            templatePath,
+            JobTemplateResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return org.apache.gravitino.dto.util.DTOConverters.fromDTO(response.getJobTemplate());
+  }
+
+  /**
+   * Deletes the job template with the given name from this metalake.
+   *
+   * @param jobTemplateName The name of the job template; must not be blank.
+   * @return The {@code dropped()} value of the server's drop response.
+   * @throws InUseException Raised by the job error handler when the server reports an in-use
+   *     error.
+   */
+  @Override
+  public boolean deleteJobTemplate(String jobTemplateName) throws InUseException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not be null or empty");
+
+    String templatePath =
+        String.format(API_METALAKES_JOB_TEMPLATES_PATH, RESTUtils.encodeString(this.name()))
+            + "/"
+            + RESTUtils.encodeString(jobTemplateName);
+
+    DropResponse response =
+        restClient.delete(
+            templatePath,
+            DropResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return response.dropped();
+  }
+
+  /**
+   * Lists the jobs that belong to the given job template.
+   *
+   * <p>The template name is sent as the {@code jobTemplateName} query parameter of the job runs
+   * endpoint.
+   *
+   * @param jobTemplateName The name of the job template; must not be blank.
+   * @return One {@link GenericJobHandle} per job DTO in the server response.
+   * @throws NoSuchJobTemplateException Raised by the job error handler when the server reports
+   *     that the job template does not exist.
+   */
+  @Override
+  public List<JobHandle> listJobs(String jobTemplateName) throws NoSuchJobTemplateException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not be null or empty");
+
+    String runsPath = String.format(API_METALAKES_JOB_PATH, RESTUtils.encodeString(this.name()));
+
+    // NOTE(review): the template name is URL-encoded here before being handed over as a query
+    // parameter. If the REST client encodes query parameters itself, the value would be encoded
+    // twice -- confirm against RESTClient and the server-side handler.
+    JobListResponse response =
+        restClient.get(
+            runsPath,
+            ImmutableMap.of("jobTemplateName", RESTUtils.encodeString(jobTemplateName)),
+            JobListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return response.getJobs().stream()
+        .map(jobDTO -> new GenericJobHandle(jobDTO))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Lists the jobs of this metalake without filtering by job template.
+   *
+   * @return One {@link GenericJobHandle} per job DTO in the server response.
+   */
+  @Override
+  public List<JobHandle> listJobs() {
+    String runsPath = String.format(API_METALAKES_JOB_PATH, RESTUtils.encodeString(this.name()));
+
+    JobListResponse response =
+        restClient.get(
+            runsPath,
+            Collections.emptyMap(),
+            JobListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return response.getJobs().stream()
+        .map(jobDTO -> new GenericJobHandle(jobDTO))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Runs a job based on the given job template.
+   *
+   * @param jobTemplateName The name of the job template; must not be blank.
+   * @param jobConf The job configuration, forwarded unchanged in the run request.
+   * @return A {@link GenericJobHandle} wrapping the job DTO in the server response.
+   * @throws NoSuchJobTemplateException Raised by the job error handler when the server reports
+   *     that the job template does not exist.
+   */
+  @Override
+  public JobHandle runJob(String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "job template name must not be null or empty");
+
+    JobRunRequest request = new JobRunRequest(jobTemplateName, jobConf);
+    String runsPath = String.format(API_METALAKES_JOB_PATH, RESTUtils.encodeString(this.name()));
+
+    JobResponse response =
+        restClient.post(
+            runsPath,
+            request,
+            JobResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return new GenericJobHandle(response.getJob());
+  }
+
+  /**
+   * Loads the job with the given id.
+   *
+   * @param jobId The id of the job; must not be blank.
+   * @return A {@link GenericJobHandle} wrapping the job DTO in the server response.
+   * @throws NoSuchJobException Raised by the job error handler when the server reports that the
+   *     job does not exist.
+   */
+  @Override
+  public JobHandle getJob(String jobId) throws NoSuchJobException {
+    Preconditions.checkArgument(StringUtils.isNotBlank(jobId), "job id must not be null or empty");
+
+    String jobPath =
+        String.format(API_METALAKES_JOB_PATH, RESTUtils.encodeString(this.name()))
+            + "/"
+            + RESTUtils.encodeString(jobId);
+
+    JobResponse response =
+        restClient.get(
+            jobPath, JobResponse.class, Collections.emptyMap(), ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return new GenericJobHandle(response.getJob());
+  }
+
+  /**
+   * Cancels the job with the given id.
+   *
+   * <p>The cancellation is issued as a POST without a request body to the job's own path.
+   *
+   * @param jobId The id of the job; must not be blank.
+   * @return A {@link GenericJobHandle} wrapping the job DTO in the server response.
+   * @throws NoSuchJobException Raised by the job error handler when the server reports that the
+   *     job does not exist.
+   */
+  @Override
+  public JobHandle cancelJob(String jobId) throws NoSuchJobException {
+    Preconditions.checkArgument(StringUtils.isNotBlank(jobId), "job id must not be null or empty");
+
+    String jobPath =
+        String.format(API_METALAKES_JOB_PATH, RESTUtils.encodeString(this.name()))
+            + "/"
+            + RESTUtils.encodeString(jobId);
+
+    JobResponse response =
+        restClient.post(
+            jobPath,
+            null,
+            JobResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.jobErrorHandler());
+    response.validate();
+
+    return new GenericJobHandle(response.getJob());
+  }
+
static class Builder extends MetalakeDTO.Builder<Builder> {
private RESTClient restClient;
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsJobs.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsJobs.java
new file mode 100644
index 0000000000..f51874f2f8
--- /dev/null
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsJobs.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.job.JobDTO;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
+import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobRunRequest;
+import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
+import org.apache.gravitino.dto.responses.JobTemplateListResponse;
+import org.apache.gravitino.dto.responses.JobTemplateResponse;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.MetalakeNotInUseException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Method;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestSupportsJobs extends TestBase {
+
+ private static final String METALAKE_FOR_JOB_TEST = "metalake_for_job_test";
+
+ private static GravitinoMetalake metalake;
+
+ /**
+ * Runs {@code TestBase.setUp()} and then creates the metalake shared by all tests in this
+ * class via {@code TestGravitinoMetalake.createMetalake}.
+ */
+ @BeforeAll
+ public static void setUp() throws Exception {
+ TestBase.setUp();
+ metalake = TestGravitinoMetalake.createMetalake(client,
METALAKE_FOR_JOB_TEST);
+ }
+
+ /**
+ * Verifies that {@code listJobTemplates} returns the converted templates on success and
+ * surfaces not-found, not-in-use and internal errors as the matching exceptions.
+ */
+ @Test
+ public void testListJobTemplates() throws JsonProcessingException {
+ JobTemplateDTO template1 = newShellJobTemplateDTO("shell-job-template");
+ JobTemplateDTO template2 = newSparkJobTemplateDTO("spark-job-template");
+ List<JobTemplateDTO> templates = Lists.newArrayList(template1, template2);
+ JobTemplateListResponse resp = new JobTemplateListResponse(templates);
+
+ buildMockResource(Method.GET, jobTemplatesPath(), null, resp,
HttpStatus.SC_OK);
+
+ List<JobTemplate> jobTemplates = metalake.listJobTemplates();
+ Assertions.assertEquals(2, jobTemplates.size());
+
+ // The client is expected to apply the same DTO-to-entity conversion as done here.
+ List<JobTemplate> expected =
+ templates.stream()
+ .map(org.apache.gravitino.dto.util.DTOConverters::fromDTO)
+ .collect(Collectors.toList());
+
+ Assertions.assertEquals(expected, jobTemplates);
+
+ // Test throw NoSuchMetalakeException
+ ErrorResponse errorResp =
+ ErrorResponse.notFound(NoSuchMetalakeException.class.getSimpleName(),
"mock error");
+ buildMockResource(Method.GET, jobTemplatesPath(), null, errorResp,
HttpStatus.SC_NOT_FOUND);
+ Assertions.assertThrows(NoSuchMetalakeException.class, () ->
metalake.listJobTemplates());
+
+ // Test throw MetalakeNotInUseException
+ ErrorResponse errorResp2 =
+ ErrorResponse.notInUse(
+ MetalakeNotInUseException.class.getSimpleName(),
+ "mock error",
+ new MetalakeNotInUseException("mock error"));
+ buildMockResource(Method.GET, jobTemplatesPath(), null, errorResp2,
HttpStatus.SC_CONFLICT);
+ Assertions.assertThrows(MetalakeNotInUseException.class, () ->
metalake.listJobTemplates());
+
+ // Test throw RuntimeException
+ ErrorResponse errorResp3 = ErrorResponse.internalError("mock error");
+ buildMockResource(
+ Method.GET, jobTemplatesPath(), null, errorResp3,
HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ Assertions.assertThrows(RuntimeException.class, () ->
metalake.listJobTemplates());
+ }
+
+ /**
+ * Verifies that {@code registerJobTemplate} succeeds on an OK response and throws {@code
+ * JobTemplateAlreadyExistsException} on an already-exists error response.
+ */
+ @Test
+ public void testRegisterJobTemplate() throws JsonProcessingException {
+ JobTemplateDTO templateDTO = newShellJobTemplateDTO("shell-job-template");
+ JobTemplate template =
org.apache.gravitino.dto.util.DTOConverters.fromDTO(templateDTO);
+ JobTemplateRegisterRequest req = new
JobTemplateRegisterRequest(templateDTO);
+
+ BaseResponse resp = new BaseResponse();
+ buildMockResource(Method.POST, jobTemplatesPath(), req, resp,
HttpStatus.SC_OK);
+
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template));
+
+ // Test throw JobTemplateAlreadyExistsException
+ ErrorResponse errorResp =
+ ErrorResponse.alreadyExists(
+ JobTemplateAlreadyExistsException.class.getSimpleName(), "mock
error");
+ buildMockResource(Method.POST, jobTemplatesPath(), req, errorResp,
HttpStatus.SC_CONFLICT);
+
+ Assertions.assertThrows(
+ JobTemplateAlreadyExistsException.class, () ->
metalake.registerJobTemplate(template));
+ }
+
+ /**
+ * Verifies that {@code getJobTemplate} returns the converted template on success and throws
+ * {@code NoSuchJobTemplateException} on the matching not-found error response.
+ */
+ @Test
+ public void testGetJobTemplate() throws JsonProcessingException {
+ String jobTemplateName = "shell-job-template";
+ JobTemplateDTO templateDTO = newShellJobTemplateDTO(jobTemplateName);
+ JobTemplate expected =
org.apache.gravitino.dto.util.DTOConverters.fromDTO(templateDTO);
+ JobTemplateResponse resp = new JobTemplateResponse(templateDTO);
+
+ buildMockResource(
+ Method.GET, jobTemplatesPath() + "/" + jobTemplateName, null, resp,
HttpStatus.SC_OK);
+
+ JobTemplate actual = metalake.getJobTemplate(jobTemplateName);
+ Assertions.assertEquals(expected, actual);
+
+ // Test throw NoSuchJobTemplateException
+ ErrorResponse errorResp =
+
ErrorResponse.notFound(NoSuchJobTemplateException.class.getSimpleName(), "mock
error");
+ buildMockResource(
+ Method.GET,
+ jobTemplatesPath() + "/" + jobTemplateName,
+ null,
+ errorResp,
+ HttpStatus.SC_NOT_FOUND);
+ Assertions.assertThrows(
+ NoSuchJobTemplateException.class, () ->
metalake.getJobTemplate(jobTemplateName));
+ }
+
+ /**
+ * Verifies that {@code deleteJobTemplate} returns {@code true} for a {@code DropResponse(true)}
+ * and throws {@code InUseException} on an in-use error response.
+ */
+ @Test
+ public void testDeleteJobTemplate() throws JsonProcessingException {
+ String jobTemplateName = "shell-job-template";
+ DropResponse resp = new DropResponse(true);
+
+ buildMockResource(
+ Method.DELETE, jobTemplatesPath() + "/" + jobTemplateName, null, resp,
HttpStatus.SC_OK);
+
+ Assertions.assertTrue(metalake.deleteJobTemplate(jobTemplateName));
+
+ // Test throw InUseException
+ ErrorResponse errorResp2 =
+ ErrorResponse.inUse(
+ InUseException.class.getSimpleName(), "mock error", new
InUseException("mock error"));
+ buildMockResource(
+ Method.DELETE,
+ jobTemplatesPath() + "/" + jobTemplateName,
+ null,
+ errorResp2,
+ HttpStatus.SC_CONFLICT);
+ Assertions.assertThrows(
+ InUseException.class, () ->
metalake.deleteJobTemplate(jobTemplateName));
+ }
+
+ /**
+ * Verifies both {@code listJobs} overloads: the unfiltered listing, the {@code
+ * NoSuchJobTemplateException} error path, and the listing filtered by job template name.
+ */
+ @Test
+ public void testListJobs() throws JsonProcessingException {
+ String jobTemplateName = "shell-job-template";
+ String jobId1 = "job-1";
+ String jobId2 = "job-2";
+
+ List<JobDTO> jobs =
+ Lists.newArrayList(newJobDTO(jobId1, jobTemplateName),
newJobDTO(jobId2, jobTemplateName));
+
+ JobListResponse resp = new JobListResponse(jobs);
+
+ buildMockResource(Method.GET, jobRunsPath(), null, resp, HttpStatus.SC_OK);
+
+ List<JobHandle> actualJobs = metalake.listJobs();
+ Assertions.assertEquals(2, actualJobs.size());
+ compare(jobs.get(0), actualJobs.get(0));
+ compare(jobs.get(1), actualJobs.get(1));
+
+ // Test throw NoSuchJobTemplateException
+ ErrorResponse errorResp =
+
ErrorResponse.notFound(NoSuchJobTemplateException.class.getSimpleName(), "mock
error");
+ buildMockResource(Method.GET, jobRunsPath(), null, errorResp,
HttpStatus.SC_NOT_FOUND);
+ Assertions.assertThrows(
+ NoSuchJobTemplateException.class, () ->
metalake.listJobs(jobTemplateName));
+
+ // Test list jobs by job template name
+ // The template name used here contains no characters that URL-encoding would change.
+ buildMockResource(
+ Method.GET,
+ jobRunsPath(),
+ ImmutableMap.of("jobTemplateName", jobTemplateName),
+ null,
+ resp,
+ HttpStatus.SC_OK);
+
+ List<JobHandle> jobsByTemplate = metalake.listJobs(jobTemplateName);
+ Assertions.assertEquals(2, jobsByTemplate.size());
+ compare(jobs.get(0), jobsByTemplate.get(0));
+ compare(jobs.get(1), jobsByTemplate.get(1));
+ }
+
+  /**
+   * Exercises {@code getJob} against a mocked GET on the single-job
+   * path: a {@code JobResponse} with HTTP 200 must produce a handle
+   * matching the expected DTO, and a not-found error response with
+   * HTTP 404 must surface as {@code NoSuchJobException}.
+   */
+  @Test
+  public void testGetJob() throws JsonProcessingException {
+    String jobId = "job-1";
+    String jobTemplateName = "shell-job-template";
+    String jobPath = jobRunsPath() + "/" + jobId;
+    JobDTO expectedJob = newJobDTO(jobId, jobTemplateName);
+    JobResponse resp = new JobResponse(expectedJob);
+
+    buildMockResource(Method.GET, jobPath, null, resp, HttpStatus.SC_OK);
+
+    JobHandle actualHandle = metalake.getJob(jobId);
+    compare(expectedJob, actualHandle);
+
+    // Test throw NoSuchJobException
+    ErrorResponse errorResp =
+        ErrorResponse.notFound(
+            NoSuchJobException.class.getSimpleName(), "mock error");
+    buildMockResource(
+        Method.GET, jobPath, null, errorResp, HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(
+        NoSuchJobException.class, () -> metalake.getJob(jobId));
+  }
+
+  /**
+   * Exercises {@code runJob} against a mocked POST on
+   * {@code jobRunsPath()} carrying a {@code JobRunRequest}: a
+   * {@code JobResponse} with HTTP 200 must produce a handle matching the
+   * expected DTO, and a not-found error response with HTTP 404 must
+   * surface as {@code NoSuchJobTemplateException}.
+   */
+  @Test
+  public void testRunJob() throws JsonProcessingException {
+    String jobTemplateName = "shell-job-template";
+    String jobId = "job-1";
+    JobDTO expectedJob = newJobDTO(jobId, jobTemplateName);
+    JobResponse resp = new JobResponse(expectedJob);
+    JobRunRequest req =
+        new JobRunRequest(jobTemplateName, ImmutableMap.of());
+
+    buildMockResource(
+        Method.POST, jobRunsPath(), req, resp, HttpStatus.SC_OK);
+
+    JobHandle actualHandle =
+        metalake.runJob(jobTemplateName, ImmutableMap.of());
+    compare(expectedJob, actualHandle);
+
+    // Test throw NoSuchJobTemplateException
+    ErrorResponse errorResp =
+        ErrorResponse.notFound(
+            NoSuchJobTemplateException.class.getSimpleName(), "mock error");
+    buildMockResource(
+        Method.POST, jobRunsPath(), req, errorResp, HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(
+        NoSuchJobTemplateException.class,
+        () -> metalake.runJob(jobTemplateName, ImmutableMap.of()));
+  }
+
+  /**
+   * Exercises {@code cancelJob} against a mocked POST on the single-job
+   * path: a {@code JobResponse} with HTTP 200 must produce a handle
+   * matching the expected DTO, and a not-found error response with
+   * HTTP 404 must surface as {@code NoSuchJobException}.
+   */
+  @Test
+  public void testCancelJob() throws JsonProcessingException {
+    String jobId = "job-1";
+    String jobTemplateName = "shell-job-template";
+    String jobPath = jobRunsPath() + "/" + jobId;
+    JobDTO expectedJob = newJobDTO(jobId, jobTemplateName);
+    JobResponse resp = new JobResponse(expectedJob);
+
+    buildMockResource(Method.POST, jobPath, null, resp, HttpStatus.SC_OK);
+
+    JobHandle actualHandle = metalake.cancelJob(jobId);
+    compare(expectedJob, actualHandle);
+
+    // Test throw NoSuchJobException
+    ErrorResponse errorResp =
+        ErrorResponse.notFound(
+            NoSuchJobException.class.getSimpleName(), "mock error");
+    buildMockResource(
+        Method.POST, jobPath, null, errorResp, HttpStatus.SC_NOT_FOUND);
+    Assertions.assertThrows(
+        NoSuchJobException.class, () -> metalake.cancelJob(jobId));
+  }
+
+  /**
+   * Asserts that a job handle mirrors the DTO it was built from by
+   * checking, in order, the job id, the job template name and the
+   * status.
+   *
+   * @param expected the DTO supplying the expected values
+   * @param actual the handle under test
+   */
+  private void compare(JobDTO expected, JobHandle actual) {
+    Assertions.assertEquals(expected.jobId(), actual.jobId());
+    Assertions.assertEquals(
+        expected.jobTemplateName(), actual.jobTemplateName());
+    Assertions.assertEquals(expected.status(), actual.jobStatus());
+  }
+
+  /**
+   * Builds the path of the job templates collection for the test
+   * metalake.
+   *
+   * @return the metalake path followed by {@code /jobs/templates}
+   */
+  private String jobTemplatesPath() {
+    String metalakeRoot = "/api/metalakes/" + METALAKE_FOR_JOB_TEST;
+    return metalakeRoot + "/jobs/templates";
+  }
+
+  /**
+   * Builds the path of the job runs collection for the test metalake.
+   *
+   * @return the metalake path followed by {@code /jobs/runs}
+   */
+  private String jobRunsPath() {
+    String metalakeRoot = "/api/metalakes/" + METALAKE_FOR_JOB_TEST;
+    return metalakeRoot + "/jobs/runs";
+  }
+
+  /**
+   * Creates a shell job template DTO fixture with a fixed comment and
+   * executable and with empty arguments, environments, custom fields
+   * and scripts.
+   *
+   * @param name the template name to set on the DTO
+   * @return the built shell job template DTO
+   */
+  private JobTemplateDTO newShellJobTemplateDTO(String name) {
+    String comment = "This is a shell job template";
+    String executable = "/path/to/script.sh";
+
+    JobTemplateDTO shellTemplate =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName(name)
+            .withComment(comment)
+            .withExecutable(executable)
+            .withArguments(Collections.emptyList())
+            .withEnvironments(Collections.emptyMap())
+            .withCustomFields(Collections.emptyMap())
+            .withScripts(Collections.emptyList())
+            .build();
+    return shellTemplate;
+  }
+
+  /**
+   * Creates a Spark job template DTO fixture with a fixed comment,
+   * executable and class name and with empty arguments, environments,
+   * custom fields, jars, files, archives and configs.
+   *
+   * @param name the template name to set on the DTO
+   * @return the built Spark job template DTO
+   */
+  private JobTemplateDTO newSparkJobTemplateDTO(String name) {
+    String comment = "This is a spark job template";
+    String executable = "/path/to/spark-job.jar";
+    String mainClass = "org.example.SparkJob";
+
+    JobTemplateDTO sparkTemplate =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName(name)
+            .withComment(comment)
+            .withExecutable(executable)
+            .withArguments(Collections.emptyList())
+            .withEnvironments(Collections.emptyMap())
+            .withCustomFields(Collections.emptyMap())
+            .withClassName(mainClass)
+            .withJars(Collections.emptyList())
+            .withFiles(Collections.emptyList())
+            .withArchives(Collections.emptyList())
+            .withConfigs(Collections.emptyMap())
+            .build();
+    return sparkTemplate;
+  }
+
+  /**
+   * Creates a job DTO fixture in {@code QUEUED} status whose audit info
+   * has creator {@code "test"} and a create time taken from
+   * {@code Instant.now()}.
+   *
+   * @param jobId the job id to set on the DTO
+   * @param templateName the job template name to set on the DTO
+   * @return the new job DTO
+   */
+  private JobDTO newJobDTO(String jobId, String templateName) {
+    AuditDTO audit =
+        AuditDTO.builder()
+            .withCreator("test")
+            .withCreateTime(Instant.now())
+            .build();
+    return new JobDTO(jobId, templateName, JobHandle.Status.QUEUED, audit);
+  }
+}