This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new eba4808c2e [#7840] feat(core): Add job status puller and orphan file 
cleaner (#7936)
eba4808c2e is described below

commit eba4808c2eac97dafdc0fd8bb64cc6df751f117f
Author: Jerry Shao <[email protected]>
AuthorDate: Thu Aug 7 10:00:08 2025 +0800

    [#7840] feat(core): Add job status puller and orphan file cleaner (#7936)
    
    ### What changes were proposed in this pull request?
    
    Add a job status pulling thread and an orphan file cleaner thread to the
    job manager.
    
    ### Why are the changes needed?
    
    
    Fix: #7840
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests (UTs) in TestJobManager and TestJobMetaService.
---
 .../main/java/org/apache/gravitino/Configs.java    |  21 ++
 .../java/org/apache/gravitino/job/JobManager.java  | 219 ++++++++++++++++++++-
 .../java/org/apache/gravitino/meta/JobEntity.java  |  13 ++
 .../gravitino/storage/relational/JDBCBackend.java  |   2 +
 .../storage/relational/mapper/JobMetaMapper.java   |   3 +
 .../mapper/JobMetaSQLProviderFactory.java          |   4 +
 .../provider/base/JobMetaBaseSQLProvider.java      |   8 +
 .../postgresql/JobMetaPostgreSQLProvider.java      |   8 +
 .../gravitino/storage/relational/po/JobPO.java     |   1 +
 .../storage/relational/service/JobMetaService.java |  16 +-
 .../org/apache/gravitino/job/TestJobManager.java   |  74 ++++++-
 .../relational/service/TestJobMetaService.java     |  42 ++++
 12 files changed, 398 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index d90e069bc2..829f6330bf 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -424,4 +424,25 @@ public class Configs {
           .stringConf()
           .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
           .createWithDefault("local");
+
+  public static final ConfigEntry<Long> JOB_STAGING_DIR_KEEP_TIME_IN_MS =
+      new ConfigBuilder("gravitino.job.stagingDirKeepTimeInMs")
+          .doc(
+              "The time in milliseconds to keep the staging files of the 
finished job in the job"
+                  + " staging directory. The minimum recommended value is 10 
minutes if you're "
+                  + "not testing.")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .longConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(7 * 24 * 3600 * 1000L); // Default is 7 days
+
+  public static final ConfigEntry<Long> JOB_STATUS_PULL_INTERVAL_IN_MS =
+      new ConfigBuilder("gravitino.job.statusPullIntervalInMs")
+          .doc(
+              "The interval in milliseconds to pull the job status from the 
job executor. The "
+                  + "minimum recommended value is 1 minute if you're not 
testing.")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .longConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(5 * 60 * 1000L); // Default is 5 minutes
 }
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 4ecb5066b9..e32bb3752c 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -19,9 +19,11 @@
 
 package org.apache.gravitino.job;
 
+import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
 import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -30,6 +32,9 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -52,17 +57,35 @@ import 
org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JobManager implements JobOperationDispatcher {
 
+  private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+
   private static final Pattern PLACEHOLDER_PATTERN = 
Pattern.compile("\\{\\{([\\w.-]+)\\}\\}");
 
+  private static final String JOB_STAGING_DIR =
+      File.separator
+          + "%s"
+          + File.separator
+          + "%s"
+          + File.separator
+          + JobHandle.JOB_ID_PREFIX
+          + "%s";
+
+  private static final long JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS = 600 * 
1000L; // 10 minute
+
+  private static final long JOB_STATUS_PULL_MIN_INTERVAL_IN_MS = 60 * 1000L; 
// 1 minute
+
   private static final int TIMEOUT_IN_MS = 30 * 1000; // 30 seconds
 
   private final EntityStore entityStore;
@@ -73,6 +96,12 @@ public class JobManager implements JobOperationDispatcher {
 
   private final IdGenerator idGenerator;
 
+  private final long jobStagingDirKeepTimeInMs;
+
+  private final ScheduledExecutorService cleanUpExecutor;
+
+  private final ScheduledExecutorService statusPullExecutor;
+
   public JobManager(Config config, EntityStore entityStore, IdGenerator 
idGenerator) {
     this(config, entityStore, idGenerator, JobExecutorFactory.create(config));
   }
@@ -102,6 +131,56 @@ public class JobManager implements JobOperationDispatcher {
             String.format("Failed to create staging directory %s", 
stagingDirPath));
       }
     }
+
+    this.jobStagingDirKeepTimeInMs = 
config.get(Configs.JOB_STAGING_DIR_KEEP_TIME_IN_MS);
+    if (jobStagingDirKeepTimeInMs < JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS) {
+      LOG.warn(
+          "The job staging directory keep time is set to {} ms, the number is 
too small, "
+              + "which will cause frequent cleanup, please set it to a value 
larger than {} if "
+              + "you're not using it to do the test.",
+          jobStagingDirKeepTimeInMs,
+          JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS);
+    }
+
+    this.cleanUpExecutor =
+        Executors.newSingleThreadScheduledExecutor(
+            runnable -> {
+              Thread thread = new Thread(runnable, "job-staging-dir-cleanup");
+              thread.setDaemon(true);
+              return thread;
+            });
+    long scheduleInterval = jobStagingDirKeepTimeInMs / 10;
+    Preconditions.checkArgument(
+        scheduleInterval != 0,
+        "The schedule interval for "
+            + "job staging directory cleanup cannot be zero, please set the 
job staging directory "
+            + "keep time to a value larger than %s ms",
+        JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS);
+
+    cleanUpExecutor.scheduleAtFixedRate(
+        this::cleanUpStagingDirs, scheduleInterval, scheduleInterval, 
TimeUnit.MILLISECONDS);
+
+    long jobStatusPullIntervalInMs = 
config.get(Configs.JOB_STATUS_PULL_INTERVAL_IN_MS);
+    if (jobStatusPullIntervalInMs < JOB_STATUS_PULL_MIN_INTERVAL_IN_MS) {
+      LOG.warn(
+          "The job status pull interval is set to {} ms, the number is too 
small, "
+              + "which will cause frequent job status pull from external job 
executor, please set "
+              + "it to a value larger than {} if you're not using it to do the 
test.",
+          jobStatusPullIntervalInMs,
+          JOB_STATUS_PULL_MIN_INTERVAL_IN_MS);
+    }
+    this.statusPullExecutor =
+        Executors.newSingleThreadScheduledExecutor(
+            runnable -> {
+              Thread thread = new Thread(runnable, "job-status-pull");
+              thread.setDaemon(true);
+              return thread;
+            });
+    statusPullExecutor.scheduleAtFixedRate(
+        this::pullAndUpdateJobStatus,
+        jobStatusPullIntervalInMs,
+        jobStatusPullIntervalInMs,
+        TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -187,6 +266,18 @@ public class JobManager implements JobOperationDispatcher {
           jobTemplateName, metalake);
     }
 
+    // Delete all the job staging directories associated with the job template.
+    String jobTemplateStagingPath =
+        stagingDir.getAbsolutePath() + File.separator + metalake + 
File.separator + jobTemplateName;
+    File jobTemplateStagingDir = new File(jobTemplateStagingPath);
+    if (jobTemplateStagingDir.exists()) {
+      try {
+        FileUtils.deleteDirectory(jobTemplateStagingDir);
+      } catch (IOException e) {
+        LOG.error("Failed to delete job template staging directory: {}", 
jobTemplateStagingPath, e);
+      }
+    }
+
     // Delete the job template entity as well as all the jobs associated with 
it.
     return TreeLockUtils.doWithTreeLock(
         NameIdentifier.of(NamespaceUtil.ofJobTemplate(metalake).levels()),
@@ -275,15 +366,10 @@ public class JobManager implements JobOperationDispatcher 
{
     JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake, 
jobTemplateName);
 
     // Create staging directory.
-    // TODO(jerry). The job staging directory will be deleted using a 
background thread.
     long jobId = idGenerator.nextId();
     String jobStagingPath =
         stagingDir.getAbsolutePath()
-            + File.separator
-            + metalake
-            + File.separator
-            + JobHandle.JOB_ID_PREFIX
-            + jobId;
+            + String.format(JOB_STAGING_DIR, metalake, jobTemplateName, jobId);
     File jobStagingDir = new File(jobStagingPath);
     if (!jobStagingDir.mkdirs()) {
       throw new RuntimeException(
@@ -351,8 +437,6 @@ public class JobManager implements JobOperationDispatcher {
           String.format("Failed to cancel job with ID %s under metalake %s", 
jobId, metalake), e);
     }
 
-    // TODO(jerry). Implement a background thread to monitor the job status 
and update it. Also,
-    //  we should delete the finished job entities after a certain period of 
time.
     // Update the job status to CANCELING
     JobEntity newJobEntity =
         JobEntity.builder()
@@ -388,7 +472,104 @@ public class JobManager implements JobOperationDispatcher 
{
   @Override
   public void close() throws IOException {
     jobExecutor.close();
-    // TODO(jerry). Implement any necessary cleanup logic for the JobManager.
+    statusPullExecutor.shutdownNow();
+    cleanUpExecutor.shutdownNow();
+  }
+
+  @VisibleForTesting
+  void pullAndUpdateJobStatus() {
+    List<String> metalakes = listInUseMetalakes(entityStore);
+    for (String metalake : metalakes) {
+      // This unnecessarily lists all the jobs; we need to improve the code to 
only list the active
+      // jobs.
+      List<JobEntity> activeJobs =
+          listJobs(metalake, Optional.empty()).stream()
+              .filter(
+                  job ->
+                      job.status() == JobHandle.Status.QUEUED
+                          || job.status() == JobHandle.Status.STARTED
+                          || job.status() == JobHandle.Status.CANCELLING)
+              .toList();
+
+      activeJobs.forEach(
+          job -> {
+            JobHandle.Status newStatus = 
jobExecutor.getJobStatus(job.jobExecutionId());
+            if (newStatus != job.status()) {
+              JobEntity newJobEntity =
+                  JobEntity.builder()
+                      .withId(job.id())
+                      .withJobExecutionId(job.jobExecutionId())
+                      .withJobTemplateName(job.jobTemplateName())
+                      .withStatus(newStatus)
+                      .withNamespace(job.namespace())
+                      .withAuditInfo(
+                          AuditInfo.builder()
+                              .withCreator(job.auditInfo().creator())
+                              .withCreateTime(job.auditInfo().createTime())
+                              
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                              .withLastModifiedTime(Instant.now())
+                              .build())
+                      .build();
+
+              // Update the job entity with new status.
+              TreeLockUtils.doWithTreeLock(
+                  NameIdentifierUtil.ofJob(metalake, job.name()),
+                  LockType.WRITE,
+                  () -> {
+                    try {
+                      entityStore.put(newJobEntity, true /* overwrite */);
+                      return null;
+                    } catch (IOException e) {
+                      throw new RuntimeException(
+                          String.format(
+                              "Failed to update job entity %s to status %s",
+                              newJobEntity, newStatus),
+                          e);
+                    }
+                  });
+            }
+          });
+    }
+  }
+
+  @VisibleForTesting
+  void cleanUpStagingDirs() {
+    List<String> metalakes = listInUseMetalakes(entityStore);
+
+    for (String metalake : metalakes) {
+      List<JobEntity> finishedJobs =
+          listJobs(metalake, Optional.empty()).stream()
+              .filter(
+                  job ->
+                      job.status() == JobHandle.Status.CANCELLED
+                          || job.status() == JobHandle.Status.SUCCEEDED
+                          || job.status() == JobHandle.Status.FAILED)
+              .filter(
+                  job ->
+                      job.finishedAt() > 0
+                          && job.finishedAt() + jobStagingDirKeepTimeInMs
+                              < System.currentTimeMillis())
+              .toList();
+
+      finishedJobs.forEach(
+          job -> {
+            try {
+              entityStore.delete(
+                  NameIdentifierUtil.ofJob(metalake, job.name()), 
Entity.EntityType.JOB);
+
+              String jobStagingPath =
+                  stagingDir.getAbsolutePath()
+                      + String.format(JOB_STAGING_DIR, metalake, 
job.jobTemplateName(), job.id());
+              File jobStagingDir = new File(jobStagingPath);
+              if (jobStagingDir.exists()) {
+                FileUtils.deleteDirectory(jobStagingDir);
+                LOG.info("Deleted job staging directory {} for job {}", 
jobStagingPath, job.name());
+              }
+            } catch (IOException e) {
+              LOG.error("Failed to delete job and staging directory for job 
{}", job.name(), e);
+            }
+          });
+    }
   }
 
   @VisibleForTesting
@@ -469,7 +650,7 @@ public class JobManager implements JobOperationDispatcher {
       return inputString; // Return as is if the input string is blank
     }
 
-    StringBuffer result = new StringBuffer();
+    StringBuilder result = new StringBuilder();
 
     Matcher matcher = PLACEHOLDER_PATTERN.matcher(inputString);
     while (matcher.find()) {
@@ -521,4 +702,22 @@ public class JobManager implements JobOperationDispatcher {
       throw new RuntimeException(String.format("Failed to fetch file from URI 
%s", uri), e);
     }
   }
+
+  private static List<String> listInUseMetalakes(EntityStore entityStore) {
+    try {
+      List<BaseMetalake> metalakes =
+          TreeLockUtils.doWithRootTreeLock(
+              LockType.READ,
+              () ->
+                  entityStore.list(
+                      Namespace.empty(), BaseMetalake.class, 
Entity.EntityType.METALAKE));
+      return metalakes.stream()
+          .filter(
+              m -> (boolean) 
m.propertiesMetadata().getOrDefault(m.properties(), PROPERTY_IN_USE))
+          .map(BaseMetalake::name)
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list in-use metalakes", e);
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
index 4d2868bca2..6dff1cc84b 100644
--- a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
@@ -48,6 +48,8 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
   public static final Field AUDIT_INFO =
       Field.required(
           "audit_info", AuditInfo.class, "The audit details of the job 
template entity.");
+  public static final Field FINISHED_AT =
+      Field.optional("job_finished_at", Long.class, "The time when the job 
finished execution.");
 
   private Long id;
   private String jobExecutionId;
@@ -55,6 +57,7 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
   private String jobTemplateName;
   private Namespace namespace;
   private AuditInfo auditInfo;
+  private Long finishedAt;
 
   private JobEntity() {}
 
@@ -66,6 +69,7 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
     fields.put(TEMPLATE_NAME, jobTemplateName);
     fields.put(STATUS, status);
     fields.put(AUDIT_INFO, auditInfo);
+    fields.put(FINISHED_AT, finishedAt);
     return Collections.unmodifiableMap(fields);
   }
 
@@ -96,6 +100,10 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
     return jobTemplateName;
   }
 
+  public Long finishedAt() {
+    return finishedAt;
+  }
+
   @Override
   public AuditInfo auditInfo() {
     return auditInfo;
@@ -170,6 +178,11 @@ public class JobEntity implements Entity, Auditable, 
HasIdentifier {
       return this;
     }
 
+    public Builder withFinishedAt(Long finishedAt) {
+      jobEntity.finishedAt = finishedAt;
+      return this;
+    }
+
     public JobEntity build() {
       jobEntity.validate();
       return jobEntity;
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index d512ce29b4..7c92e88a2b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -307,6 +307,8 @@ public class JDBCBackend implements RelationalBackend {
         return PolicyMetaService.getInstance().deletePolicy(ident);
       case JOB_TEMPLATE:
         return JobTemplateMetaService.getInstance().deleteJobTemplate(ident);
+      case JOB:
+        return JobMetaService.getInstance().deleteJob(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for delete operation", entityType);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
index a8a45e3888..3bc359756d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
@@ -67,4 +67,7 @@ public interface JobMetaMapper {
   @DeleteProvider(type = JobMetaSQLProviderFactory.class, method = 
"deleteJobMetasByLegacyTimeline")
   Integer deleteJobMetasByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @UpdateProvider(type = JobMetaSQLProviderFactory.class, method = 
"softDeleteJobMetaByRunId")
+  Integer softDeleteJobMetaByRunId(@Param("jobRunId") Long jobRunId);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
index 25a4c924f6..fccb116a9c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
@@ -94,4 +94,8 @@ public class JobMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline) {
     return getProvider().softDeleteJobMetasByLegacyTimeline(legacyTimeline);
   }
+
+  public static String softDeleteJobMetaByRunId(@Param("jobRunId") Long 
jobRunId) {
+    return getProvider().softDeleteJobMetaByRunId(jobRunId);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
index 03abcfcdc5..9f013f9586 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
@@ -159,6 +159,14 @@ public class JobMetaBaseSQLProvider {
         + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
   }
 
+  public String softDeleteJobMetaByRunId(@Param("jobRunId") Long jobRunId) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+        + " WHERE job_run_id = #{jobRunId} AND deleted_at = 0";
+  }
+
   public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline") 
Long legacyTimeline) {
     return "UPDATE "
         + JobMetaMapper.TABLE_NAME
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
index ec9f851d74..d83cd98073 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
@@ -86,6 +86,14 @@ public class JobMetaPostgreSQLProvider extends 
JobMetaBaseSQLProvider {
         + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
   }
 
+  public String softDeleteJobMetaByRunId(@Param("jobRunId") Long jobRunId) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE job_run_id = #{jobRunId} AND deleted_at = 0";
+  }
+
   @Override
   public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline") 
Long legacyTimeline) {
     return "UPDATE "
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
index 236a1eb6c3..8cb9d2e379 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
@@ -135,6 +135,7 @@ public class JobPO {
           .withStatus(JobHandle.Status.valueOf(jobPO.jobRunStatus))
           .withJobTemplateName(jobPO.jobTemplateName)
           .withAuditInfo(JsonUtils.anyFieldMapper().readValue(jobPO.auditInfo, 
AuditInfo.class))
+          .withFinishedAt(jobPO.jobFinishedAt())
           .build();
     } catch (JsonProcessingException e) {
       throw new RuntimeException("Failed to deserialize job PO", e);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
index ebc9c70760..485dba11b7 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -69,7 +69,7 @@ public class JobMetaService {
   public JobEntity getJobByIdentifier(NameIdentifier ident) {
     String metalakeName = ident.namespace().level(0);
     String jobRunId = ident.name();
-    Long jobRunIdLong;
+    long jobRunIdLong;
     try {
       jobRunIdLong = 
Long.parseLong(jobRunId.substring(JobHandle.JOB_ID_PREFIX.length()));
     } catch (NumberFormatException e) {
@@ -112,6 +112,20 @@ public class JobMetaService {
     }
   }
 
+  public boolean deleteJob(NameIdentifier jobIdent) {
+    String jobRunId = jobIdent.name();
+    long jobRunIdLong;
+    try {
+      jobRunIdLong = 
Long.parseLong(jobRunId.substring(JobHandle.JOB_ID_PREFIX.length()));
+    } catch (NumberFormatException e) {
+      throw new NoSuchEntityException("Invalid job run ID format %s", 
jobRunId);
+    }
+    int result =
+        SessionUtils.doWithCommitAndFetchResult(
+            JobMetaMapper.class, mapper -> 
mapper.softDeleteJobMetaByRunId(jobRunIdLong));
+    return result > 0;
+  }
+
   public int deleteJobsByLegacyTimeline(long legacyTimeline, int limit) {
     // Mark jobs as deleted for finished jobs, so that they can be cleaned up 
later
     SessionUtils.doWithCommit(
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java 
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 25665d6127..1070af6bbd 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -24,8 +24,12 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
@@ -34,6 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
@@ -43,6 +48,7 @@ import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.connector.job.JobExecutor;
 import org.apache.gravitino.exceptions.InUseException;
@@ -54,13 +60,16 @@ import 
org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.metalake.MetalakeManager;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -87,18 +96,21 @@ public class TestJobManager {
 
   private static JobExecutor jobExecutor;
 
+  private static IdGenerator idGenerator;
+
   @BeforeAll
   public static void setUp() throws IllegalAccessException {
     config = new Config(false) {};
     Random rand = new Random();
-    testStagingDir = "test_staging_dir_" + rand.nextInt(1000);
+    testStagingDir = "test_staging_dir_" + rand.nextInt(100);
     config.set(Configs.JOB_STAGING_DIR, testStagingDir);
+    config.set(Configs.JOB_STAGING_DIR_KEEP_TIME_IN_MS, 1000L);
 
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
 
     entityStore = Mockito.mock(EntityStore.class);
     jobExecutor = Mockito.mock(JobExecutor.class);
-    IdGenerator idGenerator = new RandomIdGenerator();
+    idGenerator = new RandomIdGenerator();
     JobManager jm = new JobManager(config, entityStore, idGenerator, 
jobExecutor);
     jobManager = Mockito.spy(jm);
 
@@ -529,6 +541,63 @@ public class TestJobManager {
         RuntimeException.class, () -> jobManager.cancelJob(metalake, 
job.name()));
   }
 
+  @Test
+  public void testPullJobStatus() throws IOException {
+    JobEntity job = newJobEntity("shell_job", JobHandle.Status.QUEUED);
+    BaseMetalake mockMetalake =
+        BaseMetalake.builder()
+            .withName(metalake)
+            .withId(idGenerator.nextId())
+            .withVersion(SchemaVersion.V_0_1)
+            .withAuditInfo(AuditInfo.EMPTY)
+            .build();
+    when(entityStore.list(Namespace.empty(), BaseMetalake.class, 
Entity.EntityType.METALAKE))
+        .thenReturn(ImmutableList.of(mockMetalake));
+    when(jobManager.listJobs(metalake, 
Optional.empty())).thenReturn(ImmutableList.of(job));
+
+    
when(jobExecutor.getJobStatus(job.jobExecutionId())).thenReturn(JobHandle.Status.QUEUED);
+    Assertions.assertDoesNotThrow(() -> jobManager.pullAndUpdateJobStatus());
+    verify(entityStore, never()).put(any(), anyBoolean());
+
+    
when(jobExecutor.getJobStatus(job.jobExecutionId())).thenReturn(JobHandle.Status.SUCCEEDED);
+    Assertions.assertDoesNotThrow(() -> jobManager.pullAndUpdateJobStatus());
+    verify(entityStore, times(1)).put(any(JobEntity.class), anyBoolean());
+  }
+
+  @Test
+  public void testCleanUpStagingDirs() throws IOException, 
InterruptedException {
+    JobEntity job = newJobEntity("shell_job", JobHandle.Status.STARTED);
+    BaseMetalake mockMetalake =
+        BaseMetalake.builder()
+            .withName(metalake)
+            .withId(idGenerator.nextId())
+            .withVersion(SchemaVersion.V_0_1)
+            .withAuditInfo(AuditInfo.EMPTY)
+            .build();
+    when(entityStore.list(Namespace.empty(), BaseMetalake.class, 
Entity.EntityType.METALAKE))
+        .thenReturn(ImmutableList.of(mockMetalake));
+
+    when(jobManager.listJobs(metalake, 
Optional.empty())).thenReturn(ImmutableList.of(job));
+    Assertions.assertDoesNotThrow(() -> jobManager.cleanUpStagingDirs());
+    verify(entityStore, never()).delete(any(), any());
+
+    JobEntity finishedJob = newJobEntity("shell_job", 
JobHandle.Status.SUCCEEDED);
+    when(jobManager.listJobs(metalake, 
Optional.empty())).thenReturn(ImmutableList.of(finishedJob));
+
+    Awaitility.await()
+        .atMost(3, TimeUnit.SECONDS)
+        .until(
+            () -> {
+              Assertions.assertDoesNotThrow(() -> 
jobManager.cleanUpStagingDirs());
+              try {
+                verify(entityStore, times(1)).delete(any(), any());
+                return true;
+              } catch (Throwable e) {
+                return false;
+              }
+            });
+  }
+
   private static JobTemplateEntity newShellJobTemplateEntity(String name, 
String comment) {
     ShellJobTemplate shellJobTemplate =
         ShellJobTemplate.builder()
@@ -575,6 +644,7 @@ public class TestJobManager {
         .withJobExecutionId(rand.nextLong() + "")
         .withNamespace(NamespaceUtil.ofJob(metalake))
         .withJobTemplateName(templateName)
+        .withFinishedAt(System.currentTimeMillis())
         .withStatus(status)
         .withAuditInfo(
             
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
index 6c24addb95..e47cb50b6b 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
@@ -132,6 +132,17 @@ public class TestJobMetaService extends TestJDBCBackend {
         JobMetaService.getInstance()
             .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
jobOverwrite.name()));
     Assertions.assertEquals(jobOverwrite, updatedJob);
+
+    // Test insert and get job with finishedAt
+    JobEntity finishedJob =
+        TestJobTemplateMetaService.newJobEntity(
+            jobTemplate.name(), JobHandle.Status.SUCCEEDED, METALAKE_NAME);
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(finishedJob, false));
+
+    JobEntity retrievedFinishedJob =
+        JobMetaService.getInstance()
+            .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
finishedJob.name()));
+    Assertions.assertTrue(retrievedFinishedJob.finishedAt() > 0);
   }
 
   @Test
@@ -180,4 +191,35 @@ public class TestJobMetaService extends TestJDBCBackend {
         
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
     Assertions.assertTrue(jobs.isEmpty(), "Jobs should be deleted by legacy 
timeline");
   }
+
+  @Test
+  public void testDeleteJobByIdentifier() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateEntity jobTemplate =
+        TestJobTemplateMetaService.newShellJobTemplateEntity(
+            "test_job_template", "test_comment", METALAKE_NAME);
+    JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+    JobEntity job =
+        TestJobTemplateMetaService.newJobEntity(
+            jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(job, false));
+
+    JobEntity retrievedJob =
+        JobMetaService.getInstance()
+            .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
job.name()));
+    Assertions.assertEquals(job, retrievedJob);
+
+    Assertions.assertTrue(
+        JobMetaService.getInstance()
+            .deleteJob(NameIdentifierUtil.ofJob(METALAKE_NAME, job.name())));
+
+    // Verify that the job is deleted
+    Assertions.assertFalse(
+        JobMetaService.getInstance()
+            .deleteJob(NameIdentifierUtil.ofJob(METALAKE_NAME, job.name())));
+  }
 }


Reply via email to