This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 46091e14b3 [#7505] feat(core): Add job execution logic and job
framework (part-2) (#7772)
46091e14b3 is described below
commit 46091e14b3cdf4a499ac6f8b256df5d28fb89083
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Jul 28 09:50:48 2025 +0800
[#7505] feat(core): Add job execution logic and job framework (part-2)
(#7772)
### What changes were proposed in this pull request?
This is the second part of the job framework PR to add the job executor
interface and complete the job framework.
### Why are the changes needed?
Fix: #7505
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add UTs to cover the code changes.
---
.../java/org/apache/gravitino/job/JobHandle.java | 3 +
.../org/apache/gravitino/job/ShellJobTemplate.java | 11 +
.../org/apache/gravitino/job/SparkJobTemplate.java | 11 +
.../main/java/org/apache/gravitino/Configs.java | 11 +
.../gravitino/connector/job/JobExecutor.java | 83 ++++++
.../apache/gravitino/job/JobExecutorFactory.java | 60 ++++
.../java/org/apache/gravitino/job/JobManager.java | 291 ++++++++++++++++++-
.../gravitino/job/JobOperationDispatcher.java | 27 +-
.../java/org/apache/gravitino/meta/JobEntity.java | 19 +-
.../org/apache/gravitino/job/TestJobManager.java | 165 ++++++++++-
.../org/apache/gravitino/job/TestJobTemplate.java | 314 +++++++++++++++++++++
11 files changed, 975 insertions(+), 20 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/job/JobHandle.java
b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
index 38927538de..9aded575cc 100644
--- a/api/src/main/java/org/apache/gravitino/job/JobHandle.java
+++ b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
@@ -42,6 +42,9 @@ public interface JobHandle {
/** The job has completed successfully. */
SUCCEEDED,
+ /** The job is being cancelled. */
+ CANCELLING,
+
/** The job has been cancelled. */
CANCELLED;
}
diff --git a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
index afb6fdd07d..57edcca61c 100644
--- a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
@@ -89,11 +89,22 @@ public class ShellJobTemplate extends JobTemplate {
return "\nShellJobTemplate{\n" + super.toString() + sb + "}\n";
}
+ /**
+ * Creates a new builder for constructing instances of {@link
ShellJobTemplate}.
+ *
+ * @return a new instance of {@link Builder}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
/** Builder for creating instances of {@link ShellJobTemplate}. */
public static class Builder extends BaseBuilder<Builder, ShellJobTemplate> {
private List<String> scripts;
+ private Builder() {}
+
/**
* Sets the scripts to be executed by the shell job template.
*
diff --git a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
index a1f68f7e75..13b6d8c6bf 100644
--- a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
@@ -181,6 +181,15 @@ public class SparkJobTemplate extends JobTemplate {
return sb + super.toString() + "}\n";
}
+ /**
+ * Creates a new builder for constructing instances of {@link
SparkJobTemplate}.
+ *
+ * @return a new instance of {@link Builder}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
/** Builder for creating instances of {@link SparkJobTemplate}. */
public static class Builder extends JobTemplate.BaseBuilder<Builder,
SparkJobTemplate> {
@@ -194,6 +203,8 @@ public class SparkJobTemplate extends JobTemplate {
private Map<String, String> configs;
+ private Builder() {}
+
/**
* Sets the class name of the Spark application to be executed.
*
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index 5a6544b1d4..8c283630eb 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -412,5 +412,16 @@ public class Configs {
.doc("Directory for managing staging files when running jobs.")
.version(ConfigConstants.VERSION_1_0_0)
.stringConf()
+ .checkValue(StringUtils::isNotBlank,
ConfigConstants.NOT_BLANK_ERROR_MSG)
.createWithDefault("/tmp/gravitino/jobs/staging");
+
+ public static final ConfigEntry<String> JOB_EXECUTOR =
+ new ConfigBuilder("gravitino.job.executor")
+ .doc(
+ "The executor to run jobs, by default it is 'local', user can
implement their own "
+ + "executor and set it here.")
+ .version(ConfigConstants.VERSION_1_0_0)
+ .stringConf()
+ .checkValue(StringUtils::isNotBlank,
ConfigConstants.NOT_BLANK_ERROR_MSG)
+ .createWithDefault("local");
}
diff --git
a/core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java
b/core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java
new file mode 100644
index 0000000000..40eacac9de
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/connector/job/JobExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector.job;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+
+/**
+ * The JobExecutor interface defines the API for executing jobs in a specific
Job runner, for
+ * example, Airflow, local runner, etc. The developer can implement this
interface to adapt to the
+ * specific job runner. The Gravitino core will use this interface to submit
jobs, get job status,
+ * etc.
+ */
+@DeveloperApi
+public interface JobExecutor extends Closeable {
+
+ /**
+ * Initialize the job executor with the given configurations.
+ *
+ * @param configs A map of configuration key-value pairs.
+ */
+ void initialize(Map<String, String> configs);
+
+ /**
+ * Submit a job with the given name and job template to the external job
runner.
+ *
+ * <p>The returned job identifier is unique in the external job runner, and
can be used to track
+ * the job status or cancel it later.
+ *
+ * <p>The placeholders in the job template have already been replaced with
the actual values before
+ * calling this method. So the implementors can directly use this job
template to submit the job
+ * to the external job runner.
+ *
+ * @param jobTemplate The job template containing the job configuration and
parameters.
+ * @return A unique identifier for the submitted job.
+ */
+ String submitJob(JobTemplate jobTemplate);
+
+ /**
+ * Get the status of a job by its unique identifier. The status should be
one of the values in
+ * {@link JobHandle.Status}. The implementors should query the external job
runner to get the job
+ * status, and map the status to the values in {@link JobHandle.Status}.
+ *
+ * @param jobId The unique identifier of the job.
+ * @return The status of the job.
+ * @throws NoSuchJobException If the job with the given identifier does not
exist.
+ */
+ JobHandle.Status getJobStatus(String jobId) throws NoSuchJobException;
+
+ /**
+ * Cancel a job by its unique identifier. The job runner should stop the job
if it is currently
+ * running. If the job is already completed, it should return directly
without any error. If the
+ * job does not exist, it should throw a {@link NoSuchJobException}.
+ *
+ * <p>This method should not block on the job cancellation; it should
return immediately, and
+ * Gravitino can further query the job status to check if the job is
cancelled successfully.
+ *
+ * @param jobId The unique identifier of the job to cancel.
+ * @throws NoSuchJobException If the job with the given identifier does not
exist.
+ */
+ void cancelJob(String jobId) throws NoSuchJobException;
+}
diff --git
a/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
new file mode 100644
index 0000000000..cdf06c59af
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.connector.job.JobExecutor;
+
+public class JobExecutorFactory {
+
+ private static final String JOB_EXECUTOR_CONF_PREFIX = "gravitino.executor.";
+
+ private static final String JOB_EXECUTOR_CLASS_SUFFIX = ".class";
+
+ private JobExecutorFactory() {
+ // Private constructor to prevent instantiation
+ }
+
+ public static JobExecutor create(Config config) {
+ String jobExecutorName = config.get(Configs.JOB_EXECUTOR);
+ String jobExecutorClassKey =
+ JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + JOB_EXECUTOR_CLASS_SUFFIX;
+ String clzName = config.getRawString(jobExecutorClassKey);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clzName),
+ "Job executor class name must be specified for job executor: %s",
+ jobExecutorName);
+
+ Map<String, String> configs =
+ config.getConfigsWithPrefix(JOB_EXECUTOR_CONF_PREFIX + jobExecutorName
+ ".");
+ try {
+ JobExecutor jobExecutor =
+ (JobExecutor)
Class.forName(clzName).getDeclaredConstructor().newInstance();
+ jobExecutor.initialize(configs);
+ return jobExecutor;
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create job executor: " +
jobExecutorName, e);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 09c36cf22f..e0307713b9 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -21,11 +21,21 @@ package org.apache.gravitino.job;
import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
+import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.time.Instant;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
@@ -34,6 +44,7 @@ import org.apache.gravitino.EntityStore;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.connector.job.JobExecutor;
import org.apache.gravitino.exceptions.InUseException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -41,19 +52,38 @@ import org.apache.gravitino.exceptions.NoSuchJobException;
import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
public class JobManager implements JobOperationDispatcher, Closeable {
+ private static final Pattern PLACEHOLDER_PATTERN =
Pattern.compile("\\{\\{([\\w.-]+)\\}\\}");
+
+ private static final int TIMEOUT_IN_MS = 30 * 1000; // 30 seconds
+
private final EntityStore entityStore;
private final File stagingDir;
- public JobManager(Config config, EntityStore entityStore) {
+ private final JobExecutor jobExecutor;
+
+ private final IdGenerator idGenerator;
+
+ public JobManager(Config config, EntityStore entityStore, IdGenerator
idGenerator) {
+ this(config, entityStore, idGenerator, JobExecutorFactory.create(config));
+ }
+
+ @VisibleForTesting
+ JobManager(
+ Config config, EntityStore entityStore, IdGenerator idGenerator,
JobExecutor jobExecutor) {
this.entityStore = entityStore;
+ this.jobExecutor = jobExecutor;
+ this.idGenerator = idGenerator;
String stagingDirPath = config.get(Configs.JOB_STAGING_DIR);
this.stagingDir = new File(stagingDirPath);
@@ -144,8 +174,19 @@ public class JobManager implements JobOperationDispatcher,
Closeable {
public boolean deleteJobTemplate(String metalake, String jobTemplateName)
throws InUseException {
checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
- // TODO. Check if there are any running jobs associated with the job
template. If there are
- // running jobs, throw InUseException
+ List<JobEntity> jobs = listJobs(metalake, Optional.of(jobTemplateName));
+ boolean hasActiveJobs =
+ jobs.stream()
+ .anyMatch(
+ job ->
+ job.status() != JobHandle.Status.CANCELLED
+ && job.status() != JobHandle.Status.SUCCEEDED
+ && job.status() != JobHandle.Status.FAILED);
+ if (hasActiveJobs) {
+ throw new InUseException(
+ "Job template %s under metalake %s has active jobs associated with
it",
+ jobTemplateName, metalake);
+ }
// Delete the job template entity as well as all the jobs associated with
it.
return TreeLockUtils.doWithTreeLock(
@@ -226,8 +267,250 @@ public class JobManager implements
JobOperationDispatcher, Closeable {
});
}
+ @Override
+ public JobEntity runJob(String metalake, String jobTemplateName, Map<String,
String> jobConf)
+ throws NoSuchJobTemplateException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ // Check if the job template exists, will throw NoSuchJobTemplateException
if it does not exist.
+ JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake,
jobTemplateName);
+
+ // Create staging directory.
+ // TODO(jerry). The job staging directory will be deleted using a
background thread.
+ long jobId = idGenerator.nextId();
+ String jobStagingPath =
+ stagingDir.getAbsolutePath()
+ + File.separator
+ + metalake
+ + File.separator
+ + JobHandle.JOB_ID_PREFIX
+ + jobId;
+ File jobStagingDir = new File(jobStagingPath);
+ if (!jobStagingDir.mkdirs()) {
+ throw new RuntimeException(
+ String.format("Failed to create staging directory %s for job %s",
jobStagingDir, jobId));
+ }
+
+ // Create a JobTemplate by replacing the template parameters with the
jobConf values, and
+ // also downloading any necessary files from the URIs specified in the job
template.
+ JobTemplate jobTemplate = createRuntimeJobTemplate(jobTemplateEntity,
jobConf, jobStagingDir);
+
+ // Submit the job template to the job executor
+ String jobExecutionId;
+ try {
+ jobExecutionId = jobExecutor.submitJob(jobTemplate);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to submit job template %s for execution",
jobTemplate), e);
+ }
+
+ // Create a new JobEntity to represent the job
+ JobEntity jobEntity =
+ JobEntity.builder()
+ .withId(jobId)
+ .withJobExecutionId(jobExecutionId)
+ .withJobTemplateName(jobTemplateName)
+ .withStatus(JobHandle.Status.QUEUED)
+ .withNamespace(NamespaceUtil.ofJob(metalake))
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ try {
+ entityStore.put(jobEntity, false /* overwrite */);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to register the job entity " +
jobEntity, e);
+ }
+
+ return jobEntity;
+ }
+
+ @Override
+ public JobEntity cancelJob(String metalake, String jobId) throws
NoSuchJobException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ // Retrieve the job entity, will throw NoSuchJobException if the job does
not exist.
+ JobEntity jobEntity = getJob(metalake, jobId);
+
+ if (jobEntity.status() == JobHandle.Status.CANCELLED
+ || jobEntity.status() == JobHandle.Status.SUCCEEDED
+ || jobEntity.status() == JobHandle.Status.FAILED) {
+ // If the job is already cancelled, succeeded, or failed, we do not need
to cancel it again.
+ return jobEntity;
+ }
+
+ // Cancel the job using the job executor
+ try {
+ jobExecutor.cancelJob(jobEntity.jobExecutionId());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to cancel job with ID %s under metalake %s",
jobId, metalake), e);
+ }
+
+ // Update the job status to CANCELLING
+ JobEntity newJobEntity =
+ JobEntity.builder()
+ .withId(jobEntity.id())
+ .withJobExecutionId(jobEntity.jobExecutionId())
+ .withJobTemplateName(jobEntity.jobTemplateName())
+ .withStatus(JobHandle.Status.CANCELLING)
+ .withNamespace(jobEntity.namespace())
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(jobEntity.auditInfo().creator())
+ .withCreateTime(jobEntity.auditInfo().createTime())
+
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+ .withLastModifiedTime(Instant.now())
+ .build())
+ .build();
+
+ try {
+ // Update the job entity in the entity store
+ entityStore.put(newJobEntity, true /* overwrite */);
+ return newJobEntity;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to update job entity %s to CANCELING status",
newJobEntity), e);
+ }
+ }
+
@Override
public void close() throws IOException {
- // TODO. Implement any necessary cleanup logic for the JobManager.
+ jobExecutor.close();
+ // TODO(jerry). Implement any necessary cleanup logic for the JobManager.
+ }
+
+ @VisibleForTesting
+ static JobTemplate createRuntimeJobTemplate(
+ JobTemplateEntity jobTemplateEntity, Map<String, String> jobConf, File
stagingDir) {
+ String name = jobTemplateEntity.name();
+ String comment = jobTemplateEntity.comment();
+
+ JobTemplateEntity.TemplateContent content =
jobTemplateEntity.templateContent();
+ String executable = fetchFileFromUri(content.executable(), stagingDir,
TIMEOUT_IN_MS);
+
+ List<String> args =
+ content.arguments().stream()
+ .map(arg -> replacePlaceholder(arg, jobConf))
+ .collect(Collectors.toList());
+ Map<String, String> environments =
+ content.environments().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> replacePlaceholder(entry.getKey(), jobConf),
+ entry -> replacePlaceholder(entry.getValue(), jobConf)));
+ Map<String, String> customFields =
+ content.customFields().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> replacePlaceholder(entry.getKey(), jobConf),
+ entry -> replacePlaceholder(entry.getValue(), jobConf)));
+
+ // For shell job template
+ if (content.jobType() == JobTemplate.JobType.SHELL) {
+ List<String> scripts = fetchFilesFromUri(content.scripts(), stagingDir,
TIMEOUT_IN_MS);
+
+ return ShellJobTemplate.builder()
+ .withName(name)
+ .withComment(comment)
+ .withExecutable(executable)
+ .withArguments(args)
+ .withEnvironments(environments)
+ .withCustomFields(customFields)
+ .withScripts(scripts)
+ .build();
+ }
+
+ // For Spark job template
+ if (content.jobType() == JobTemplate.JobType.SPARK) {
+ String className = content.className();
+ List<String> jars = fetchFilesFromUri(content.jars(), stagingDir,
TIMEOUT_IN_MS);
+ List<String> files = fetchFilesFromUri(content.files(), stagingDir,
TIMEOUT_IN_MS);
+ List<String> archives = fetchFilesFromUri(content.archives(),
stagingDir, TIMEOUT_IN_MS);
+ Map<String, String> configs =
+ content.configs().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> replacePlaceholder(entry.getKey(), jobConf),
+ entry -> replacePlaceholder(entry.getValue(), jobConf)));
+
+ return SparkJobTemplate.builder()
+ .withName(name)
+ .withComment(comment)
+ .withExecutable(executable)
+ .withArguments(args)
+ .withEnvironments(environments)
+ .withCustomFields(customFields)
+ .withClassName(className)
+ .withJars(jars)
+ .withFiles(files)
+ .withArchives(archives)
+ .withConfigs(configs)
+ .build();
+ }
+
+ throw new IllegalArgumentException("Unsupported job type: " +
content.jobType());
+ }
+
+ @VisibleForTesting
+ static String replacePlaceholder(String inputString, Map<String, String>
replacements) {
+ if (StringUtils.isBlank(inputString)) {
+ return inputString; // Return as is if the input string is blank
+ }
+
+ StringBuffer result = new StringBuffer();
+
+ Matcher matcher = PLACEHOLDER_PATTERN.matcher(inputString);
+ while (matcher.find()) {
+ String key = matcher.group(1);
+ String replacement = replacements.get(key);
+ if (replacement != null) {
+ matcher.appendReplacement(result, replacement);
+ } else {
+ // If no replacement is found, keep the placeholder as is
+ matcher.appendReplacement(result, matcher.group(0));
+ }
+ }
+ matcher.appendTail(result);
+
+ return result.toString();
+ }
+
+ @VisibleForTesting
+ static List<String> fetchFilesFromUri(List<String> uris, File stagingDir,
int timeoutInMs) {
+ return uris.stream()
+ .map(uri -> fetchFileFromUri(uri, stagingDir, timeoutInMs))
+ .collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ static String fetchFileFromUri(String uri, File stagingDir, int timeoutInMs)
{
+ try {
+ URI fileUri = new URI(uri);
+ String scheme = Optional.ofNullable(fileUri.getScheme()).orElse("file");
+ File destFile = new File(stagingDir, new
File(fileUri.getPath()).getName());
+
+ switch (scheme) {
+ case "http":
+ case "https":
+ case "ftp":
+ FileUtils.copyURLToFile(fileUri.toURL(), destFile, timeoutInMs,
timeoutInMs);
+ break;
+
+ case "file":
+ Files.createSymbolicLink(destFile.toPath(), new
File(fileUri.getPath()).toPath());
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unsupported scheme: " + scheme);
+ }
+
+ return destFile.getAbsolutePath();
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Failed to fetch file from URI
%s", uri), e);
+ }
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
index 42f0d75e05..0c83862f4c 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
@@ -20,6 +20,7 @@
package org.apache.gravitino.job;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import org.apache.gravitino.exceptions.InUseException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
@@ -93,9 +94,25 @@ public interface JobOperationDispatcher {
*/
JobEntity getJob(String metalake, String jobId) throws NoSuchJobException;
- // TODO. Implement the runJob and cancelJob methods in another PR.
- // JobEntity runJob(String metalake, String jobTemplateName, Map<String,
String> jobConf)
- // throws NoSuchJobTemplateException;
- //
- // JobEntity cancelJob(String metalake, String jobId) throws
NoSuchJobException;
+ /**
+ * Runs a job based on the specified job template and configuration in the
specified metalake.
+ *
+ * @param metalake the name of the metalake
+ * @param jobTemplateName the name of the job template to use for running
the job
+ * @param jobConf the runtime configuration for the job, which contains
key-value pairs
+ * @return the job entity representing the job
+ * @throws NoSuchJobTemplateException if no job template with the specified
name exists
+ */
+ JobEntity runJob(String metalake, String jobTemplateName, Map<String,
String> jobConf)
+ throws NoSuchJobTemplateException;
+
+ /**
+ * Cancels a job by its ID in the specified metalake.
+ *
+ * @param metalake the name of the metalake
+ * @param jobId the ID of the job to cancel
+ * @return the job entity representing the job after cancellation
+ * @throws NoSuchJobException if no job with the specified ID exists
+ */
+ JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobException;
}
diff --git a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
index 4083154685..4d2868bca2 100644
--- a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
@@ -36,6 +36,11 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
public static final Field ID =
Field.required("id", Long.class, "The unique id of the job entity.");
+ public static final Field JOB_EXECUTION_ID =
+ Field.required(
+ "job_execution_id",
+ String.class,
+ "The unique execution id of the job, used for tracking.");
public static final Field STATUS =
Field.required("status", JobHandle.Status.class, "The status of the
job.");
public static final Field TEMPLATE_NAME =
@@ -45,6 +50,7 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
"audit_info", AuditInfo.class, "The audit details of the job
template entity.");
private Long id;
+ private String jobExecutionId;
private JobHandle.Status status;
private String jobTemplateName;
private Namespace namespace;
@@ -56,6 +62,7 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
public Map<Field, Object> fields() {
Map<Field, Object> fields = Maps.newHashMap();
fields.put(ID, id);
+ fields.put(JOB_EXECUTION_ID, jobExecutionId);
fields.put(TEMPLATE_NAME, jobTemplateName);
fields.put(STATUS, status);
fields.put(AUDIT_INFO, auditInfo);
@@ -67,6 +74,10 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
return id;
}
+ public String jobExecutionId() {
+ return jobExecutionId;
+ }
+
@Override
public String name() {
return JobHandle.JOB_ID_PREFIX + id;
@@ -106,6 +117,7 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
JobEntity that = (JobEntity) o;
return Objects.equals(id, that.id)
+ && Objects.equals(jobExecutionId, that.jobExecutionId)
&& Objects.equals(status, that.status)
&& Objects.equals(jobTemplateName, that.jobTemplateName)
&& Objects.equals(namespace, that.namespace)
@@ -114,7 +126,7 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
@Override
public int hashCode() {
- return Objects.hash(id, namespace, status, jobTemplateName, auditInfo);
+ return Objects.hash(id, jobExecutionId, namespace, status,
jobTemplateName, auditInfo);
}
public static Builder builder() {
@@ -133,6 +145,11 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
return this;
}
+ public Builder withJobExecutionId(String jobExecutionId) {
+ jobEntity.jobExecutionId = jobExecutionId;
+ return this;
+ }
+
public Builder withNamespace(Namespace namespace) {
jobEntity.namespace = namespace;
return this;
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 079f84aec3..25665d6127 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.job;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -28,6 +30,7 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
@@ -41,6 +44,8 @@ import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.connector.job.JobExecutor;
+import org.apache.gravitino.exceptions.InUseException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.MetalakeInUseException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -52,6 +57,8 @@ import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.metalake.MetalakeManager;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
import org.junit.jupiter.api.AfterAll;
@@ -78,6 +85,8 @@ public class TestJobManager {
private static MockedStatic<MetalakeManager> mockedMetalake;
+ private static JobExecutor jobExecutor;
+
@BeforeAll
public static void setUp() throws IllegalAccessException {
config = new Config(false) {};
@@ -86,10 +95,13 @@ public class TestJobManager {
config.set(Configs.JOB_STAGING_DIR, testStagingDir);
FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
- entityStore = Mockito.mock(EntityStore.class);
- JobManager jm = new JobManager(config, entityStore);
+ entityStore = Mockito.mock(EntityStore.class);
+ jobExecutor = Mockito.mock(JobExecutor.class);
+ IdGenerator idGenerator = new RandomIdGenerator();
+ JobManager jm = new JobManager(config, entityStore, idGenerator,
jobExecutor);
jobManager = Mockito.spy(jm);
+
mockedMetalake = mockStatic(MetalakeManager.class);
}
@@ -106,6 +118,7 @@ public class TestJobManager {
// Reset the mocked static methods after each test
mockedMetalake.reset();
Mockito.reset(entityStore);
+ Mockito.reset(jobManager);
}
@Test
@@ -275,6 +288,10 @@ public class TestJobManager {
NameIdentifierUtil.ofJobTemplate(metalake,
shellJobTemplate.name()),
Entity.EntityType.JOB_TEMPLATE);
+ doReturn(Collections.emptyList())
+ .when(jobManager)
+ .listJobs(metalake, Optional.of(shellJobTemplate.name()));
+
// Delete an existing job template
Assertions.assertTrue(() -> jobManager.deleteJobTemplate(metalake,
"shell_job"));
@@ -293,6 +310,30 @@ public class TestJobManager {
.delete(NameIdentifierUtil.ofJobTemplate(metalake, "job"),
Entity.EntityType.JOB_TEMPLATE);
Assertions.assertThrows(
RuntimeException.class, () -> jobManager.deleteJobTemplate(metalake,
"job"));
+
+ // Delete job template that is in use
+ Lists.newArrayList(
+ JobHandle.Status.QUEUED, JobHandle.Status.STARTED,
JobHandle.Status.CANCELLING)
+ .forEach(
+ status -> {
+ doReturn(Lists.newArrayList(newJobEntity("shell_job", status)))
+ .when(jobManager)
+ .listJobs(metalake, Optional.of(shellJobTemplate.name()));
+ Assertions.assertThrows(
+ InUseException.class, () ->
jobManager.deleteJobTemplate(metalake, "shell_job"));
+ });
+
+ // Delete job template that is not in use
+ Lists.newArrayList(
+ JobHandle.Status.CANCELLED, JobHandle.Status.FAILED,
JobHandle.Status.SUCCEEDED)
+ .forEach(
+ status -> {
+ doReturn(Lists.newArrayList(newJobEntity("shell_job", status)))
+ .when(jobManager)
+ .listJobs(metalake, Optional.of(shellJobTemplate.name()));
+ Assertions.assertDoesNotThrow(
+ () -> jobManager.deleteJobTemplate(metalake, "shell_job"));
+ });
}
@Test
@@ -305,8 +346,8 @@ public class TestJobManager {
newShellJobTemplateEntity("shell_job", "A shell job template");
when(jobManager.getJobTemplate(metalake,
shellJobTemplate.name())).thenReturn(shellJobTemplate);
- JobEntity job1 = newJobEntity("shell_job");
- JobEntity job2 = newJobEntity("spark_job");
+ JobEntity job1 = newJobEntity("shell_job", JobHandle.Status.QUEUED);
+ JobEntity job2 = newJobEntity("spark_job", JobHandle.Status.QUEUED);
SupportsRelationOperations supportsRelationOperations =
Mockito.mock(SupportsRelationOperations.class);
@@ -356,7 +397,7 @@ public class TestJobManager {
.when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
.thenAnswer(a -> null);
- JobEntity job = newJobEntity("shell_job");
+ JobEntity job = newJobEntity("shell_job", JobHandle.Status.QUEUED);
when(entityStore.get(
NameIdentifierUtil.ofJob(metalake, job.name()),
Entity.EntityType.JOB, JobEntity.class))
.thenReturn(job);
@@ -385,9 +426,112 @@ public class TestJobManager {
Assertions.assertThrows(RuntimeException.class, () ->
jobManager.getJob(metalake, "job"));
}
+ @Test
+ public void testRunJob() throws IOException {
+ mockedMetalake
+ .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+ .thenAnswer(a -> null);
+
+ JobTemplateEntity shellJobTemplate =
+ newShellJobTemplateEntity("shell_job", "A shell job template");
+ when(jobManager.getJobTemplate(metalake,
shellJobTemplate.name())).thenReturn(shellJobTemplate);
+
+ String jobExecutionId = "job_execution_id_for_test";
+ when(jobExecutor.submitJob(any())).thenReturn(jobExecutionId);
+
+ doNothing().when(entityStore).put(any(JobEntity.class), anyBoolean());
+
+ JobEntity jobEntity = jobManager.runJob(metalake, "shell_job",
Collections.emptyMap());
+
+ Assertions.assertEquals(jobExecutionId, jobEntity.jobExecutionId());
+ Assertions.assertEquals("shell_job", jobEntity.jobTemplateName());
+ Assertions.assertEquals(JobHandle.Status.QUEUED, jobEntity.status());
+
+ // Test when job template does not exist
+ when(jobManager.getJobTemplate(metalake, "non_existent"))
+ .thenThrow(new NoSuchJobTemplateException("Job template does not
exist"));
+
+ Exception e =
+ Assertions.assertThrows(
+ NoSuchJobTemplateException.class,
+ () -> jobManager.runJob(metalake, "non_existent",
Collections.emptyMap()));
+ Assertions.assertEquals("Job template does not exist", e.getMessage());
+
+ // Test when job executor fails
+ doThrow(new RuntimeException("Job executor
error")).when(jobExecutor).submitJob(any());
+
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> jobManager.runJob(metalake, "shell_job",
Collections.emptyMap()));
+
+ // Test when entity store fails
+ doThrow(new IOException("Entity store error"))
+ .when(entityStore)
+ .put(any(JobEntity.class), anyBoolean());
+
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> jobManager.runJob(metalake, "shell_job",
Collections.emptyMap()));
+ }
+
+ @Test
+ public void testCancelJob() throws IOException {
+ mockedMetalake
+ .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+ .thenAnswer(a -> null);
+
+ JobEntity job = newJobEntity("shell_job", JobHandle.Status.QUEUED);
+ when(jobManager.getJob(metalake, job.name())).thenReturn(job);
+ doNothing().when(jobExecutor).cancelJob(job.jobExecutionId());
+ doNothing().when(entityStore).put(any(JobEntity.class), anyBoolean());
+
+ // Cancel an existing job
+ JobEntity cancelledJob = jobManager.cancelJob(metalake, job.name());
+ Assertions.assertEquals(job.jobExecutionId(),
cancelledJob.jobExecutionId());
+ Assertions.assertEquals(JobHandle.Status.CANCELLING,
cancelledJob.status());
+
+ // Test cancel a nonexistent job
+ when(jobManager.getJob(metalake, "non_existent"))
+ .thenThrow(new NoSuchJobException("Job does not exist"));
+
+ Exception e =
+ Assertions.assertThrows(
+ NoSuchJobException.class, () -> jobManager.cancelJob(metalake,
"non_existent"));
+ Assertions.assertEquals("Job does not exist", e.getMessage());
+
+ // Test cancelling a finished job
+ Lists.newArrayList(
+ JobHandle.Status.CANCELLED, JobHandle.Status.FAILED,
JobHandle.Status.SUCCEEDED)
+ .forEach(
+ status -> {
+ JobEntity finishedJob = newJobEntity("shell_job", status);
+ when(jobManager.getJob(metalake,
finishedJob.name())).thenReturn(finishedJob);
+
+ JobEntity cancelledFinishedJob = jobManager.cancelJob(metalake,
finishedJob.name());
+ Assertions.assertEquals(
+ finishedJob.jobExecutionId(),
cancelledFinishedJob.jobExecutionId());
+ Assertions.assertEquals(status, cancelledFinishedJob.status());
+ });
+
+ // Test job executor failed to cancel the job
+ doThrow(new RuntimeException("Job executor error"))
+ .when(jobExecutor)
+ .cancelJob(job.jobExecutionId());
+ Assertions.assertThrows(
+ RuntimeException.class, () -> jobManager.cancelJob(metalake,
job.name()));
+
+ // Test when entity store failed to update the job status
+ doThrow(new IOException("Entity store error"))
+ .when(entityStore)
+ .put(any(JobEntity.class), anyBoolean());
+
+ Assertions.assertThrows(
+ RuntimeException.class, () -> jobManager.cancelJob(metalake,
job.name()));
+ }
+
private static JobTemplateEntity newShellJobTemplateEntity(String name,
String comment) {
ShellJobTemplate shellJobTemplate =
- new ShellJobTemplate.Builder()
+ ShellJobTemplate.builder()
.withName(name)
.withComment(comment)
.withExecutable("/bin/echo")
@@ -406,11 +550,11 @@ public class TestJobManager {
private static JobTemplateEntity newSparkJobTemplateEntity(String name,
String comment) {
SparkJobTemplate sparkJobTemplate =
- new SparkJobTemplate.Builder()
+ SparkJobTemplate.builder()
.withName(name)
.withComment(comment)
.withClassName("org.apache.spark.examples.SparkPi")
- .withExecutable("local:///path/to/spark-examples.jar")
+ .withExecutable("file:/path/to/spark-examples.jar")
.build();
Random rand = new Random();
@@ -424,13 +568,14 @@ public class TestJobManager {
.build();
}
- private static JobEntity newJobEntity(String templateName) {
+ private static JobEntity newJobEntity(String templateName, JobHandle.Status
status) {
Random rand = new Random();
return JobEntity.builder()
.withId(rand.nextLong())
+ .withJobExecutionId(rand.nextLong() + "")
.withNamespace(NamespaceUtil.ofJob(metalake))
.withJobTemplateName(templateName)
- .withStatus(JobHandle.Status.QUEUED)
+ .withStatus(status)
.withAuditInfo(
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
.build();
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
new file mode 100644
index 0000000000..7c53f4ec4c
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestJobTemplate {
+
+ private static File tempDir;
+ private File tempStagingDir;
+
+ @BeforeAll
+ public static void setUpClass() throws IOException {
+ // Create a temporary directory for testing
+ tempDir = Files.createTempDirectory("job-template-test").toFile();
+ }
+
+ @AfterAll
+ public static void tearDownClass() throws IOException {
+ // Clean up the temporary directory after all tests
+ if (tempDir != null && tempDir.exists()) {
+ FileUtils.deleteDirectory(tempDir);
+ tempDir = null;
+ }
+ }
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ // Create a temporary staging directory for each test
+ tempStagingDir = Files.createTempDirectory(tempDir.toPath(),
"staging").toFile();
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ // Clean up the temporary staging directory after each test
+ if (tempStagingDir != null && tempStagingDir.exists()) {
+ FileUtils.deleteDirectory(tempStagingDir);
+ tempStagingDir = null;
+ }
+ }
+
+ @Test
+ public void testReplacePlaceholders() {
+ String template = "Hello, {{name}}! Welcome to {{place}}.";
+ Map<String, String> replacements = ImmutableMap.of("name", "Alice",
"place", "Wonderland");
+
+ String result = JobManager.replacePlaceholder(template, replacements);
+ Assertions.assertEquals("Hello, Alice! Welcome to Wonderland.", result);
+
+ // Test with missing replacements
+ replacements = ImmutableMap.of("name", "Bob");
+ result = JobManager.replacePlaceholder(template, replacements);
+ Assertions.assertEquals("Hello, Bob! Welcome to {{place}}.", result);
+
+ // Test with no replacements
+ result = JobManager.replacePlaceholder(template, ImmutableMap.of());
+ Assertions.assertEquals("Hello, {{name}}! Welcome to {{place}}.", result);
+
+ // Test with no placeholders
+ String noPlaceholders = "Hello, World!";
+ result = JobManager.replacePlaceholder(noPlaceholders, replacements);
+ Assertions.assertEquals("Hello, World!", result);
+
+ // Test with repeated placeholders
+ String repeatedTemplate = "Hello, {{name}}! Your name is {{name}}.";
+ replacements = ImmutableMap.of("name", "Charlie");
+ result = JobManager.replacePlaceholder(repeatedTemplate, replacements);
+ Assertions.assertEquals("Hello, Charlie! Your name is Charlie.", result);
+
+ // Test with incomplete placeholders
+ String incompleteTemplate = "Hello, {{name}! Welcome to {{place}}.";
+ replacements = ImmutableMap.of("name", "Dave", "place", "Earth");
+ result = JobManager.replacePlaceholder(incompleteTemplate, replacements);
+ Assertions.assertEquals("Hello, {{name}! Welcome to Earth.", result);
+
+ // Test with empty template
+ String emptyTemplate = "";
+ result = JobManager.replacePlaceholder(emptyTemplate, replacements);
+ Assertions.assertEquals("", result);
+
+ // Test with nested braces
+ String nestedTemplate = "Hello, {{name}}! Your code is {{{value}}}.";
+ replacements = ImmutableMap.of("name", "Eve", "value", "42");
+ result = JobManager.replacePlaceholder(nestedTemplate, replacements);
+ Assertions.assertEquals("Hello, Eve! Your code is {42}.", result);
+
+ nestedTemplate = "Hello, {{name}}! Your code is {{{{value}}}}.";
+ result = JobManager.replacePlaceholder(nestedTemplate, replacements);
+ Assertions.assertEquals("Hello, Eve! Your code is {{42}}.", result);
+
+ // Test with special characters in placeholders.
+ String specialTemplate = "Hello, {{name}}! Your score is {{score%}}.";
+ replacements = ImmutableMap.of("name", "Frank", "score%", "100%");
+ result = JobManager.replacePlaceholder(specialTemplate, replacements);
+ Assertions.assertEquals("Hello, Frank! Your score is {{score%}}.", result);
+
+ // Test with alphanumeric placeholders
+ String alphanumericTemplate = "Hello, {{user_name}}! Your score is
{{score123}}.";
+ replacements = ImmutableMap.of("user_name", "Grace", "score123", "200");
+ result = JobManager.replacePlaceholder(alphanumericTemplate, replacements);
+ Assertions.assertEquals("Hello, Grace! Your score is 200.", result);
+
+ // Test with "." and "-"
+ String dotDashTemplate = "Hello, {{user.name}}! Your score is
{{score-123}}.";
+ replacements = ImmutableMap.of("user.name", "Hank", "score-123", "300");
+ result = JobManager.replacePlaceholder(dotDashTemplate, replacements);
+ Assertions.assertEquals("Hello, Hank! Your score is 300.", result);
+ }
+
+ @Test
+ public void testFetchFilesFromUir() throws IOException {
+ File testFile1 = Files.createTempFile(tempDir.toPath(), "testFile1",
".txt").toFile();
+ String result =
+ JobManager.fetchFileFromUri(testFile1.toURI().toString(),
tempStagingDir, 30 * 1000);
+ File resultFile = new File(result);
+ Assertions.assertEquals(testFile1.getName(), resultFile.getName());
+
+ File testFile2 = Files.createTempFile(tempDir.toPath(), "testFile2",
".txt").toFile();
+ File testFile3 = Files.createTempFile(tempDir.toPath(), "testFile3",
".txt").toFile();
+
+ List<String> expectedUris =
+ Lists.newArrayList(testFile2.toURI().toString(),
testFile3.toURI().toString());
+ List<String> resultUris = JobManager.fetchFilesFromUri(expectedUris,
tempStagingDir, 30 * 1000);
+
+ Assertions.assertEquals(2, resultUris.size());
+ List<String> resultFileNames =
+ resultUris.stream().map(uri -> new
File(uri).getName()).collect(Collectors.toList());
+ Assertions.assertTrue(resultFileNames.contains(testFile2.getName()));
+ Assertions.assertTrue(resultFileNames.contains(testFile3.getName()));
+ }
+
+ @Test
+ public void testCreateShellRuntimeJobTemplate() throws IOException {
+ File testScript1 = Files.createTempFile(tempDir.toPath(), "testScript1",
".sh").toFile();
+ File testScript2 = Files.createTempFile(tempDir.toPath(), "testScript2",
".sh").toFile();
+
+ ShellJobTemplate shellJobTemplate =
+ ShellJobTemplate.builder()
+ .withName("testShellJob")
+ .withComment("This is a test shell job template")
+ .withExecutable("/bin/echo")
+ .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}},
{{arg4}}"))
+ .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}",
"ENV_VAR2", "{{val2}}"))
+ .withCustomFields(ImmutableMap.of("customField1",
"{{customVal1}}"))
+ .withScripts(
+ Lists.newArrayList(testScript1.toURI().toString(),
testScript2.toURI().toString()))
+ .build();
+
+ JobTemplateEntity entity =
+ JobTemplateEntity.builder()
+ .withId(1L)
+ .withName(shellJobTemplate.name())
+ .withComment(shellJobTemplate.comment())
+ .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+ .withTemplateContent(
+
JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ JobTemplate result =
+ JobManager.createRuntimeJobTemplate(
+ entity,
+ ImmutableMap.of(
+ "arg3", "value3",
+ "arg4", "value4",
+ "val1", "value1",
+ "val2", "value2",
+ "customVal1", "customValue1"),
+ tempStagingDir);
+
+ Assertions.assertEquals(shellJobTemplate.name(), result.name());
+ Assertions.assertEquals(shellJobTemplate.comment(), result.comment());
+ Assertions.assertEquals("echo", new File(result.executable).getName());
+ Assertions.assertEquals(
+ Lists.newArrayList("arg1", "arg2", "value3, value4"),
result.arguments());
+ Assertions.assertEquals(
+ ImmutableMap.of("ENV_VAR1", "value1", "ENV_VAR2", "value2"),
result.environments());
+ Assertions.assertEquals(ImmutableMap.of("customField1", "customValue1"),
result.customFields());
+
+ Assertions.assertEquals(2, ((ShellJobTemplate) result).scripts().size());
+ List<String> scriptNames =
+ ((ShellJobTemplate) result)
+ .scripts().stream()
+ .map(script -> new File(script).getName())
+ .collect(Collectors.toList());
+ Assertions.assertTrue(scriptNames.contains(testScript1.getName()));
+ Assertions.assertTrue(scriptNames.contains(testScript2.getName()));
+ }
+
+ @Test
+ public void testCreateSparkRuntimeJobTemplate() throws IOException {
+ File executable = Files.createTempFile(tempDir.toPath(), "testSparkJob",
".jar").toFile();
+ File jar1 = Files.createTempFile(tempDir.toPath(), "testJar1",
".jar").toFile();
+ File jar2 = Files.createTempFile(tempDir.toPath(), "testJar2",
".jar").toFile();
+
+ File file1 = Files.createTempFile(tempDir.toPath(), "testFile1",
".txt").toFile();
+ File file2 = Files.createTempFile(tempDir.toPath(), "testFile2",
".txt").toFile();
+
+ File archive1 = Files.createTempFile(tempDir.toPath(), "testArchive1",
".zip").toFile();
+
+ SparkJobTemplate sparkJobTemplate =
+ SparkJobTemplate.builder()
+ .withName("testSparkJob")
+ .withComment("This is a test Spark job template")
+ .withExecutable(executable.toURI().toString())
+ .withClassName("org.apache.gravitino.TestSparkJob")
+ .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}}"))
+ .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}",
"ENV_VAR2", "{{val2}}"))
+ .withCustomFields(ImmutableMap.of("customField1",
"{{customVal1}}"))
+ .withJars(Lists.newArrayList(jar1.toURI().toString(),
jar2.toURI().toString()))
+ .withFiles(Lists.newArrayList(file1.toURI().toString(),
file2.toURI().toString()))
+ .withArchives(Lists.newArrayList(archive1.toURI().toString()))
+ .withConfigs(
+ ImmutableMap.of(
+ "spark.executor.memory",
+ "{{executor-mem}}",
+ "spark.driver.cores",
+ "{{driver-cores}}"))
+ .build();
+
+ JobTemplateEntity entity =
+ JobTemplateEntity.builder()
+ .withId(1L)
+ .withName(sparkJobTemplate.name())
+ .withComment(sparkJobTemplate.comment())
+ .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+ .withTemplateContent(
+
JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ JobTemplate result =
+ JobManager.createRuntimeJobTemplate(
+ entity,
+ ImmutableMap.of(
+ "arg3", "value3",
+ "val1", "value1",
+ "val2", "value2",
+ "customVal1", "customValue1",
+ "executor-mem", "4g",
+ "driver-cores", "2"),
+ tempStagingDir);
+
+ Assertions.assertEquals(sparkJobTemplate.name(), result.name());
+ Assertions.assertEquals(sparkJobTemplate.comment(), result.comment());
+ Assertions.assertEquals(executable.getName(), new
File(result.executable).getName());
+ Assertions.assertEquals(Lists.newArrayList("arg1", "arg2", "value3"),
result.arguments());
+ Assertions.assertEquals(
+ ImmutableMap.of("ENV_VAR1", "value1", "ENV_VAR2", "value2"),
result.environments());
+ Assertions.assertEquals(ImmutableMap.of("customField1", "customValue1"),
result.customFields());
+
+ Assertions.assertEquals(2, ((SparkJobTemplate) result).jars().size());
+ List<String> jarNames =
+ ((SparkJobTemplate) result)
+ .jars().stream().map(jar -> new
File(jar).getName()).collect(Collectors.toList());
+ Assertions.assertTrue(jarNames.contains(jar1.getName()));
+ Assertions.assertTrue(jarNames.contains(jar2.getName()));
+
+ Assertions.assertEquals(2, ((SparkJobTemplate) result).files().size());
+ List<String> fileNames =
+ ((SparkJobTemplate) result)
+ .files().stream().map(file -> new
File(file).getName()).collect(Collectors.toList());
+ Assertions.assertTrue(fileNames.contains(file1.getName()));
+ Assertions.assertTrue(fileNames.contains(file2.getName()));
+
+ Assertions.assertEquals(1, ((SparkJobTemplate) result).archives().size());
+ List<String> archiveNames =
+ ((SparkJobTemplate) result)
+ .archives().stream()
+ .map(archive -> new File(archive).getName())
+ .collect(Collectors.toList());
+ Assertions.assertTrue(archiveNames.contains(archive1.getName()));
+
+ Assertions.assertEquals(
+ ImmutableMap.of("spark.executor.memory", "4g", "spark.driver.cores",
"2"),
+ ((SparkJobTemplate) result).configs());
+ }
+}