This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 36c27512f0 [#8638] feat(api): Add Java and Python interface for job
template alteration (#8644)
36c27512f0 is described below
commit 36c27512f0ea1e75f74b14b4d945d39fe428925e
Author: Jerry Shao <[email protected]>
AuthorDate: Sun Sep 28 10:09:50 2025 +0800
[#8638] feat(api): Add Java and Python interface for job template
alteration (#8644)
### What changes were proposed in this pull request?
Add the Java and Python APIs for job template alteration.
### Why are the changes needed?
This is the first PR in the series that adds support for job template alteration.
Fix: #8638
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Add UTs.
---
.../apache/gravitino/job/JobTemplateChange.java | 701 +++++++++++++++++++++
.../org/apache/gravitino/job/SupportsJobs.java | 15 +
.../gravitino/job/TestJobTemplateChange.java | 125 ++++
.../gravitino/api/job/job_template_change.py | 425 +++++++++++++
.../gravitino/api/job/supports_jobs.py | 22 +
.../client-python/tests/unittests/job/__init__.py | 16 +
.../unittests/job/test_job_template_change.py | 104 +++
.../java/org/apache/gravitino/job/JobManager.java | 32 +-
.../gravitino/job/local/LocalJobExecutor.java | 2 +-
9 files changed, 1439 insertions(+), 3 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/job/JobTemplateChange.java
b/api/src/main/java/org/apache/gravitino/job/JobTemplateChange.java
new file mode 100644
index 0000000000..88d9f58d0f
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/job/JobTemplateChange.java
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * The interface for job template changes. A job template change is an operation that modifies a job
+ * template. It can be one of the following:
+ *
+ * <ul>
+ *   <li>Rename the job template.
+ *   <li>Update the comment of the job template.
+ *   <li>Update the job template details, such as executable, arguments, environments, custom
+ *       fields, etc.
+ * </ul>
+ */
+public interface JobTemplateChange {
+
+  /**
+   * Builds a change that gives the job template a new name.
+   *
+   * @param newName The name the job template should carry once the change is applied.
+   * @return A {@link RenameJobTemplate} change wrapping {@code newName}.
+   */
+  static JobTemplateChange rename(String newName) {
+    JobTemplateChange renameChange = new RenameJobTemplate(newName);
+    return renameChange;
+  }
+
+  /**
+   * Builds a change that replaces the comment of the job template.
+   *
+   * @param newComment The comment the job template should carry once the change is applied.
+   * @return An {@link UpdateJobTemplateComment} change wrapping {@code newComment}.
+   */
+  static JobTemplateChange updateComment(String newComment) {
+    JobTemplateChange commentChange = new UpdateJobTemplateComment(newComment);
+    return commentChange;
+  }
+
+  /**
+   * Builds a change that updates the details of the job template.
+   *
+   * @param templateUpdate The details to apply, e.g. a {@link ShellTemplateUpdate} or a {@link
+   *     SparkTemplateUpdate}.
+   * @return An {@link UpdateJobTemplate} change wrapping {@code templateUpdate}.
+   */
+  static JobTemplateChange updateTemplate(TemplateUpdate templateUpdate) {
+    JobTemplateChange detailsChange = new UpdateJobTemplate(templateUpdate);
+    return detailsChange;
+  }
+
+  /** A job template change that gives the job template a new name. */
+  final class RenameJobTemplate implements JobTemplateChange {
+    private final String newName;
+
+    private RenameJobTemplate(String newName) {
+      this.newName = newName;
+    }
+
+    /**
+     * Returns the name the job template is renamed to.
+     *
+     * @return The new name of the job template.
+     */
+    public String getNewName() {
+      return newName;
+    }
+
+    /**
+     * Compares this change with another object. Two rename changes are equal when they carry the
+     * same new name.
+     *
+     * @param o The object to compare with.
+     * @return true if the objects are equal, false otherwise.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      // The class is final, so instanceof is equivalent to comparing the runtime classes.
+      if (!(o instanceof RenameJobTemplate)) {
+        return false;
+      }
+      RenameJobTemplate other = (RenameJobTemplate) o;
+      return Objects.equals(newName, other.newName);
+    }
+
+    /**
+     * Computes a hash code from the new name.
+     *
+     * @return The hash code.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(newName);
+    }
+
+    /**
+     * Renders the change as {@code RENAME JOB TEMPLATE <newName>}.
+     *
+     * @return The string representation of the job template change.
+     */
+    @Override
+    public String toString() {
+      return "RENAME JOB TEMPLATE " + newName;
+    }
+  }
+
+  /** A job template change that replaces the comment of the job template. */
+  final class UpdateJobTemplateComment implements JobTemplateChange {
+    private final String newComment;
+
+    private UpdateJobTemplateComment(String newComment) {
+      this.newComment = newComment;
+    }
+
+    /**
+     * Returns the comment the job template is updated to.
+     *
+     * @return The new comment of the job template.
+     */
+    public String getNewComment() {
+      return newComment;
+    }
+
+    /**
+     * Compares this change with another object. Two comment changes are equal when they carry the
+     * same new comment.
+     *
+     * @param o The object to compare with.
+     * @return true if the objects are equal, false otherwise.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      // The class is final, so instanceof is equivalent to comparing the runtime classes.
+      if (!(o instanceof UpdateJobTemplateComment)) {
+        return false;
+      }
+      UpdateJobTemplateComment other = (UpdateJobTemplateComment) o;
+      return Objects.equals(newComment, other.newComment);
+    }
+
+    /**
+     * Computes a hash code from the new comment.
+     *
+     * @return The hash code.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(newComment);
+    }
+
+    /**
+     * Renders the change as {@code UPDATE JOB TEMPLATE COMMENT <newComment>}.
+     *
+     * @return The string representation of the job template change.
+     */
+    @Override
+    public String toString() {
+      return "UPDATE JOB TEMPLATE COMMENT " + newComment;
+    }
+  }
+
+  /** A job template change to update the details of the job template. */
+  final class UpdateJobTemplate implements JobTemplateChange {
+
+    private final TemplateUpdate templateUpdate;
+
+    private UpdateJobTemplate(TemplateUpdate templateUpdate) {
+      this.templateUpdate = templateUpdate;
+    }
+
+    /**
+     * Get the template update.
+     *
+     * @return The template update, exactly as it was passed to the factory method.
+     */
+    public TemplateUpdate getTemplateUpdate() {
+      return templateUpdate;
+    }
+
+    /**
+     * Checks if this UpdateJobTemplate is equal to another object. Two instances are equal when
+     * their template updates are equal.
+     *
+     * @param o The object to compare with.
+     * @return true if the objects are equal, false otherwise.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      UpdateJobTemplate that = (UpdateJobTemplate) o;
+      return Objects.equals(templateUpdate, that.templateUpdate);
+    }
+
+    /**
+     * Generates a hash code for this UpdateJobTemplate.
+     *
+     * @return The hash code.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(templateUpdate);
+    }
+
+    /**
+     * Get the string representation of the job template change. The representation contains the
+     * simple class name of the template update, or {@code null} when no update was supplied.
+     *
+     * @return The string representation of the job template change.
+     */
+    @Override
+    public String toString() {
+      // The constructor does not reject null, and equals/hashCode tolerate it, so toString must
+      // not throw either (it is typically called from logging and error-reporting paths).
+      String updateType =
+          templateUpdate == null ? "null" : templateUpdate.getClass().getSimpleName();
+      return "UPDATE JOB TEMPLATE " + updateType;
+    }
+  }
+
+  /**
+   * Base class for template updates. It holds the fields shared by all template update types: the
+   * executable, the arguments, the environments and the custom fields.
+   */
+  abstract class TemplateUpdate {
+
+    private final String newExecutable;
+
+    private final List<String> newArguments;
+
+    private final Map<String, String> newEnvironments;
+
+    private final Map<String, String> newCustomFields;
+
+    /**
+     * Constructor for TemplateUpdate. The values are taken from the builder as they are; the
+     * normalization to immutable, non-null collections happens in {@link BaseBuilder#validate()},
+     * which builders are expected to call before constructing the update.
+     *
+     * @param builder The builder to construct the TemplateUpdate.
+     */
+    protected TemplateUpdate(BaseBuilder<?, ?> builder) {
+      this.newExecutable = builder.newExecutable;
+      this.newArguments = builder.newArguments;
+      this.newEnvironments = builder.newEnvironments;
+      this.newCustomFields = builder.newCustomFields;
+    }
+
+    /**
+     * Get the new executable of the job template.
+     *
+     * @return The new executable of the job template.
+     */
+    public String getNewExecutable() {
+      return newExecutable;
+    }
+
+    /**
+     * Get the new arguments of the job template.
+     *
+     * @return The new arguments of the job template.
+     */
+    public List<String> getNewArguments() {
+      return newArguments;
+    }
+
+    /**
+     * Get the new environments of the job template.
+     *
+     * @return The new environments of the job template.
+     */
+    public Map<String, String> getNewEnvironments() {
+      return newEnvironments;
+    }
+
+    /**
+     * Get the new custom fields of the job template.
+     *
+     * @return The new custom fields of the job template.
+     */
+    public Map<String, String> getNewCustomFields() {
+      return newCustomFields;
+    }
+
+    /**
+     * Checks if this TemplateUpdate is equal to another object. Only objects of the same runtime
+     * class are considered equal; this keeps the comparison symmetric with subclasses, which also
+     * compare runtime classes before delegating to this method.
+     *
+     * @param o The object to compare with.
+     * @return true if the objects are equal, false otherwise.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      // An instanceof check here would let a subclass that inherits this method report equality
+      // with, e.g., a ShellTemplateUpdate while the reverse comparison returns false.
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TemplateUpdate that = (TemplateUpdate) o;
+      return Objects.equals(newExecutable, that.newExecutable)
+          && Objects.equals(newArguments, that.newArguments)
+          && Objects.equals(newEnvironments, that.newEnvironments)
+          && Objects.equals(newCustomFields, that.newCustomFields);
+    }
+
+    /**
+     * Generates a hash code for this TemplateUpdate.
+     *
+     * @return The hash code.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(newExecutable, newArguments, newEnvironments, newCustomFields);
+    }
+
+    /**
+     * Base builder class for constructing TemplateUpdate instances.
+     *
+     * @param <B> The concrete builder type, returned by the {@code with...} methods for chaining.
+     * @param <P> The type of TemplateUpdate produced by {@link #build()}.
+     */
+    protected abstract static class BaseBuilder<
+        B extends BaseBuilder<B, P>, P extends TemplateUpdate> {
+      private String newExecutable;
+      private List<String> newArguments;
+      private Map<String, String> newEnvironments;
+      private Map<String, String> newCustomFields;
+
+      /** Protected constructor to prevent direct instantiation. */
+      protected BaseBuilder() {}
+
+      /**
+       * Returns the builder instance itself for method chaining.
+       *
+       * @return The builder instance.
+       */
+      protected abstract B self();
+
+      /**
+       * Builds the TemplateUpdate instance.
+       *
+       * @return A new TemplateUpdate instance.
+       */
+      public abstract P build();
+
+      /**
+       * Sets the new executable for the job template.
+       *
+       * @param newExecutable The new executable to set.
+       * @return The builder instance for chaining.
+       */
+      public B withNewExecutable(String newExecutable) {
+        this.newExecutable = newExecutable;
+        return self();
+      }
+
+      /**
+       * Sets the new arguments for the job template.
+       *
+       * @param newArguments The new arguments to set.
+       * @return The builder instance for chaining.
+       */
+      public B withNewArguments(List<String> newArguments) {
+        this.newArguments = newArguments;
+        return self();
+      }
+
+      /**
+       * Sets the new environments for the job template.
+       *
+       * @param newEnvironments The new environments to set.
+       * @return The builder instance for chaining.
+       */
+      public B withNewEnvironments(Map<String, String> newEnvironments) {
+        this.newEnvironments = newEnvironments;
+        return self();
+      }
+
+      /**
+       * Sets the new custom fields for the job template.
+       *
+       * @param newCustomFields The new custom fields to set.
+       * @return The builder instance for chaining.
+       */
+      public B withNewCustomFields(Map<String, String> newCustomFields) {
+        this.newCustomFields = newCustomFields;
+        return self();
+      }
+
+      /**
+       * Validates the builder fields before building the TemplateUpdate instance. The executable
+       * must be neither null nor blank. Arguments, environments and custom fields are replaced by
+       * immutable copies, or by empty collections when they were not set.
+       *
+       * @throws IllegalArgumentException if the executable is null or blank.
+       */
+      protected void validate() {
+        // isNotBlank is the single-value check; isNoneBlank is its varargs counterpart.
+        Preconditions.checkArgument(
+            StringUtils.isNotBlank(newExecutable), "Executable cannot be null or blank");
+        this.newArguments =
+            newArguments == null ? Collections.emptyList() : ImmutableList.copyOf(newArguments);
+        this.newEnvironments =
+            newEnvironments == null ? Collections.emptyMap() : ImmutableMap.copyOf(newEnvironments);
+        this.newCustomFields =
+            newCustomFields == null ? Collections.emptyMap() : ImmutableMap.copyOf(newCustomFields);
+      }
+    }
+  }
+
+  /** A template update carrying the details specific to shell job templates. */
+  final class ShellTemplateUpdate extends TemplateUpdate {
+
+    private final List<String> newScripts;
+
+    private ShellTemplateUpdate(Builder builder) {
+      super(builder);
+      this.newScripts = builder.newScripts;
+    }
+
+    /**
+     * Returns the scripts the shell job template is updated to.
+     *
+     * @return The new scripts of the shell job template.
+     */
+    public List<String> getNewScripts() {
+      return newScripts;
+    }
+
+    /**
+     * Compares this update with another object. Two shell updates are equal when the common
+     * template fields and the scripts are equal.
+     *
+     * @param o The object to compare with.
+     * @return true if the objects are equal, false otherwise.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      // The class is final, so instanceof is equivalent to comparing the runtime classes.
+      if (!(o instanceof ShellTemplateUpdate) || !super.equals(o)) {
+        return false;
+      }
+      ShellTemplateUpdate other = (ShellTemplateUpdate) o;
+      return Objects.equals(newScripts, other.newScripts);
+    }
+
+    /**
+     * Computes a hash code from the common template fields and the scripts.
+     *
+     * @return The hash code.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), newScripts);
+    }
+
+    /**
+     * Creates an empty builder for ShellTemplateUpdate.
+     *
+     * @return A new Builder instance.
+     */
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    /** Builder for ShellTemplateUpdate instances. */
+    public static class Builder extends BaseBuilder<Builder, ShellTemplateUpdate> {
+
+      private List<String> newScripts;
+
+      private Builder() {}
+
+      /**
+       * Sets the scripts the shell job template is updated to.
+       *
+       * @param newScripts The new scripts to set.
+       * @return The builder instance for chaining.
+       */
+      public Builder withNewScripts(List<String> newScripts) {
+        this.newScripts = newScripts;
+        return this;
+      }
+
+      /**
+       * Validates the builder and creates the ShellTemplateUpdate.
+       *
+       * @return A new ShellTemplateUpdate instance.
+       */
+      @Override
+      public ShellTemplateUpdate build() {
+        validate();
+        return new ShellTemplateUpdate(this);
+      }
+
+      /**
+       * Returns this builder, typed as the concrete builder, for method chaining.
+       *
+       * @return The builder instance.
+       */
+      @Override
+      protected Builder self() {
+        return this;
+      }
+
+      /**
+       * Runs the common validation, then replaces the scripts by an immutable copy, or by an empty
+       * list when no scripts were set.
+       */
+      @Override
+      protected void validate() {
+        super.validate();
+        if (newScripts == null) {
+          this.newScripts = Collections.emptyList();
+        } else {
+          this.newScripts = ImmutableList.copyOf(newScripts);
+        }
+      }
+    }
+  }
+
+  /** A template update carrying the details specific to spark job templates. */
+  final class SparkTemplateUpdate extends TemplateUpdate {
+
+    private final String newClassName;
+
+    private final List<String> newJars;
+
+    private final List<String> newFiles;
+
+    private final List<String> newArchives;
+
+    private final Map<String, String> newConfigs;
+
+    private SparkTemplateUpdate(Builder builder) {
+      super(builder);
+      this.newClassName = builder.newClassName;
+      this.newJars = builder.newJars;
+      this.newFiles = builder.newFiles;
+      this.newArchives = builder.newArchives;
+      this.newConfigs = builder.newConfigs;
+    }
+
+    /**
+     * Returns the class name the spark job template is updated to.
+     *
+     * @return The new class name of the spark job template.
+     */
+    public String getNewClassName() {
+      return newClassName;
+    }
+
+    /**
+     * Returns the jars the spark job template is updated to.
+     *
+     * @return The new jars of the spark job template.
+     */
+    public List<String> getNewJars() {
+      return newJars;
+    }
+
+    /**
+     * Returns the files the spark job template is updated to.
+     *
+     * @return The new files of the spark job template.
+     */
+    public List<String> getNewFiles() {
+      return newFiles;
+    }
+
+    /**
+     * Returns the archives the spark job template is updated to.
+     *
+     * @return The new archives of the spark job template.
+     */
+    public List<String> getNewArchives() {
+      return newArchives;
+    }
+
+    /**
+     * Returns the configs the spark job template is updated to.
+     *
+     * @return The new configs of the spark job template.
+     */
+    public Map<String, String> getNewConfigs() {
+      return newConfigs;
+    }
+
+    /**
+     * Compares this update with another object. Two spark updates are equal when the common
+     * template fields, the class name, the jars, the files, the archives and the configs are equal.
+     *
+     * @param o The object to compare with.
+     * @return true if the objects are equal, false otherwise.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      // The class is final, so instanceof is equivalent to comparing the runtime classes.
+      if (!(o instanceof SparkTemplateUpdate) || !super.equals(o)) {
+        return false;
+      }
+      SparkTemplateUpdate other = (SparkTemplateUpdate) o;
+      return Objects.equals(newClassName, other.newClassName)
+          && Objects.equals(newJars, other.newJars)
+          && Objects.equals(newFiles, other.newFiles)
+          && Objects.equals(newArchives, other.newArchives)
+          && Objects.equals(newConfigs, other.newConfigs);
+    }
+
+    /**
+     * Computes a hash code from the common template fields and the spark specific fields.
+     *
+     * @return The hash code.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          super.hashCode(), newClassName, newJars, newFiles, newArchives, newConfigs);
+    }
+
+    /**
+     * Creates an empty builder for SparkTemplateUpdate.
+     *
+     * @return A new Builder instance.
+     */
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    /** Builder for SparkTemplateUpdate instances. */
+    public static class Builder extends BaseBuilder<Builder, SparkTemplateUpdate> {
+      private String newClassName;
+      private List<String> newJars;
+      private List<String> newFiles;
+      private List<String> newArchives;
+      private Map<String, String> newConfigs;
+
+      private Builder() {}
+
+      /**
+       * Sets the class name the spark job template is updated to.
+       *
+       * @param newClassName The new class name to set.
+       * @return The builder instance for chaining.
+       */
+      public Builder withNewClassName(String newClassName) {
+        this.newClassName = newClassName;
+        return this;
+      }
+
+      /**
+       * Sets the jars the spark job template is updated to.
+       *
+       * @param newJars The new jars to set.
+       * @return The builder instance for chaining.
+       */
+      public Builder withNewJars(List<String> newJars) {
+        this.newJars = newJars;
+        return this;
+      }
+
+      /**
+       * Sets the files the spark job template is updated to.
+       *
+       * @param newFiles The new files to set.
+       * @return The builder instance for chaining.
+       */
+      public Builder withNewFiles(List<String> newFiles) {
+        this.newFiles = newFiles;
+        return this;
+      }
+
+      /**
+       * Sets the archives the spark job template is updated to.
+       *
+       * @param newArchives The new archives to set.
+       * @return The builder instance for chaining.
+       */
+      public Builder withNewArchives(List<String> newArchives) {
+        this.newArchives = newArchives;
+        return this;
+      }
+
+      /**
+       * Sets the configs the spark job template is updated to.
+       *
+       * @param newConfigs The new configs to set.
+       * @return The builder instance for chaining.
+       */
+      public Builder withNewConfigs(Map<String, String> newConfigs) {
+        this.newConfigs = newConfigs;
+        return this;
+      }
+
+      /**
+       * Validates the builder and creates the SparkTemplateUpdate.
+       *
+       * @return A new SparkTemplateUpdate instance.
+       */
+      @Override
+      public SparkTemplateUpdate build() {
+        validate();
+        return new SparkTemplateUpdate(this);
+      }
+
+      /**
+       * Returns this builder, typed as the concrete builder, for method chaining.
+       *
+       * @return The builder instance.
+       */
+      @Override
+      protected Builder self() {
+        return this;
+      }
+
+      /**
+       * Runs the common validation, then replaces the jars, files, archives and configs by
+       * immutable copies, or by empty collections when they were not set. The class name is left
+       * untouched.
+       */
+      @Override
+      protected void validate() {
+        super.validate();
+        this.newJars = immutableOrEmpty(newJars);
+        this.newFiles = immutableOrEmpty(newFiles);
+        this.newArchives = immutableOrEmpty(newArchives);
+        if (newConfigs == null) {
+          this.newConfigs = Collections.emptyMap();
+        } else {
+          this.newConfigs = ImmutableMap.copyOf(newConfigs);
+        }
+      }
+
+      /** Returns an immutable copy of the given list, or an empty list when it is null. */
+      private static List<String> immutableOrEmpty(List<String> values) {
+        if (values == null) {
+          return Collections.emptyList();
+        }
+        return ImmutableList.copyOf(values);
+      }
+    }
+  }
+}
diff --git a/api/src/main/java/org/apache/gravitino/job/SupportsJobs.java
b/api/src/main/java/org/apache/gravitino/job/SupportsJobs.java
index e199107806..b5ca1698e2 100644
--- a/api/src/main/java/org/apache/gravitino/job/SupportsJobs.java
+++ b/api/src/main/java/org/apache/gravitino/job/SupportsJobs.java
@@ -72,6 +72,21 @@ public interface SupportsJobs {
*/
boolean deleteJobTemplate(String jobTemplateName) throws InUseException;
+  /**
+   * Alters a job template by applying the specified changes. This allows for modifying the
+   * properties of an existing job template, such as its name, description, or parameters.
+   *
+   * <p>The default implementation does not alter anything and always throws {@link
+   * UnsupportedOperationException}.
+   *
+   * @param jobTemplateName the name of the job template to alter
+   * @param changes the changes to apply to the job template
+   * @return the updated job template after applying the changes
+   * @throws NoSuchJobTemplateException if no job template with the specified name exists
+   * @throws IllegalArgumentException if any of the changes cannot be applied to the job template
+   */
+  default JobTemplate alterJobTemplate(String jobTemplateName, JobTemplateChange... changes)
+      throws NoSuchJobTemplateException, IllegalArgumentException {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
/**
* Lists all the jobs by the specified job template name. This will return a
list of job handles
* associated with the specified job template. Each job handle represents a
specific job.
diff --git
a/api/src/test/java/org/apache/gravitino/job/TestJobTemplateChange.java
b/api/src/test/java/org/apache/gravitino/job/TestJobTemplateChange.java
new file mode 100644
index 0000000000..8d9d17257f
--- /dev/null
+++ b/api/src/test/java/org/apache/gravitino/job/TestJobTemplateChange.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for the factory methods and value semantics of {@link JobTemplateChange}. */
+public class TestJobTemplateChange {
+
+  /** A rename change exposes its new name and compares by value. */
+  @Test
+  public void testRenameJobTemplate() {
+    String expectedName = "newTemplateName";
+
+    JobTemplateChange first = JobTemplateChange.rename(expectedName);
+    Assertions.assertInstanceOf(JobTemplateChange.RenameJobTemplate.class, first);
+    JobTemplateChange.RenameJobTemplate rename = (JobTemplateChange.RenameJobTemplate) first;
+    Assertions.assertEquals(expectedName, rename.getNewName());
+
+    JobTemplateChange second = JobTemplateChange.rename(expectedName);
+    Assertions.assertEquals(first, second);
+    Assertions.assertEquals(first.hashCode(), second.hashCode());
+  }
+
+  /** A comment change exposes its new comment and compares by value. */
+  @Test
+  public void testUpdateJobTemplateComment() {
+    String expectedComment = "New comment";
+
+    JobTemplateChange first = JobTemplateChange.updateComment(expectedComment);
+    Assertions.assertInstanceOf(JobTemplateChange.UpdateJobTemplateComment.class, first);
+    JobTemplateChange.UpdateJobTemplateComment commentChange =
+        (JobTemplateChange.UpdateJobTemplateComment) first;
+    Assertions.assertEquals(expectedComment, commentChange.getNewComment());
+
+    JobTemplateChange second = JobTemplateChange.updateComment(expectedComment);
+    Assertions.assertEquals(first, second);
+    Assertions.assertEquals(first.hashCode(), second.hashCode());
+  }
+
+  /** A shell template update is carried unchanged by the update change. */
+  @Test
+  public void testUpdateShellJobTemplate() {
+    JobTemplateChange.ShellTemplateUpdate.Builder builder =
+        JobTemplateChange.ShellTemplateUpdate.builder();
+    builder.withNewExecutable("newExecutable.sh");
+    builder.withNewArguments(Lists.newArrayList("arg1", "arg2"));
+    builder.withNewEnvironments(ImmutableMap.of("ENV1", "value1", "ENV2", "value2"));
+    builder.withNewCustomFields(Collections.emptyMap());
+    builder.withNewScripts(Lists.newArrayList("test1.sh", "test2.sh"));
+    JobTemplateChange.ShellTemplateUpdate update = builder.build();
+
+    JobTemplateChange change = JobTemplateChange.updateTemplate(update);
+
+    Assertions.assertInstanceOf(JobTemplateChange.UpdateJobTemplate.class, change);
+    JobTemplateChange.UpdateJobTemplate updateChange = (JobTemplateChange.UpdateJobTemplate) change;
+    JobTemplateChange.ShellTemplateUpdate carried =
+        (JobTemplateChange.ShellTemplateUpdate) updateChange.getTemplateUpdate();
+
+    Assertions.assertEquals(update.getNewExecutable(), carried.getNewExecutable());
+    Assertions.assertEquals(update.getNewArguments(), carried.getNewArguments());
+    Assertions.assertEquals(update.getNewEnvironments(), carried.getNewEnvironments());
+    Assertions.assertEquals(update.getNewCustomFields(), carried.getNewCustomFields());
+    Assertions.assertEquals(update.getNewScripts(), carried.getNewScripts());
+  }
+
+  /** A spark template update is carried unchanged by the update change. */
+  @Test
+  public void testUpdateSparkJobTemplate() {
+    JobTemplateChange.SparkTemplateUpdate.Builder builder =
+        JobTemplateChange.SparkTemplateUpdate.builder();
+    builder.withNewClassName("org.apache.spark.examples.NewExample");
+    builder.withNewExecutable("new-spark-examples.jar");
+    builder.withNewArguments(Lists.newArrayList("arg1", "arg2"));
+    builder.withNewEnvironments(ImmutableMap.of("ENV1", "value1", "ENV2", "value2"));
+    builder.withNewCustomFields(Collections.emptyMap());
+    builder.withNewJars(Lists.newArrayList("lib1.jar", "lib2.jar"));
+    builder.withNewFiles(Lists.newArrayList("file1.txt", "file2.txt"));
+    builder.withNewArchives(Lists.newArrayList("archive1.zip", "archive2.zip"));
+    builder.withNewConfigs(ImmutableMap.of("spark.master", "local[*]", "spark.app.name", "NewApp"));
+    JobTemplateChange.SparkTemplateUpdate update = builder.build();
+
+    JobTemplateChange change = JobTemplateChange.updateTemplate(update);
+
+    Assertions.assertInstanceOf(JobTemplateChange.UpdateJobTemplate.class, change);
+    JobTemplateChange.UpdateJobTemplate updateChange = (JobTemplateChange.UpdateJobTemplate) change;
+    JobTemplateChange.SparkTemplateUpdate carried =
+        (JobTemplateChange.SparkTemplateUpdate) updateChange.getTemplateUpdate();
+
+    Assertions.assertEquals(update.getNewClassName(), carried.getNewClassName());
+    Assertions.assertEquals(update.getNewExecutable(), carried.getNewExecutable());
+    Assertions.assertEquals(update.getNewArguments(), carried.getNewArguments());
+    Assertions.assertEquals(update.getNewEnvironments(), carried.getNewEnvironments());
+    Assertions.assertEquals(update.getNewCustomFields(), carried.getNewCustomFields());
+    Assertions.assertEquals(update.getNewJars(), carried.getNewJars());
+    Assertions.assertEquals(update.getNewFiles(), carried.getNewFiles());
+    Assertions.assertEquals(update.getNewArchives(), carried.getNewArchives());
+    Assertions.assertEquals(update.getNewConfigs(), carried.getNewConfigs());
+  }
+}
diff --git a/clients/client-python/gravitino/api/job/job_template_change.py
b/clients/client-python/gravitino/api/job/job_template_change.py
new file mode 100644
index 0000000000..1c3c1c50db
--- /dev/null
+++ b/clients/client-python/gravitino/api/job/job_template_change.py
@@ -0,0 +1,425 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from abc import ABC
+from typing import List, Dict, Optional, Any
+from copy import deepcopy
+
+
+class JobTemplateChange:
+    """
+    Represents one modification that can be applied to a job template.
+
+    Three kinds of change are available, each created through the matching
+    static factory on this class:
+
+    - rename: change the name of the job template.
+    - update_comment: change the comment of the job template.
+    - update_template: change the job template details, such as executable,
+      arguments, environments, custom fields, etc.
+    """
+
+    @staticmethod
+    def rename(new_name: str) -> "JobTemplateChange":
+        """
+        Builds the change that gives the job template a new name.
+
+        Args:
+            new_name: The name the job template should carry afterwards.
+
+        Returns:
+            A RenameJobTemplate holding the new name.
+        """
+        rename_change = RenameJobTemplate(new_name)
+        return rename_change
+
+    @staticmethod
+    def update_comment(new_comment: str) -> "JobTemplateChange":
+        """
+        Builds the change that replaces the comment of the job template.
+
+        Args:
+            new_comment: The comment the job template should carry afterwards.
+
+        Returns:
+            An UpdateJobTemplateComment holding the new comment.
+        """
+        comment_change = UpdateJobTemplateComment(new_comment)
+        return comment_change
+
+    @staticmethod
+    def update_template(template_update: "TemplateUpdate") -> "JobTemplateChange":
+        """
+        Builds the change that replaces the details of the job template.
+
+        Args:
+            template_update: The template update describing the new details.
+
+        Returns:
+            An UpdateJobTemplate wrapping the given template update.
+        """
+        details_change = UpdateJobTemplate(template_update)
+        return details_change
+
+
+class RenameJobTemplate(JobTemplateChange):
+    """
+    Job template change that assigns a new name to the job template.
+
+    Two instances compare equal, and hash alike, when they carry the same name.
+    """
+
+    def __init__(self, new_name: str):
+        """
+        Creates the rename change.
+
+        Args:
+            new_name: The name the job template should carry afterwards.
+        """
+        self._new_name = new_name
+
+    def get_new_name(self) -> str:
+        """
+        Returns:
+            The new name of the job template.
+        """
+        return self._new_name
+
+    def __eq__(self, other: Any) -> bool:
+        # Anything that is not a rename change can never be equal to one.
+        if not isinstance(other, RenameJobTemplate):
+            return False
+        return self._new_name == other._new_name
+
+    def __hash__(self) -> int:
+        # Hash on the same single field that __eq__ compares.
+        return hash(self._new_name)
+
+    def __str__(self) -> str:
+        return f"RENAME JOB TEMPLATE {self._new_name}"
+
+
+class UpdateJobTemplateComment(JobTemplateChange):
+    """
+    Job template change that replaces the comment of the job template.
+
+    Two instances compare equal, and hash alike, when they carry the same comment.
+    """
+
+    def __init__(self, new_comment: str):
+        """
+        Creates the comment update change.
+
+        Args:
+            new_comment: The comment the job template should carry afterwards.
+        """
+        self._new_comment = new_comment
+
+    def get_new_comment(self) -> str:
+        """
+        Returns:
+            The new comment of the job template.
+        """
+        return self._new_comment
+
+    def __eq__(self, other: Any) -> bool:
+        # Anything that is not a comment update can never be equal to one.
+        if not isinstance(other, UpdateJobTemplateComment):
+            return False
+        return self._new_comment == other._new_comment
+
+    def __hash__(self) -> int:
+        # Hash on the same single field that __eq__ compares.
+        return hash(self._new_comment)
+
+    def __str__(self) -> str:
+        return f"UPDATE JOB TEMPLATE COMMENT {self._new_comment}"
+
+
+class UpdateJobTemplate(JobTemplateChange):
+    """
+    Job template change that replaces the details of the job template.
+
+    The details are carried by a TemplateUpdate; equality and hashing of this
+    change are delegated to that object.
+    """
+
+    def __init__(self, template_update: "TemplateUpdate"):
+        """
+        Creates the template update change.
+
+        Args:
+            template_update: The template update describing the new details.
+        """
+        self._template_update = template_update
+
+    def get_template_update(self) -> "TemplateUpdate":
+        """
+        Returns:
+            The template update wrapped by this change.
+        """
+        return self._template_update
+
+    def __eq__(self, other: Any) -> bool:
+        # Anything that is not a template update change can never be equal to one.
+        if not isinstance(other, UpdateJobTemplate):
+            return False
+        return self._template_update == other._template_update
+
+    def __hash__(self) -> int:
+        # Delegate to the wrapped update, the only field __eq__ compares.
+        return hash(self._template_update)
+
+    def __str__(self) -> str:
+        # Only the concrete type of the update is rendered, not its contents.
+        update_kind = type(self._template_update).__name__
+        return f"UPDATE JOB TEMPLATE {update_kind}"
+
+
+class TemplateUpdate(ABC):
+    """
+    Base class for template updates.
+
+    Holds the fields shared by the template update types: the executable, its
+    arguments, the environments and the custom fields. The collections given to
+    the constructor are deep-copied and the getters return copies, so the state
+    that __eq__ and __hash__ are computed from cannot be modified from outside
+    once the object has been built.
+    """
+
+    def __init__(
+        self,
+        new_executable: str,
+        new_arguments: Optional[List[str]] = None,
+        new_environments: Optional[Dict[str, str]] = None,
+        new_custom_fields: Optional[Dict[str, str]] = None,
+    ):
+        """
+        Initialize a TemplateUpdate.
+
+        Args:
+            new_executable: The new executable of the job template.
+            new_arguments: The new arguments of the job template. None is
+                stored as an empty list.
+            new_environments: The new environments of the job template. None is
+                stored as an empty dict.
+            new_custom_fields: The new custom fields of the job template. None
+                is stored as an empty dict.
+
+        Raises:
+            ValueError: If new_executable is None, empty or only whitespace.
+        """
+        if not new_executable or not new_executable.strip():
+            raise ValueError("Executable cannot be null or blank")
+        self._new_executable = new_executable
+        # Copy on the way in so later changes to the caller's collections do
+        # not leak into this object.
+        self._new_arguments = (
+            deepcopy(new_arguments) if new_arguments is not None else []
+        )
+        self._new_environments = (
+            deepcopy(new_environments) if new_environments is not None else {}
+        )
+        self._new_custom_fields = (
+            deepcopy(new_custom_fields) if new_custom_fields is not None else {}
+        )
+
+    def get_new_executable(self) -> str:
+        """
+        Get the new executable of the job template.
+
+        Returns:
+            The new executable of the job template.
+        """
+        return self._new_executable
+
+    def get_new_arguments(self) -> List[str]:
+        """
+        Get the new arguments of the job template.
+
+        Returns:
+            A copy of the new arguments of the job template.
+        """
+        # Copy on the way out: the internal list feeds __hash__ and must not be
+        # mutated through the returned reference.
+        return list(self._new_arguments)
+
+    def get_new_environments(self) -> Dict[str, str]:
+        """
+        Get the new environments of the job template.
+
+        Returns:
+            A copy of the new environments of the job template.
+        """
+        return dict(self._new_environments)
+
+    def get_new_custom_fields(self) -> Dict[str, str]:
+        """
+        Get the new custom fields of the job template.
+
+        Returns:
+            A copy of the new custom fields of the job template.
+        """
+        return dict(self._new_custom_fields)
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, TemplateUpdate):
+            return False
+        return (
+            self._new_executable == other._new_executable
+            and self._new_arguments == other._new_arguments
+            and self._new_environments == other._new_environments
+            and self._new_custom_fields == other._new_custom_fields
+        )
+
+    def __hash__(self) -> int:
+        # Lists and dicts are unhashable, so they are folded into a tuple and
+        # frozensets covering the same fields that __eq__ compares.
+        return hash(
+            (
+                self._new_executable,
+                tuple(self._new_arguments),
+                frozenset(self._new_environments.items()),
+                frozenset(self._new_custom_fields.items()),
+            )
+        )
+
+
+class ShellTemplateUpdate(TemplateUpdate):
+    """
+    A template update for shell job templates.
+
+    Adds the list of scripts to the fields held by TemplateUpdate. The scripts
+    are deep-copied on construction and copied again when read, so the state
+    used by __eq__ and __hash__ cannot be modified from outside.
+    """
+
+    def __init__(
+        self,
+        new_executable: str,
+        new_arguments: Optional[List[str]] = None,
+        new_environments: Optional[Dict[str, str]] = None,
+        new_custom_fields: Optional[Dict[str, str]] = None,
+        new_scripts: Optional[List[str]] = None,
+    ):
+        """
+        Initialize a ShellTemplateUpdate.
+
+        Args:
+            new_executable: The new executable of the shell job template.
+            new_arguments: The new arguments of the shell job template.
+            new_environments: The new environments of the shell job template.
+            new_custom_fields: The new custom fields of the shell job template.
+            new_scripts: The new scripts of the shell job template. None is
+                stored as an empty list.
+        """
+        super().__init__(
+            new_executable, new_arguments, new_environments, new_custom_fields
+        )
+        self._new_scripts = deepcopy(new_scripts) if new_scripts is not None else []
+
+    def get_new_scripts(self) -> List[str]:
+        """
+        Get the new scripts of the shell job template.
+
+        Returns:
+            A copy of the new scripts of the shell job template.
+        """
+        # Copy on the way out: the internal list feeds __hash__ and must not be
+        # mutated through the returned reference.
+        return list(self._new_scripts)
+
+    def __eq__(self, other: Any) -> bool:
+        return (
+            isinstance(other, ShellTemplateUpdate)
+            and super().__eq__(other)
+            and self._new_scripts == other._new_scripts
+        )
+
+    def __hash__(self) -> int:
+        # Combine the hash of the shared fields with the shell-only scripts.
+        return hash((super().__hash__(), tuple(self._new_scripts)))
+
+
+class SparkTemplateUpdate(TemplateUpdate):
+    """
+    A template update for spark job templates.
+
+    Adds the class name, jars, files, archives and configs to the fields held by
+    TemplateUpdate. The collections are deep-copied on construction and copied
+    again when read, so the state used by __eq__ and __hash__ cannot be modified
+    from outside.
+    """
+
+    def __init__(
+        self,
+        new_executable: str,
+        new_arguments: Optional[List[str]] = None,
+        new_environments: Optional[Dict[str, str]] = None,
+        new_custom_fields: Optional[Dict[str, str]] = None,
+        new_class_name: Optional[str] = None,
+        new_jars: Optional[List[str]] = None,
+        new_files: Optional[List[str]] = None,
+        new_archives: Optional[List[str]] = None,
+        new_configs: Optional[Dict[str, str]] = None,
+    ):
+        """
+        Initialize a SparkTemplateUpdate.
+
+        Args:
+            new_executable: The new executable of the spark job template.
+            new_arguments: The new arguments of the spark job template.
+            new_environments: The new environments of the spark job template.
+            new_custom_fields: The new custom fields of the spark job template.
+            new_class_name: The new class name of the spark job template.
+            new_jars: The new jars of the spark job template. None is stored as
+                an empty list.
+            new_files: The new files of the spark job template. None is stored
+                as an empty list.
+            new_archives: The new archives of the spark job template. None is
+                stored as an empty list.
+            new_configs: The new configs of the spark job template. None is
+                stored as an empty dict.
+        """
+        super().__init__(
+            new_executable, new_arguments, new_environments, new_custom_fields
+        )
+        self._new_class_name = new_class_name
+        self._new_jars = deepcopy(new_jars) if new_jars is not None else []
+        self._new_files = deepcopy(new_files) if new_files is not None else []
+        self._new_archives = deepcopy(new_archives) if new_archives is not None else []
+        self._new_configs = deepcopy(new_configs) if new_configs is not None else {}
+
+    def get_new_class_name(self) -> Optional[str]:
+        """
+        Get the new class name of the spark job template.
+
+        Returns:
+            The new class name of the spark job template.
+        """
+        return self._new_class_name
+
+    def get_new_jars(self) -> List[str]:
+        """
+        Get the new jars of the spark job template.
+
+        Returns:
+            A copy of the new jars of the spark job template.
+        """
+        # Copies are returned from every collection getter: the internal
+        # collections feed __hash__ and must not be mutated by callers.
+        return list(self._new_jars)
+
+    def get_new_files(self) -> List[str]:
+        """
+        Get the new files of the spark job template.
+
+        Returns:
+            A copy of the new files of the spark job template.
+        """
+        return list(self._new_files)
+
+    def get_new_archives(self) -> List[str]:
+        """
+        Get the new archives of the spark job template.
+
+        Returns:
+            A copy of the new archives of the spark job template.
+        """
+        return list(self._new_archives)
+
+    def get_new_configs(self) -> Dict[str, str]:
+        """
+        Get the new configs of the spark job template.
+
+        Returns:
+            A copy of the new configs of the spark job template.
+        """
+        return dict(self._new_configs)
+
+    def __eq__(self, other: Any) -> bool:
+        return (
+            isinstance(other, SparkTemplateUpdate)
+            and super().__eq__(other)
+            and self._new_class_name == other._new_class_name
+            and self._new_jars == other._new_jars
+            and self._new_files == other._new_files
+            and self._new_archives == other._new_archives
+            and self._new_configs == other._new_configs
+        )
+
+    def __hash__(self) -> int:
+        # Combine the hash of the shared fields with the spark-only fields,
+        # converting the unhashable lists and dict to tuples and a frozenset.
+        return hash(
+            (
+                super().__hash__(),
+                self._new_class_name,
+                tuple(self._new_jars),
+                tuple(self._new_files),
+                tuple(self._new_archives),
+                frozenset(self._new_configs.items()),
+            )
+        )
diff --git a/clients/client-python/gravitino/api/job/supports_jobs.py
b/clients/client-python/gravitino/api/job/supports_jobs.py
index 6c86a98bf9..adb584fe40 100644
--- a/clients/client-python/gravitino/api/job/supports_jobs.py
+++ b/clients/client-python/gravitino/api/job/supports_jobs.py
@@ -19,6 +19,7 @@ from abc import ABC, abstractmethod
from typing import List, Dict
from .job_handle import JobHandle
from .job_template import JobTemplate
+from .job_template_change import JobTemplateChange
class SupportsJobs(ABC):
@@ -89,6 +90,27 @@ class SupportsJobs(ABC):
"""
pass
+ def alter_job_template(
+     self, job_template_name: str, *changes: JobTemplateChange
+ ) -> JobTemplate:
+     """
+     Alters a job template by applying the specified changes. This allows for
+     modifying the properties of an existing job template, such as its name,
+     description, or parameters.
+
+     NOTE(review): no @abstractmethod decorator is visible directly above this
+     method in this hunk, so as shown the body below does nothing and returns
+     None unless a subclass overrides it. Confirm whether that is intentional.
+
+     Args:
+         job_template_name: The name of the job template.
+         changes: The changes to apply to the job template, passed as
+             positional arguments.
+
+     Raises:
+         NoSuchJobTemplateException: If the job template does not exist.
+         IllegalArgumentException: If any of the changes cannot be applied to
+             the job template.
+
+     Returns:
+         The altered job template.
+     """
+     pass
+
+ @abstractmethod
def list_jobs(self, job_template_name: str = None) -> List[JobHandle]:
"""
Lists all the jobs in Gravitino. If a job template name is provided,
it will filter the jobs
diff --git a/clients/client-python/tests/unittests/job/__init__.py
b/clients/client-python/tests/unittests/job/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/clients/client-python/tests/unittests/job/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/clients/client-python/tests/unittests/job/test_job_template_change.py
b/clients/client-python/tests/unittests/job/test_job_template_change.py
new file mode 100644
index 0000000000..56d71d5585
--- /dev/null
+++ b/clients/client-python/tests/unittests/job/test_job_template_change.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from typing import cast
+
+from gravitino.api.job.job_template_change import (
+ JobTemplateChange,
+ RenameJobTemplate,
+ UpdateJobTemplateComment,
+ UpdateJobTemplate,
+ ShellTemplateUpdate,
+ SparkTemplateUpdate,
+)
+
+
+class TestJobTemplateChange(unittest.TestCase):
+    """Checks the JobTemplateChange factories and the changes they produce."""
+
+    def test_rename_job_template(self):
+        # The rename factory yields a RenameJobTemplate exposing the new name.
+        rename_change = JobTemplateChange.rename("new_name")
+        self.assertIsInstance(rename_change, RenameJobTemplate)
+        self.assertEqual(rename_change.get_new_name(), "new_name")
+        self.assertEqual(str(rename_change), "RENAME JOB TEMPLATE new_name")
+
+    def test_update_comment(self):
+        # The comment factory yields an UpdateJobTemplateComment with the comment.
+        comment_change = JobTemplateChange.update_comment("new comment")
+        self.assertIsInstance(comment_change, UpdateJobTemplateComment)
+        self.assertEqual(comment_change.get_new_comment(), "new comment")
+        self.assertEqual(
+            str(comment_change), "UPDATE JOB TEMPLATE COMMENT new comment"
+        )
+
+    def test_update_shell_template(self):
+        # Every field given to the shell update must be readable back unchanged.
+        shell_update = ShellTemplateUpdate(
+            new_executable="/bin/bash",
+            new_arguments=["-c", "echo hello"],
+            new_environments={"ENV": "test"},
+            new_custom_fields={"field": "value"},
+            new_scripts=["script1", "echo script1 content"],
+        )
+        change = JobTemplateChange.update_template(shell_update)
+        self.assertIsInstance(change, UpdateJobTemplate)
+
+        wrapped = change.get_template_update()
+        self.assertEqual(wrapped.get_new_executable(), "/bin/bash")
+        self.assertEqual(wrapped.get_new_arguments(), ["-c", "echo hello"])
+        self.assertEqual(wrapped.get_new_environments(), {"ENV": "test"})
+        self.assertEqual(wrapped.get_new_custom_fields(), {"field": "value"})
+
+        # The scripts getter only exists on the shell-specific update type.
+        as_shell = cast(ShellTemplateUpdate, wrapped)
+        self.assertEqual(
+            as_shell.get_new_scripts(), ["script1", "echo script1 content"]
+        )
+
+    def test_update_spark_template(self):
+        # Every field given to the spark update must be readable back unchanged.
+        spark_input = SparkTemplateUpdate(
+            new_executable="spark-submit",
+            new_arguments=["--class", "org.example.Main", "app.jar"],
+            new_environments={"SPARK_ENV": "prod"},
+            new_custom_fields={"spark_field": "spark_value"},
+            new_class_name="org.example.Main",
+            new_jars=["lib1.jar", "lib2.jar"],
+            new_files=["file1.txt", "file2.txt"],
+            new_archives=["archive1.zip"],
+            new_configs={"spark.executor.memory": "4g"},
+        )
+        change = JobTemplateChange.update_template(spark_input)
+        self.assertIsInstance(change, UpdateJobTemplate)
+
+        wrapped = change.get_template_update()
+        self.assertEqual(wrapped.get_new_executable(), "spark-submit")
+        self.assertEqual(
+            wrapped.get_new_arguments(), ["--class", "org.example.Main", "app.jar"]
+        )
+        self.assertEqual(wrapped.get_new_environments(), {"SPARK_ENV": "prod"})
+        self.assertEqual(
+            wrapped.get_new_custom_fields(), {"spark_field": "spark_value"}
+        )
+
+        # The remaining getters only exist on the spark-specific update type.
+        spark_update = cast(SparkTemplateUpdate, wrapped)
+        self.assertEqual(spark_update.get_new_class_name(), "org.example.Main")
+        self.assertEqual(spark_update.get_new_jars(), ["lib1.jar", "lib2.jar"])
+        self.assertEqual(spark_update.get_new_files(), ["file1.txt", "file2.txt"])
+        self.assertEqual(spark_update.get_new_archives(), ["archive1.zip"])
+        self.assertEqual(
+            spark_update.get_new_configs(), {"spark.executor.memory": "4g"}
+        )
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 331afb43f9..af02d1f655 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -503,7 +503,34 @@ public class JobManager implements JobOperationDispatcher {
activeJobs.forEach(
job -> {
- JobHandle.Status newStatus =
jobExecutor.getJobStatus(job.jobExecutionId());
+ JobHandle.Status newStatus = job.status();
+ try {
+ newStatus = jobExecutor.getJobStatus(job.jobExecutionId());
+ } catch (NoSuchJobException e) {
+ // If the job is not found in the external job executor, we
assume the job is
+ // FAILED if it is not in CANCELLING status, otherwise we assume
it is CANCELLED.
+ if (job.status() == JobHandle.Status.CANCELLING) {
+ newStatus = JobHandle.Status.CANCELLED;
+ } else {
+ newStatus = JobHandle.Status.FAILED;
+ }
+ LOG.warn(
+ "Job {} with execution id {} under metalake {} is not found
in the "
+ + "external job executor, marking it as {}. This could
be due to the job "
+ + "being deleted by the external job executor. Please
check the external job "
+ + "executor to know more details.",
+ job.name(),
+ job.jobExecutionId(),
+ metalake,
+ newStatus);
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to get job status for job {} by execution id {}",
+ job.name(),
+ job.jobExecutionId(),
+ e);
+ }
+
if (newStatus != job.status()) {
JobEntity newJobEntity =
JobEntity.builder()
@@ -522,6 +549,7 @@ public class JobManager implements JobOperationDispatcher {
.build();
// Update the job entity with new status.
+ JobHandle.Status finalNewStatus = newStatus;
TreeLockUtils.doWithTreeLock(
NameIdentifierUtil.ofJob(metalake, job.name()),
LockType.WRITE,
@@ -533,7 +561,7 @@ public class JobManager implements JobOperationDispatcher {
throw new RuntimeException(
String.format(
"Failed to update job entity %s to status %s",
- newJobEntity, newStatus),
+ newJobEntity, finalNewStatus),
e);
}
});
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
index a1c26331ef..4b1af0e4ec 100644
--- a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
@@ -316,7 +316,7 @@ public class LocalJobExecutor implements JobExecutor {
jobExecutorService.submit(() -> runJob(jobPair));
} catch (InterruptedException e) {
- LOG.warn("Polling job interrupted", e);
+ LOG.warn("Polling job interrupted");
finished = true;
}
}