This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 46091e14b3 [#7505] feat(core): Add job execution logic and job 
framework (part-2) (#7772)
46091e14b3 is described below

commit 46091e14b3cdf4a499ac6f8b256df5d28fb89083
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Jul 28 09:50:48 2025 +0800

    [#7505] feat(core): Add job execution logic and job framework (part-2) 
(#7772)
    
    ### What changes were proposed in this pull request?
    
    This is the second part of the job framework PR to add the job executor
    interface and complete the job framework.
    
    ### Why are the changes needed?
    
    Fix: #7505
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add UTs to cover the code changes.
---
 .../java/org/apache/gravitino/job/JobHandle.java   |   3 +
 .../org/apache/gravitino/job/ShellJobTemplate.java |  11 +
 .../org/apache/gravitino/job/SparkJobTemplate.java |  11 +
 .../main/java/org/apache/gravitino/Configs.java    |  11 +
 .../gravitino/connector/job/JobExecutor.java       |  83 ++++++
 .../apache/gravitino/job/JobExecutorFactory.java   |  60 ++++
 .../java/org/apache/gravitino/job/JobManager.java  | 291 ++++++++++++++++++-
 .../gravitino/job/JobOperationDispatcher.java      |  27 +-
 .../java/org/apache/gravitino/meta/JobEntity.java  |  19 +-
 .../org/apache/gravitino/job/TestJobManager.java   | 165 ++++++++++-
 .../org/apache/gravitino/job/TestJobTemplate.java  | 314 +++++++++++++++++++++
 11 files changed, 975 insertions(+), 20 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/job/JobHandle.java 
b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
index 38927538de..9aded575cc 100644
--- a/api/src/main/java/org/apache/gravitino/job/JobHandle.java
+++ b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
@@ -42,6 +42,9 @@ public interface JobHandle {
     /** The job has completed successfully. */
     SUCCEEDED,
 
+    /** The job is being cancelled. */
+    CANCELLING,
+
     /** The job has been cancelled. */
     CANCELLED;
   }
diff --git a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java 
b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
index afb6fdd07d..57edcca61c 100644
--- a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
@@ -89,11 +89,22 @@ public class ShellJobTemplate extends JobTemplate {
     return "\nShellJobTemplate{\n" + super.toString() + sb + "}\n";
   }
 
+  /**
+   * Creates a new builder for constructing instances of {@link ShellJobTemplate}.
+   *
+   * @return a new instance of {@link Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
   /** Builder for creating instances of {@link ShellJobTemplate}. */
   public static class Builder extends BaseBuilder<Builder, ShellJobTemplate> {
 
     private List<String> scripts;
 
+    private Builder() {}
+
     /**
      * Sets the scripts to be executed by the shell job template.
      *
diff --git a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java 
b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
index a1f68f7e75..13b6d8c6bf 100644
--- a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
@@ -181,6 +181,15 @@ public class SparkJobTemplate extends JobTemplate {
     return sb + super.toString() + "}\n";
   }
 
+  /**
+   * Creates a new builder for constructing instances of {@link SparkJobTemplate}.
+   *
+   * @return a new instance of {@link Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
   /** Builder for creating instances of {@link SparkJobTemplate}. */
   public static class Builder extends JobTemplate.BaseBuilder<Builder, 
SparkJobTemplate> {
 
@@ -194,6 +203,8 @@ public class SparkJobTemplate extends JobTemplate {
 
     private Map<String, String> configs;
 
+    private Builder() {}
+
     /**
      * Sets the class name of the Spark application to be executed.
      *
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 5a6544b1d4..8c283630eb 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -412,5 +412,16 @@ public class Configs {
           .doc("Directory for managing staging files when running jobs.")
           .version(ConfigConstants.VERSION_1_0_0)
           .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
           .createWithDefault("/tmp/gravitino/jobs/staging");
+
+  public static final ConfigEntry<String> JOB_EXECUTOR =
+      new ConfigBuilder("gravitino.job.executor")
+          .doc(
+              "The executor to run jobs, by default it is 'local', user can 
implement their own "
+                  + "executor and set it here.")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .createWithDefault("local");
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java 
b/core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java
new file mode 100644
index 0000000000..40eacac9de
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector.job;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+
+/**
+ * The JobExecutor interface defines the API for executing jobs in a specific Job runner, for
+ * example, Airflow, local runner, etc. The developer can implement this interface to adapt to the
+ * specific job runner. The Gravitino core will use this interface to submit jobs, get job status,
+ * etc.
+ */
+@DeveloperApi
+public interface JobExecutor extends Closeable {
+
+  /**
+   * Initialize the job executor with the given configurations.
+   *
+   * @param configs A map of configuration key-value pairs.
+   */
+  void initialize(Map<String, String> configs);
+
+  /**
+   * Submit a job described by the given job template to the external job runner.
+   *
+   * <p>The returned job identifier is unique in the external job runner, and can be used to track
+   * the job status or cancel it later.
+   *
+   * <p>The placeholders in the job template have already been replaced with the actual values
+   * before calling this method. So the implementors can directly use this job template to submit
+   * the job to the external job runner.
+   *
+   * @param jobTemplate The job template containing the job configuration and parameters.
+   * @return A unique identifier for the submitted job.
+   */
+  String submitJob(JobTemplate jobTemplate);
+
+  /**
+   * Get the status of a job by its unique identifier. The status should be one of the values in
+   * {@link JobHandle.Status}. The implementors should query the external job runner to get the job
+   * status, and map the status to the values in {@link JobHandle.Status}.
+   *
+   * @param jobId The unique identifier of the job.
+   * @return The status of the job.
+   * @throws NoSuchJobException If the job with the given identifier does not exist.
+   */
+  JobHandle.Status getJobStatus(String jobId) throws NoSuchJobException;
+
+  /**
+   * Cancel a job by its unique identifier. The job runner should stop the job if it is currently
+   * running. If the job is already completed, it should return directly without any error. If the
+   * job does not exist, it should throw a {@link NoSuchJobException}.
+   *
+   * <p>This method should not block on the job cancellation; it should return immediately, and
+   * Gravitino can further query the job status to check if the job is cancelled successfully.
+   *
+   * @param jobId The unique identifier of the job to cancel.
+   * @throws NoSuchJobException If the job with the given identifier does not exist.
+   */
+  void cancelJob(String jobId) throws NoSuchJobException;
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java 
b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
new file mode 100644
index 0000000000..cdf06c59af
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.connector.job.JobExecutor;
+
+/**
+ * Factory that instantiates and initializes the {@link JobExecutor} selected by {@link
+ * Configs#JOB_EXECUTOR}.
+ *
+ * <p>For an executor named {@code <name>}, the implementation class is read from the raw config
+ * key {@code gravitino.executor.<name>.class}, and the config entries found under the prefix
+ * {@code gravitino.executor.<name>.} are handed to {@link JobExecutor#initialize(Map)}.
+ *
+ * <p>NOTE(review): the executor name is configured under {@code gravitino.job.executor} while the
+ * per-executor settings are looked up under {@code gravitino.executor.} - confirm that the two
+ * different prefixes are intentional.
+ */
+public class JobExecutorFactory {
+
+  private static final String JOB_EXECUTOR_CONF_PREFIX = "gravitino.executor.";
+
+  private static final String JOB_EXECUTOR_CLASS_SUFFIX = ".class";
+
+  private JobExecutorFactory() {
+    // Private constructor to prevent instantiation
+  }
+
+  /**
+   * Creates and initializes the job executor named by {@link Configs#JOB_EXECUTOR}.
+   *
+   * @param config the Gravitino configuration to read the executor name, class and settings from
+   * @return the initialized job executor
+   * @throws IllegalArgumentException if no implementation class is configured for the executor
+   * @throws RuntimeException if the executor cannot be instantiated or initialized
+   */
+  public static JobExecutor create(Config config) {
+    String jobExecutorName = config.get(Configs.JOB_EXECUTOR);
+    String jobExecutorClassKey =
+        JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + JOB_EXECUTOR_CLASS_SUFFIX;
+    String clzName = config.getRawString(jobExecutorClassKey);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(clzName),
+        "Job executor class name must be specified for job executor: %s",
+        jobExecutorName);
+
+    Map<String, String> configs =
+        config.getConfigsWithPrefix(JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + ".");
+
+    JobExecutor jobExecutor;
+    try {
+      jobExecutor =
+          (JobExecutor) Class.forName(clzName).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create job executor: " + jobExecutorName, e);
+    }
+
+    try {
+      jobExecutor.initialize(configs);
+      return jobExecutor;
+    } catch (Exception e) {
+      // The instance never reaches the caller, so this is the only place that can release
+      // whatever it acquired before initialization failed.
+      try {
+        jobExecutor.close();
+      } catch (Exception closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw new RuntimeException("Failed to create job executor: " + jobExecutorName, e);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 09c36cf22f..e0307713b9 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -21,11 +21,21 @@ package org.apache.gravitino.job;
 
 import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.time.Instant;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
@@ -34,6 +44,7 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.connector.job.JobExecutor;
 import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -41,19 +52,38 @@ import org.apache.gravitino.exceptions.NoSuchJobException;
 import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
 
 public class JobManager implements JobOperationDispatcher, Closeable {
 
+  private static final Pattern PLACEHOLDER_PATTERN = 
Pattern.compile("\\{\\{([\\w.-]+)\\}\\}");
+
+  private static final int TIMEOUT_IN_MS = 30 * 1000; // 30 seconds
+
   private final EntityStore entityStore;
 
   private final File stagingDir;
 
-  public JobManager(Config config, EntityStore entityStore) {
+  private final JobExecutor jobExecutor;
+
+  private final IdGenerator idGenerator;
+
+  public JobManager(Config config, EntityStore entityStore, IdGenerator 
idGenerator) {
+    this(config, entityStore, idGenerator, JobExecutorFactory.create(config));
+  }
+
+  @VisibleForTesting
+  JobManager(
+      Config config, EntityStore entityStore, IdGenerator idGenerator, 
JobExecutor jobExecutor) {
     this.entityStore = entityStore;
+    this.jobExecutor = jobExecutor;
+    this.idGenerator = idGenerator;
 
     String stagingDirPath = config.get(Configs.JOB_STAGING_DIR);
     this.stagingDir = new File(stagingDirPath);
@@ -144,8 +174,19 @@ public class JobManager implements JobOperationDispatcher, 
Closeable {
   public boolean deleteJobTemplate(String metalake, String jobTemplateName) 
throws InUseException {
     checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
 
-    // TODO. Check if there are any running jobs associated with the job 
template. If there are
-    //  running jobs, throw InUseException
+    List<JobEntity> jobs = listJobs(metalake, Optional.of(jobTemplateName));
+    boolean hasActiveJobs =
+        jobs.stream()
+            .anyMatch(
+                job ->
+                    job.status() != JobHandle.Status.CANCELLED
+                        && job.status() != JobHandle.Status.SUCCEEDED
+                        && job.status() != JobHandle.Status.FAILED);
+    if (hasActiveJobs) {
+      throw new InUseException(
+          "Job template %s under metalake %s has active jobs associated with 
it",
+          jobTemplateName, metalake);
+    }
 
     // Delete the job template entity as well as all the jobs associated with 
it.
     return TreeLockUtils.doWithTreeLock(
@@ -226,8 +267,250 @@ public class JobManager implements 
JobOperationDispatcher, Closeable {
         });
   }
 
+  /**
+   * Runs a job from the named job template: creates a per-job staging directory, materializes the
+   * runtime job template, submits it to the job executor and stores a {@link JobEntity} in
+   * {@link JobHandle.Status#QUEUED} state.
+   *
+   * <p>If the job entity cannot be stored after the job was submitted, the submitted job is
+   * cancelled on a best-effort basis so that no untracked job keeps running in the executor.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplateName the name of the job template to run
+   * @param jobConf the values used to replace the placeholders of the job template
+   * @return the stored job entity
+   * @throws NoSuchJobTemplateException if the job template does not exist
+   */
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Check if the job template exists, will throw NoSuchJobTemplateException if it does not exist.
+    JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, jobTemplateName);
+
+    // Create staging directory.
+    // TODO(jerry). The job staging directory will be deleted using a background thread.
+    long jobId = idGenerator.nextId();
+    String jobStagingPath =
+        stagingDir.getAbsolutePath()
+            + File.separator
+            + metalake
+            + File.separator
+            + JobHandle.JOB_ID_PREFIX
+            + jobId;
+    File jobStagingDir = new File(jobStagingPath);
+    if (!jobStagingDir.mkdirs()) {
+      throw new RuntimeException(
+          String.format("Failed to create staging directory %s for job %s", jobStagingDir, jobId));
+    }
+
+    // Create a JobTemplate by replacing the template parameters with the jobConf values, and
+    // also downloading any necessary files from the URIs specified in the job template.
+    JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity, jobConf, jobStagingDir);
+
+    // Submit the job template to the job executor
+    String jobExecutionId;
+    try {
+      jobExecutionId = jobExecutor.submitJob(jobTemplate);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to submit job template %s for execution", jobTemplate), e);
+    }
+
+    // Create a new JobEntity to represent the job
+    JobEntity jobEntity =
+        JobEntity.builder()
+            .withId(jobId)
+            .withJobExecutionId(jobExecutionId)
+            .withJobTemplateName(jobTemplateName)
+            .withStatus(JobHandle.Status.QUEUED)
+            .withNamespace(NamespaceUtil.ofJob(metalake))
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      entityStore.put(jobEntity, false /* overwrite */);
+    } catch (IOException e) {
+      RuntimeException failure =
+          new RuntimeException("Failed to register the job entity " + jobEntity, e);
+      // The job has already been handed to the executor, but without a stored entity it can
+      // neither be looked up nor cancelled through Gravitino. Cancel it on a best-effort basis
+      // and keep the original failure as the primary error.
+      try {
+        jobExecutor.cancelJob(jobExecutionId);
+      } catch (Exception cancelError) {
+        failure.addSuppressed(cancelError);
+      }
+      throw failure;
+    }
+
+    return jobEntity;
+  }
+
+  /**
+   * Requests the cancellation of a job and records it as {@link JobHandle.Status#CANCELLING}.
+   *
+   * <p>Jobs that are already being cancelled or have reached a final state (cancelled, succeeded
+   * or failed) are returned unchanged without contacting the job executor.
+   *
+   * @param metalake the name of the metalake
+   * @param jobId the ID of the job to cancel
+   * @return the job entity, updated to CANCELLING when a cancellation was actually requested
+   * @throws NoSuchJobException if the job does not exist
+   */
+  @Override
+  public JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // Retrieve the job entity, will throw NoSuchJobException if the job does not exist.
+    JobEntity jobEntity = getJob(metalake, jobId);
+
+    JobHandle.Status currentStatus = jobEntity.status();
+    if (currentStatus == JobHandle.Status.CANCELLING
+        || currentStatus == JobHandle.Status.CANCELLED
+        || currentStatus == JobHandle.Status.SUCCEEDED
+        || currentStatus == JobHandle.Status.FAILED) {
+      // A cancellation is already in flight, or the job has finished: there is nothing left to
+      // cancel, so neither the executor nor the entity store needs to be touched again.
+      return jobEntity;
+    }
+
+    // Cancel the job using the job executor
+    try {
+      jobExecutor.cancelJob(jobEntity.jobExecutionId());
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Failed to cancel job with ID %s under metalake %s", jobId, metalake), e);
+    }
+
+    // Update the job status to CANCELLING
+    JobEntity newJobEntity =
+        JobEntity.builder()
+            .withId(jobEntity.id())
+            .withJobExecutionId(jobEntity.jobExecutionId())
+            .withJobTemplateName(jobEntity.jobTemplateName())
+            .withStatus(JobHandle.Status.CANCELLING)
+            .withNamespace(jobEntity.namespace())
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(jobEntity.auditInfo().creator())
+                    .withCreateTime(jobEntity.auditInfo().createTime())
+                    .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withLastModifiedTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      // Update the job entity in the entity store
+      entityStore.put(newJobEntity, true /* overwrite */);
+      return newJobEntity;
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to update job entity %s to CANCELING status", newJobEntity), e);
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    // TODO. Implement any necessary cleanup logic for the JobManager.
+    jobExecutor.close();
+    // TODO(jerry). Implement any necessary cleanup logic for the JobManager.
+  }
+
+  /**
+   * Builds the concrete {@link JobTemplate} that is submitted to the job executor from a stored
+   * job template entity.
+   *
+   * <p>The executable and, depending on the job type, the scripts, jars, files and archives are
+   * fetched into {@code stagingDir} and replaced by the absolute paths of the fetched copies.
+   * Placeholders of the form {@code {{key}}} in the arguments, environments, custom fields and
+   * Spark configs are substituted with the matching values of {@code jobConf}. The name, the
+   * comment and the Spark class name are copied unchanged.
+   *
+   * @param jobTemplateEntity the stored job template to materialize
+   * @param jobConf the placeholder values keyed by placeholder name
+   * @param stagingDir the directory that receives the fetched files
+   * @return a {@link ShellJobTemplate} or a {@link SparkJobTemplate}, depending on the job type
+   * @throws IllegalArgumentException if the job type is neither SHELL nor SPARK
+   */
+  @VisibleForTesting
+  static JobTemplate createRuntimeJobTemplate(
+      JobTemplateEntity jobTemplateEntity, Map<String, String> jobConf, File stagingDir) {
+    String name = jobTemplateEntity.name();
+    String comment = jobTemplateEntity.comment();
+
+    JobTemplateEntity.TemplateContent content = jobTemplateEntity.templateContent();
+    String executable = fetchFileFromUri(content.executable(), stagingDir, TIMEOUT_IN_MS);
+
+    List<String> args =
+        content.arguments().stream()
+            .map(arg -> replacePlaceholder(arg, jobConf))
+            .collect(Collectors.toList());
+    Map<String, String> environments = replacePlaceholders(content.environments(), jobConf);
+    Map<String, String> customFields = replacePlaceholders(content.customFields(), jobConf);
+
+    // For shell job template
+    if (content.jobType() == JobTemplate.JobType.SHELL) {
+      List<String> scripts = fetchFilesFromUri(content.scripts(), stagingDir, TIMEOUT_IN_MS);
+
+      return ShellJobTemplate.builder()
+          .withName(name)
+          .withComment(comment)
+          .withExecutable(executable)
+          .withArguments(args)
+          .withEnvironments(environments)
+          .withCustomFields(customFields)
+          .withScripts(scripts)
+          .build();
+    }
+
+    // For Spark job template
+    if (content.jobType() == JobTemplate.JobType.SPARK) {
+      String className = content.className();
+      List<String> jars = fetchFilesFromUri(content.jars(), stagingDir, TIMEOUT_IN_MS);
+      List<String> files = fetchFilesFromUri(content.files(), stagingDir, TIMEOUT_IN_MS);
+      List<String> archives = fetchFilesFromUri(content.archives(), stagingDir, TIMEOUT_IN_MS);
+      Map<String, String> configs = replacePlaceholders(content.configs(), jobConf);
+
+      return SparkJobTemplate.builder()
+          .withName(name)
+          .withComment(comment)
+          .withExecutable(executable)
+          .withArguments(args)
+          .withEnvironments(environments)
+          .withCustomFields(customFields)
+          .withClassName(className)
+          .withJars(jars)
+          .withFiles(files)
+          .withArchives(archives)
+          .withConfigs(configs)
+          .build();
+    }
+
+    throw new IllegalArgumentException("Unsupported job type: " + content.jobType());
+  }
+
+  /** Applies {@code replacePlaceholder} to every key and every value of the given map. */
+  private static Map<String, String> replacePlaceholders(
+      Map<String, String> source, Map<String, String> jobConf) {
+    return source.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> replacePlaceholder(entry.getKey(), jobConf),
+                entry -> replacePlaceholder(entry.getValue(), jobConf)));
+  }
+
+  /**
+   * Replaces every {@code {{key}}} placeholder of the input with the value mapped to {@code key}.
+   *
+   * <p>Placeholders without a mapped value are kept verbatim. Replacement values are inserted
+   * literally: a dollar sign or a backslash inside a value carries no special meaning, so values
+   * such as {@code $HOME/data} are substituted as written instead of being interpreted as regex
+   * group references.
+   *
+   * @param inputString the string that may contain placeholders; blank input is returned as is
+   * @param replacements the replacement values keyed by placeholder name
+   * @return the input string with all known placeholders substituted
+   */
+  @VisibleForTesting
+  static String replacePlaceholder(String inputString, Map<String, String> replacements) {
+    if (StringUtils.isBlank(inputString)) {
+      return inputString; // Return as is if the input string is blank
+    }
+
+    StringBuffer result = new StringBuffer();
+
+    Matcher matcher = PLACEHOLDER_PATTERN.matcher(inputString);
+    while (matcher.find()) {
+      String key = matcher.group(1);
+      String replacement = replacements.get(key);
+      // If no replacement is found, keep the placeholder as is
+      String literal = replacement != null ? replacement : matcher.group(0);
+      // appendReplacement interprets '$' and '\' in its second argument, so the text has to be
+      // quoted to be inserted verbatim.
+      matcher.appendReplacement(result, Matcher.quoteReplacement(literal));
+    }
+    matcher.appendTail(result);
+
+    return result.toString();
+  }
+
+  /**
+   * Fetches every given URI into the staging directory via {@code fetchFileFromUri}.
+   *
+   * @param uris the URIs to fetch, in order
+   * @param stagingDir the directory that receives the fetched files
+   * @param timeoutInMs the timeout passed on to each single fetch
+   * @return the absolute paths of the fetched files, in the order of {@code uris}
+   */
+  @VisibleForTesting
+  static List<String> fetchFilesFromUri(List<String> uris, File stagingDir, int timeoutInMs) {
+    List<String> localPaths =
+        uris.stream()
+            .map(source -> fetchFileFromUri(source, stagingDir, timeoutInMs))
+            .collect(Collectors.toList());
+    return localPaths;
+  }
+
+  /**
+   * Makes the file behind the given URI available inside the staging directory, under the last
+   * path segment of the URI.
+   *
+   * <p>{@code http}, {@code https} and {@code ftp} URIs are downloaded, using {@code timeoutInMs}
+   * as both the connection and the read timeout. {@code file} URIs, and URIs without a scheme,
+   * are linked into the staging directory with a symbolic link. Any other scheme is rejected.
+   *
+   * @param uri the URI of the file to fetch
+   * @param stagingDir the directory that receives the file
+   * @param timeoutInMs the connection and read timeout for remote downloads, in milliseconds
+   * @return the absolute path of the file inside the staging directory
+   * @throws RuntimeException if the URI is malformed, uses an unsupported scheme, or the file
+   *     cannot be downloaded or linked; the original failure is attached as the cause
+   */
+  @VisibleForTesting
+  static String fetchFileFromUri(String uri, File stagingDir, int timeoutInMs) {
+    try {
+      URI source = new URI(uri);
+      String declaredScheme = source.getScheme();
+      String scheme = declaredScheme == null ? "file" : declaredScheme;
+      File target = new File(stagingDir, new File(source.getPath()).getName());
+
+      if ("http".equals(scheme) || "https".equals(scheme) || "ftp".equals(scheme)) {
+        // Remote file: download a copy into the staging directory.
+        FileUtils.copyURLToFile(source.toURL(), target, timeoutInMs, timeoutInMs);
+      } else if ("file".equals(scheme)) {
+        // Local file: link to it instead of copying.
+        Files.createSymbolicLink(target.toPath(), new File(source.getPath()).toPath());
+      } else {
+        throw new IllegalArgumentException("Unsupported scheme: " + scheme);
+      }
+
+      return target.getAbsolutePath();
+    } catch (Exception e) {
+      throw new RuntimeException(String.format("Failed to fetch file from URI %s", uri), e);
+    }
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
index 42f0d75e05..0c83862f4c 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.job;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
@@ -93,9 +94,25 @@ public interface JobOperationDispatcher {
    */
   JobEntity getJob(String metalake, String jobId) throws NoSuchJobException;
 
-  // TODO. Implement the runJob and cancelJob methods in another PR.
-  //  JobEntity runJob(String metalake, String jobTemplateName, Map<String, 
String> jobConf)
-  //      throws NoSuchJobTemplateException;
-  //
-  //  JobEntity cancelJob(String metalake, String jobId) throws 
NoSuchJobException;
+  /**
+   * Runs a job based on the specified job template and configuration in the 
specified metalake.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplateName the name of the job template to use for running 
the job
+   * @param jobConf the runtime configuration for the job, which contains 
key-value pairs
+   * @return the job entity representing the job
+   * @throws NoSuchJobTemplateException if no job template with the specified 
name exists
+   */
+  JobEntity runJob(String metalake, String jobTemplateName, Map<String, 
String> jobConf)
+      throws NoSuchJobTemplateException;
+
+  /**
+   * Cancels a job by its ID in the specified metalake.
+   *
+   * @param metalake the name of the metalake
+   * @param jobId the ID of the job to cancel
+   * @return the job entity representing the job after cancellation
+   * @throws NoSuchJobException if no job with the specified ID exists
+   */
+  JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobException;
 }
diff --git a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
index 4083154685..4d2868bca2 100644
--- a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
@@ -36,6 +36,11 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
 
   public static final Field ID =
       Field.required("id", Long.class, "The unique id of the job entity.");
+  public static final Field JOB_EXECUTION_ID =
+      Field.required(
+          "job_execution_id",
+          String.class,
+          "The unique execution id of the job, used for tracking.");
   public static final Field STATUS =
       Field.required("status", JobHandle.Status.class, "The status of the 
job.");
   public static final Field TEMPLATE_NAME =
@@ -45,6 +50,7 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
           "audit_info", AuditInfo.class, "The audit details of the job 
template entity.");
 
   private Long id;
+  private String jobExecutionId;
   private JobHandle.Status status;
   private String jobTemplateName;
   private Namespace namespace;
@@ -56,6 +62,7 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
   public Map<Field, Object> fields() {
     Map<Field, Object> fields = Maps.newHashMap();
     fields.put(ID, id);
+    fields.put(JOB_EXECUTION_ID, jobExecutionId);
     fields.put(TEMPLATE_NAME, jobTemplateName);
     fields.put(STATUS, status);
     fields.put(AUDIT_INFO, auditInfo);
@@ -67,6 +74,10 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
     return id;
   }
 
+  public String jobExecutionId() {
+    return jobExecutionId;
+  }
+
   @Override
   public String name() {
     return JobHandle.JOB_ID_PREFIX + id;
@@ -106,6 +117,7 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
 
     JobEntity that = (JobEntity) o;
     return Objects.equals(id, that.id)
+        && Objects.equals(jobExecutionId, that.jobExecutionId)
         && Objects.equals(status, that.status)
         && Objects.equals(jobTemplateName, that.jobTemplateName)
         && Objects.equals(namespace, that.namespace)
@@ -114,7 +126,7 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, namespace, status, jobTemplateName, auditInfo);
+    return Objects.hash(id, jobExecutionId, namespace, status, 
jobTemplateName, auditInfo);
   }
 
   public static Builder builder() {
@@ -133,6 +145,11 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
       return this;
     }
 
+    public Builder withJobExecutionId(String jobExecutionId) {
+      jobEntity.jobExecutionId = jobExecutionId;
+      return this;
+    }
+
     public Builder withNamespace(Namespace namespace) {
       jobEntity.namespace = namespace;
       return this;
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java 
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 079f84aec3..25665d6127 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.job;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -28,6 +30,7 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
@@ -41,6 +44,8 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.connector.job.JobExecutor;
+import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeInUseException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -52,6 +57,8 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
 import org.apache.gravitino.metalake.MetalakeManager;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.junit.jupiter.api.AfterAll;
@@ -78,6 +85,8 @@ public class TestJobManager {
 
   private static MockedStatic<MetalakeManager> mockedMetalake;
 
+  private static JobExecutor jobExecutor;
+
   @BeforeAll
   public static void setUp() throws IllegalAccessException {
     config = new Config(false) {};
@@ -86,10 +95,13 @@ public class TestJobManager {
     config.set(Configs.JOB_STAGING_DIR, testStagingDir);
 
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
-    entityStore = Mockito.mock(EntityStore.class);
 
-    JobManager jm = new JobManager(config, entityStore);
+    entityStore = Mockito.mock(EntityStore.class);
+    jobExecutor = Mockito.mock(JobExecutor.class);
+    IdGenerator idGenerator = new RandomIdGenerator();
+    JobManager jm = new JobManager(config, entityStore, idGenerator, 
jobExecutor);
     jobManager = Mockito.spy(jm);
+
     mockedMetalake = mockStatic(MetalakeManager.class);
   }
 
@@ -106,6 +118,7 @@ public class TestJobManager {
     // Reset the mocked static methods after each test
     mockedMetalake.reset();
     Mockito.reset(entityStore);
+    Mockito.reset(jobManager);
   }
 
   @Test
@@ -275,6 +288,10 @@ public class TestJobManager {
             NameIdentifierUtil.ofJobTemplate(metalake, 
shellJobTemplate.name()),
             Entity.EntityType.JOB_TEMPLATE);
 
+    doReturn(Collections.emptyList())
+        .when(jobManager)
+        .listJobs(metalake, Optional.of(shellJobTemplate.name()));
+
     // Delete an existing job template
     Assertions.assertTrue(() -> jobManager.deleteJobTemplate(metalake, 
"shell_job"));
 
@@ -293,6 +310,30 @@ public class TestJobManager {
         .delete(NameIdentifierUtil.ofJobTemplate(metalake, "job"), 
Entity.EntityType.JOB_TEMPLATE);
     Assertions.assertThrows(
         RuntimeException.class, () -> jobManager.deleteJobTemplate(metalake, 
"job"));
+
+    // Delete job template that is in use
+    Lists.newArrayList(
+            JobHandle.Status.QUEUED, JobHandle.Status.STARTED, 
JobHandle.Status.CANCELLING)
+        .forEach(
+            status -> {
+              doReturn(Lists.newArrayList(newJobEntity("shell_job", status)))
+                  .when(jobManager)
+                  .listJobs(metalake, Optional.of(shellJobTemplate.name()));
+              Assertions.assertThrows(
+                  InUseException.class, () -> 
jobManager.deleteJobTemplate(metalake, "shell_job"));
+            });
+
+    // Delete job template that is not in use
+    Lists.newArrayList(
+            JobHandle.Status.CANCELLED, JobHandle.Status.FAILED, 
JobHandle.Status.SUCCEEDED)
+        .forEach(
+            status -> {
+              doReturn(Lists.newArrayList(newJobEntity("shell_job", status)))
+                  .when(jobManager)
+                  .listJobs(metalake, Optional.of(shellJobTemplate.name()));
+              Assertions.assertDoesNotThrow(
+                  () -> jobManager.deleteJobTemplate(metalake, "shell_job"));
+            });
   }
 
   @Test
@@ -305,8 +346,8 @@ public class TestJobManager {
         newShellJobTemplateEntity("shell_job", "A shell job template");
     when(jobManager.getJobTemplate(metalake, 
shellJobTemplate.name())).thenReturn(shellJobTemplate);
 
-    JobEntity job1 = newJobEntity("shell_job");
-    JobEntity job2 = newJobEntity("spark_job");
+    JobEntity job1 = newJobEntity("shell_job", JobHandle.Status.QUEUED);
+    JobEntity job2 = newJobEntity("spark_job", JobHandle.Status.QUEUED);
 
     SupportsRelationOperations supportsRelationOperations =
         Mockito.mock(SupportsRelationOperations.class);
@@ -356,7 +397,7 @@ public class TestJobManager {
         .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
         .thenAnswer(a -> null);
 
-    JobEntity job = newJobEntity("shell_job");
+    JobEntity job = newJobEntity("shell_job", JobHandle.Status.QUEUED);
     when(entityStore.get(
             NameIdentifierUtil.ofJob(metalake, job.name()), 
Entity.EntityType.JOB, JobEntity.class))
         .thenReturn(job);
@@ -385,9 +426,112 @@ public class TestJobManager {
     Assertions.assertThrows(RuntimeException.class, () -> 
jobManager.getJob(metalake, "job"));
   }
 
+  @Test // Covers the happy path of JobManager.runJob, then three failure modes.
+  public void testRunJob() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    when(jobManager.getJobTemplate(metalake, 
shellJobTemplate.name())).thenReturn(shellJobTemplate);
+
+    String jobExecutionId = "job_execution_id_for_test";
+    when(jobExecutor.submitJob(any())).thenReturn(jobExecutionId);
+
+    doNothing().when(entityStore).put(any(JobEntity.class), anyBoolean());
+
+    JobEntity jobEntity = jobManager.runJob(metalake, "shell_job", 
Collections.emptyMap());
+
+    Assertions.assertEquals(jobExecutionId, jobEntity.jobExecutionId());
+    Assertions.assertEquals("shell_job", jobEntity.jobTemplateName());
+    Assertions.assertEquals(JobHandle.Status.QUEUED, jobEntity.status());
+
+    // Test when job template does not exist: the stubbed exception must surface with its message
+    when(jobManager.getJobTemplate(metalake, "non_existent"))
+        .thenThrow(new NoSuchJobTemplateException("Job template does not 
exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobTemplateException.class,
+            () -> jobManager.runJob(metalake, "non_existent", 
Collections.emptyMap()));
+    Assertions.assertEquals("Job template does not exist", e.getMessage());
+
+    // Test when job executor fails (submitJob is re-stubbed to throw from this point on)
+    doThrow(new RuntimeException("Job executor 
error")).when(jobExecutor).submitJob(any());
+
+    Assertions.assertThrows(
+        RuntimeException.class,
+        () -> jobManager.runJob(metalake, "shell_job", 
Collections.emptyMap()));
+
+    // Test when entity store fails. NOTE(review): submitJob still throws here - confirm put() is reached
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .put(any(JobEntity.class), anyBoolean());
+
+    Assertions.assertThrows(
+        RuntimeException.class,
+        () -> jobManager.runJob(metalake, "shell_job", 
Collections.emptyMap()));
+  }
+
+  @Test // Covers JobManager.cancelJob for queued, missing and finished jobs, then two failure modes.
+  public void testCancelJob() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobEntity job = newJobEntity("shell_job", JobHandle.Status.QUEUED);
+    when(jobManager.getJob(metalake, job.name())).thenReturn(job);
+    doNothing().when(jobExecutor).cancelJob(job.jobExecutionId());
+    doNothing().when(entityStore).put(any(JobEntity.class), anyBoolean());
+
+    // Cancel an existing job: a QUEUED job comes back as CANCELLING with the same execution id
+    JobEntity cancelledJob = jobManager.cancelJob(metalake, job.name());
+    Assertions.assertEquals(job.jobExecutionId(), 
cancelledJob.jobExecutionId());
+    Assertions.assertEquals(JobHandle.Status.CANCELLING, 
cancelledJob.status());
+
+    // Test cancel a nonexistent job: the NoSuchJobException stubbed on getJob must surface
+    when(jobManager.getJob(metalake, "non_existent"))
+        .thenThrow(new NoSuchJobException("Job does not exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobException.class, () -> jobManager.cancelJob(metalake, 
"non_existent"));
+    Assertions.assertEquals("Job does not exist", e.getMessage());
+
+    // Test cancelling a finished job: status and execution id are returned unchanged
+    Lists.newArrayList(
+            JobHandle.Status.CANCELLED, JobHandle.Status.FAILED, 
JobHandle.Status.SUCCEEDED)
+        .forEach(
+            status -> {
+              JobEntity finishedJob = newJobEntity("shell_job", status);
+              when(jobManager.getJob(metalake, 
finishedJob.name())).thenReturn(finishedJob);
+
+              JobEntity cancelledFinishedJob = jobManager.cancelJob(metalake, 
finishedJob.name());
+              Assertions.assertEquals(
+                  finishedJob.jobExecutionId(), 
cancelledFinishedJob.jobExecutionId());
+              Assertions.assertEquals(status, cancelledFinishedJob.status());
+            });
+
+    // Test job executor failed to cancel the job (cancelJob is re-stubbed to throw from here on)
+    doThrow(new RuntimeException("Job executor error"))
+        .when(jobExecutor)
+        .cancelJob(job.jobExecutionId());
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.cancelJob(metalake, 
job.name()));
+
+    // Test when entity store failed to update the job status.
+    doThrow(new IOException("Entity store error")) // NOTE(review): cancelJob still throws - confirm put() is reached
+        .when(entityStore)
+        .put(any(JobEntity.class), anyBoolean());
+
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.cancelJob(metalake, 
job.name()));
+  }
+
   private static JobTemplateEntity newShellJobTemplateEntity(String name, 
String comment) {
     ShellJobTemplate shellJobTemplate =
-        new ShellJobTemplate.Builder()
+        ShellJobTemplate.builder()
             .withName(name)
             .withComment(comment)
             .withExecutable("/bin/echo")
@@ -406,11 +550,11 @@ public class TestJobManager {
 
   private static JobTemplateEntity newSparkJobTemplateEntity(String name, 
String comment) {
     SparkJobTemplate sparkJobTemplate =
-        new SparkJobTemplate.Builder()
+        SparkJobTemplate.builder()
             .withName(name)
             .withComment(comment)
             .withClassName("org.apache.spark.examples.SparkPi")
-            .withExecutable("local:///path/to/spark-examples.jar")
+            .withExecutable("file:/path/to/spark-examples.jar")
             .build();
 
     Random rand = new Random();
@@ -424,13 +568,14 @@ public class TestJobManager {
         .build();
   }
 
-  private static JobEntity newJobEntity(String templateName) {
+  private static JobEntity newJobEntity(String templateName, JobHandle.Status 
status) {
     Random rand = new Random();
     return JobEntity.builder()
         .withId(rand.nextLong())
+        .withJobExecutionId(rand.nextLong() + "")
         .withNamespace(NamespaceUtil.ofJob(metalake))
         .withJobTemplateName(templateName)
-        .withStatus(JobHandle.Status.QUEUED)
+        .withStatus(status)
         .withAuditInfo(
             
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
         .build();
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java 
b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
new file mode 100644
index 0000000000..7c53f4ec4c
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestJobTemplate {
+
+  private static File tempDir;
+  private File tempStagingDir;
+
+  @BeforeAll
+  public static void setUpClass() throws IOException {
+    // Create one shared temporary directory; test files and per-test staging dirs are created under it
+    tempDir = Files.createTempDirectory("job-template-test").toFile();
+  }
+
+  @AfterAll
+  public static void tearDownClass() throws IOException {
+    // Delete the shared temporary directory (if it was created and still exists) after all tests
+    if (tempDir != null && tempDir.exists()) {
+      FileUtils.deleteDirectory(tempDir);
+      tempDir = null; // drop the reference to the deleted directory
+    }
+  }
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    // Create a fresh staging directory (name prefix "staging") under tempDir for each test
+    tempStagingDir = Files.createTempDirectory(tempDir.toPath(), 
"staging").toFile();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    // Delete this test's staging directory (if it was created and still exists) after each test
+    if (tempStagingDir != null && tempStagingDir.exists()) {
+      FileUtils.deleteDirectory(tempStagingDir);
+      tempStagingDir = null; // drop the reference to the deleted directory
+    }
+  }
+
+  @Test // Pins the expected output of JobManager.replacePlaceholder for "{{key}}" templates.
+  public void testReplacePlaceholders() {
+    String template = "Hello, {{name}}! Welcome to {{place}}.";
+    Map<String, String> replacements = ImmutableMap.of("name", "Alice", 
"place", "Wonderland");
+
+    String result = JobManager.replacePlaceholder(template, replacements);
+    Assertions.assertEquals("Hello, Alice! Welcome to Wonderland.", result);
+
+    // Test with missing replacements: a placeholder without a value is left as-is
+    replacements = ImmutableMap.of("name", "Bob");
+    result = JobManager.replacePlaceholder(template, replacements);
+    Assertions.assertEquals("Hello, Bob! Welcome to {{place}}.", result);
+
+    // Test with no replacements: the template comes back unchanged
+    result = JobManager.replacePlaceholder(template, ImmutableMap.of());
+    Assertions.assertEquals("Hello, {{name}}! Welcome to {{place}}.", result);
+
+    // Test with no placeholders: the text comes back unchanged even though the map is non-empty
+    String noPlaceholders = "Hello, World!";
+    result = JobManager.replacePlaceholder(noPlaceholders, replacements);
+    Assertions.assertEquals("Hello, World!", result);
+
+    // Test with repeated placeholders: every occurrence is substituted
+    String repeatedTemplate = "Hello, {{name}}! Your name is {{name}}.";
+    replacements = ImmutableMap.of("name", "Charlie");
+    result = JobManager.replacePlaceholder(repeatedTemplate, replacements);
+    Assertions.assertEquals("Hello, Charlie! Your name is Charlie.", result);
+
+    // Test with incomplete placeholders: the unclosed "{{name}" is left untouched
+    String incompleteTemplate = "Hello, {{name}! Welcome to {{place}}.";
+    replacements = ImmutableMap.of("name", "Dave", "place", "Earth");
+    result = JobManager.replacePlaceholder(incompleteTemplate, replacements);
+    Assertions.assertEquals("Hello, {{name}! Welcome to Earth.", result);
+
+    // Test with empty template: the result is empty as well
+    String emptyTemplate = "";
+    result = JobManager.replacePlaceholder(emptyTemplate, replacements);
+    Assertions.assertEquals("", result);
+
+    // Test with nested braces: only the inner "{{value}}" is replaced, surplus braces remain
+    String nestedTemplate = "Hello, {{name}}! Your code is {{{value}}}.";
+    replacements = ImmutableMap.of("name", "Eve", "value", "42");
+    result = JobManager.replacePlaceholder(nestedTemplate, replacements);
+    Assertions.assertEquals("Hello, Eve! Your code is {42}.", result);
+
+    nestedTemplate = "Hello, {{name}}! Your code is {{{{value}}}}.";
+    result = JobManager.replacePlaceholder(nestedTemplate, replacements);
+    Assertions.assertEquals("Hello, Eve! Your code is {{42}}.", result);
+
+    // Test with special characters in placeholders: "{{score%}}" stays even though the map has "score%"
+    String specialTemplate = "Hello, {{name}}! Your score is {{score%}}.";
+    replacements = ImmutableMap.of("name", "Frank", "score%", "100%");
+    result = JobManager.replacePlaceholder(specialTemplate, replacements);
+    Assertions.assertEquals("Hello, Frank! Your score is {{score%}}.", result);
+
+    // Test with alphanumeric placeholders: letters, digits and "_" in the name are substituted
+    String alphanumericTemplate = "Hello, {{user_name}}! Your score is 
{{score123}}.";
+    replacements = ImmutableMap.of("user_name", "Grace", "score123", "200");
+    result = JobManager.replacePlaceholder(alphanumericTemplate, replacements);
+    Assertions.assertEquals("Hello, Grace! Your score is 200.", result);
+
+    // Test with "." and "-": names containing either character are substituted too
+    String dotDashTemplate = "Hello, {{user.name}}! Your score is 
{{score-123}}.";
+    replacements = ImmutableMap.of("user.name", "Hank", "score-123", "300");
+    result = JobManager.replacePlaceholder(dotDashTemplate, replacements);
+    Assertions.assertEquals("Hello, Hank! Your score is 300.", result);
+  }
+
+  @Test // NOTE(review): "Uir" in the method name below looks like a typo for "Uri".
+  public void testFetchFilesFromUir() throws IOException {
+    File testFile1 = Files.createTempFile(tempDir.toPath(), "testFile1", 
".txt").toFile();
+    String result =
+        JobManager.fetchFileFromUri(testFile1.toURI().toString(), 
tempStagingDir, 30 * 1000); // last argument is presumably a timeout in ms - TODO confirm
+    File resultFile = new File(result);
+    Assertions.assertEquals(testFile1.getName(), resultFile.getName()); // file name is preserved
+
+    File testFile2 = Files.createTempFile(tempDir.toPath(), "testFile2", 
".txt").toFile();
+    File testFile3 = Files.createTempFile(tempDir.toPath(), "testFile3", 
".txt").toFile();
+
+    List<String> expectedUris =
+        Lists.newArrayList(testFile2.toURI().toString(), 
testFile3.toURI().toString());
+    List<String> resultUris = JobManager.fetchFilesFromUri(expectedUris, 
tempStagingDir, 30 * 1000);
+
+    Assertions.assertEquals(2, resultUris.size()); // one result per input URI
+    List<String> resultFileNames =
+        resultUris.stream().map(uri -> new 
File(uri).getName()).collect(Collectors.toList());
+    Assertions.assertTrue(resultFileNames.contains(testFile2.getName()));
+    Assertions.assertTrue(resultFileNames.contains(testFile3.getName()));
+  }
+
+  @Test // Checks placeholder substitution and script file names for a shell job template.
+  public void testCreateShellRuntimeJobTemplate() throws IOException {
+    File testScript1 = Files.createTempFile(tempDir.toPath(), "testScript1", 
".sh").toFile();
+    File testScript2 = Files.createTempFile(tempDir.toPath(), "testScript2", 
".sh").toFile();
+
+    ShellJobTemplate shellJobTemplate =
+        ShellJobTemplate.builder()
+            .withName("testShellJob")
+            .withComment("This is a test shell job template")
+            .withExecutable("/bin/echo")
+            .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}}, 
{{arg4}}"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}", 
"ENV_VAR2", "{{val2}}"))
+            .withCustomFields(ImmutableMap.of("customField1", 
"{{customVal1}}"))
+            .withScripts(
+                Lists.newArrayList(testScript1.toURI().toString(), 
testScript2.toURI().toString()))
+            .build();
+
+    JobTemplateEntity entity =
+        JobTemplateEntity.builder()
+            .withId(1L)
+            .withName(shellJobTemplate.name())
+            .withComment(shellJobTemplate.comment())
+            .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+            .withTemplateContent(
+                
JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplate result =
+        JobManager.createRuntimeJobTemplate(
+            entity,
+            ImmutableMap.of( // one value for every placeholder used in the template above
+                "arg3", "value3",
+                "arg4", "value4",
+                "val1", "value1",
+                "val2", "value2",
+                "customVal1", "customValue1"),
+            tempStagingDir);
+
+    Assertions.assertEquals(shellJobTemplate.name(), result.name());
+    Assertions.assertEquals(shellJobTemplate.comment(), result.comment());
+    Assertions.assertEquals("echo", new File(result.executable).getName()); // only the file name is compared
+    Assertions.assertEquals(
+        Lists.newArrayList("arg1", "arg2", "value3, value4"), 
result.arguments());
+    Assertions.assertEquals(
+        ImmutableMap.of("ENV_VAR1", "value1", "ENV_VAR2", "value2"), 
result.environments());
+    Assertions.assertEquals(ImmutableMap.of("customField1", "customValue1"), 
result.customFields());
+
+    Assertions.assertEquals(2, ((ShellJobTemplate) result).scripts().size());
+    List<String> scriptNames = // compare by file name only, not by full path
+        ((ShellJobTemplate) result)
+            .scripts().stream()
+                .map(script -> new File(script).getName())
+                .collect(Collectors.toList());
+    Assertions.assertTrue(scriptNames.contains(testScript1.getName()));
+    Assertions.assertTrue(scriptNames.contains(testScript2.getName()));
+  }
+
+  @Test // Checks placeholder substitution and jar/file/archive file names for a Spark job template.
+  public void testCreateSparkRuntimeJobTemplate() throws IOException {
+    File executable = Files.createTempFile(tempDir.toPath(), "testSparkJob", 
".jar").toFile();
+    File jar1 = Files.createTempFile(tempDir.toPath(), "testJar1", 
".jar").toFile();
+    File jar2 = Files.createTempFile(tempDir.toPath(), "testJar2", 
".jar").toFile();
+
+    File file1 = Files.createTempFile(tempDir.toPath(), "testFile1", 
".txt").toFile();
+    File file2 = Files.createTempFile(tempDir.toPath(), "testFile2", 
".txt").toFile();
+
+    File archive1 = Files.createTempFile(tempDir.toPath(), "testArchive1", 
".zip").toFile();
+
+    SparkJobTemplate sparkJobTemplate =
+        SparkJobTemplate.builder()
+            .withName("testSparkJob")
+            .withComment("This is a test Spark job template")
+            .withExecutable(executable.toURI().toString())
+            .withClassName("org.apache.gravitino.TestSparkJob")
+            .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}}"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}", 
"ENV_VAR2", "{{val2}}"))
+            .withCustomFields(ImmutableMap.of("customField1", 
"{{customVal1}}"))
+            .withJars(Lists.newArrayList(jar1.toURI().toString(), 
jar2.toURI().toString()))
+            .withFiles(Lists.newArrayList(file1.toURI().toString(), 
file2.toURI().toString()))
+            .withArchives(Lists.newArrayList(archive1.toURI().toString()))
+            .withConfigs(
+                ImmutableMap.of(
+                    "spark.executor.memory",
+                    "{{executor-mem}}",
+                    "spark.driver.cores",
+                    "{{driver-cores}}"))
+            .build();
+
+    JobTemplateEntity entity =
+        JobTemplateEntity.builder()
+            .withId(1L)
+            .withName(sparkJobTemplate.name())
+            .withComment(sparkJobTemplate.comment())
+            .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+            .withTemplateContent(
+                
JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplate result =
+        JobManager.createRuntimeJobTemplate(
+            entity,
+            ImmutableMap.of( // one value for every placeholder used in the template above
+                "arg3", "value3",
+                "val1", "value1",
+                "val2", "value2",
+                "customVal1", "customValue1",
+                "executor-mem", "4g",
+                "driver-cores", "2"),
+            tempStagingDir);
+
+    Assertions.assertEquals(sparkJobTemplate.name(), result.name());
+    Assertions.assertEquals(sparkJobTemplate.comment(), result.comment());
+    Assertions.assertEquals(executable.getName(), new 
File(result.executable).getName());
+    Assertions.assertEquals(Lists.newArrayList("arg1", "arg2", "value3"), 
result.arguments());
+    Assertions.assertEquals(
+        ImmutableMap.of("ENV_VAR1", "value1", "ENV_VAR2", "value2"), 
result.environments());
+    Assertions.assertEquals(ImmutableMap.of("customField1", "customValue1"), 
result.customFields());
+
+    Assertions.assertEquals(2, ((SparkJobTemplate) result).jars().size());
+    List<String> jarNames = // jars, files and archives below are compared by file name only
+        ((SparkJobTemplate) result)
+            .jars().stream().map(jar -> new 
File(jar).getName()).collect(Collectors.toList());
+    Assertions.assertTrue(jarNames.contains(jar1.getName()));
+    Assertions.assertTrue(jarNames.contains(jar2.getName()));
+
+    Assertions.assertEquals(2, ((SparkJobTemplate) result).files().size());
+    List<String> fileNames =
+        ((SparkJobTemplate) result)
+            .files().stream().map(file -> new 
File(file).getName()).collect(Collectors.toList());
+    Assertions.assertTrue(fileNames.contains(file1.getName()));
+    Assertions.assertTrue(fileNames.contains(file2.getName()));
+
+    Assertions.assertEquals(1, ((SparkJobTemplate) result).archives().size());
+    List<String> archiveNames =
+        ((SparkJobTemplate) result)
+            .archives().stream()
+                .map(archive -> new File(archive).getName())
+                .collect(Collectors.toList());
+    Assertions.assertTrue(archiveNames.contains(archive1.getName()));
+
+    Assertions.assertEquals( // placeholders inside Spark config values are substituted as well
+        ImmutableMap.of("spark.executor.memory", "4g", "spark.driver.cores", 
"2"),
+        ((SparkJobTemplate) result).configs());
+  }
+}

Reply via email to