This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a7a77f8ea [#7937] test(core): Add ITs for job system (#8015)
0a7a77f8ea is described below

commit 0a7a77f8ea42ff6652137f76fae9b59776a6258b
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Aug 13 19:34:28 2025 +0800

    [#7937] test(core): Add ITs for job system (#8015)
    
    ### What changes were proposed in this pull request?
    
    Add the integration tests to the job system, also fix the issues during
    the tests.
    
    ### Why are the changes needed?
    
    Fix: #7937
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add the tests.
---
 .../gravitino/client/integration/test/JobIT.java   | 356 +++++++++++++++++++++
 .../gravitino/client/gravitino_metalake.py         |  16 +-
 .../tests/integration/test_supports_jobs.py        | 327 +++++++++++++++++++
 conf/log4j2.properties.template                    |   2 +-
 .../gravitino/SupportsRelationOperations.java      |   2 -
 .../java/org/apache/gravitino/job/JobManager.java  |  32 +-
 .../gravitino/storage/relational/JDBCBackend.java  |   8 -
 .../storage/relational/service/JobMetaService.java |  40 ++-
 .../org/apache/gravitino/job/TestJobManager.java   |  21 +-
 .../relational/service/TestJobMetaService.java     |  11 +-
 .../gravitino/server/web/rest/JobOperations.java   |  14 +-
 11 files changed, 770 insertions(+), 59 deletions(-)

diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
new file mode 100644
index 0000000000..59043422aa
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+public class JobIT extends BaseIT {
+
+  private static final String METALAKE_NAME = 
GravitinoITUtils.genRandomName("job_it_metalake");
+
+  private File testStagingDir;
+  private String testEntryScriptPath;
+  private String testLibScriptPath;
+  private ShellJobTemplate.Builder builder;
+  private GravitinoMetalake metalake;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    testStagingDir = Files.createTempDirectory("test_staging_dir").toFile();
+    testEntryScriptPath = generateTestEntryScript();
+    testLibScriptPath = generateTestLibScript();
+
+    builder =
+        ShellJobTemplate.builder()
+            .withComment("Test shell job template")
+            .withExecutable(testEntryScriptPath)
+            .withArguments(Lists.newArrayList("{{arg1}}", "{{arg2}}"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR", "{{env_var}}"))
+            .withScripts(Lists.newArrayList(testLibScriptPath))
+            .withCustomFields(Collections.emptyMap());
+
+    Map<String, String> configs =
+        ImmutableMap.of(
+            "gravitino.job.stagingDir",
+            testStagingDir.getAbsolutePath(),
+            "gravitino.job.statusPullIntervalInMs",
+            "3000");
+    registerCustomConfigs(configs);
+    super.startIntegrationTest();
+  }
+
+  @AfterAll
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(testStagingDir);
+  }
+
+  @BeforeEach
+  public void setUp() {
+    // Create a metalake for testing jobs
+    metalake =
+        client.createMetalake(METALAKE_NAME, "metalake test for job", 
Collections.emptyMap());
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    // Drop the metalake after each test
+    client.dropMetalake(METALAKE_NAME, true);
+  }
+
+  @Test
+  public void testRegisterAndListJobTemplates() {
+    JobTemplate template1 = builder.withName("test_1").build();
+    JobTemplate template2 = builder.withName("test_2").build();
+
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template1));
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template2));
+
+    List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
+    Assertions.assertEquals(2, registeredTemplates.size());
+    Assertions.assertTrue(registeredTemplates.contains(template1));
+    Assertions.assertTrue(registeredTemplates.contains(template2));
+
+    // Test register duplicated job template
+    Assertions.assertThrows(
+        JobTemplateAlreadyExistsException.class, () -> 
metalake.registerJobTemplate(template1));
+  }
+
+  @Test
+  public void testRegisterAndGetJobTemplate() {
+    JobTemplate template = builder.withName("test_get").build();
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template));
+
+    JobTemplate retrievedTemplate = metalake.getJobTemplate(template.name());
+    Assertions.assertEquals(template, retrievedTemplate);
+
+    // Test get non-existent job template
+    Assertions.assertThrows(
+        NoSuchJobTemplateException.class, () -> 
metalake.getJobTemplate("non_existent_template"));
+  }
+
+  @Test
+  public void testRegisterAndDeleteJobTemplate() {
+    JobTemplate template1 = builder.withName("test_1").build();
+    JobTemplate template2 = builder.withName("test_2").build();
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template1));
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template2));
+
+    List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
+    Assertions.assertEquals(2, registeredTemplates.size());
+    Assertions.assertTrue(registeredTemplates.contains(template1));
+    Assertions.assertTrue(registeredTemplates.contains(template2));
+
+    JobTemplate result1 = metalake.getJobTemplate(template1.name());
+    JobTemplate result2 = metalake.getJobTemplate(template2.name());
+    Assertions.assertEquals(template1, result1);
+    Assertions.assertEquals(template2, result2);
+
+    // Delete the first job template
+    Assertions.assertTrue(metalake.deleteJobTemplate(template1.name()));
+    // Verify the first job template is deleted
+    Assertions.assertThrows(
+        NoSuchJobTemplateException.class, () -> 
metalake.getJobTemplate(template1.name()));
+    // Verify the second job template still exists
+    JobTemplate remainingTemplate = metalake.getJobTemplate(template2.name());
+    Assertions.assertEquals(template2, remainingTemplate);
+
+    // Verify the list of job templates after deletion
+    registeredTemplates = metalake.listJobTemplates();
+    Assertions.assertEquals(1, registeredTemplates.size());
+    Assertions.assertTrue(registeredTemplates.contains(template2));
+
+    // Test deleting a non-existent job template
+    Assertions.assertFalse(metalake.deleteJobTemplate(template1.name()));
+
+    // Delete the second job template
+    Assertions.assertTrue(metalake.deleteJobTemplate(template2.name()));
+
+    // Verify the second job template is deleted
+    Assertions.assertThrows(
+        NoSuchJobTemplateException.class, () -> 
metalake.getJobTemplate(template2.name()));
+
+    // Verify the list of job templates is empty after deleting both
+    registeredTemplates = metalake.listJobTemplates();
+    Assertions.assertTrue(registeredTemplates.isEmpty());
+  }
+
+  @Test
+  public void testRunAndListJobs() {
+    JobTemplate template = builder.withName("test_run").build();
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template));
+
+    // Submit a job with success status
+    JobHandle jobHandle1 =
+        metalake.runJob(
+            template.name(),
+            ImmutableMap.of("arg1", "value1", "arg2", "success", "env_var", 
"value2"));
+    Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle1.jobStatus());
+    Assertions.assertEquals(template.name(), jobHandle1.jobTemplateName());
+
+    JobHandle jobHandle2 =
+        metalake.runJob(
+            template.name(),
+            ImmutableMap.of("arg1", "value3", "arg2", "success", "env_var", 
"value4"));
+    Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle2.jobStatus());
+    Assertions.assertEquals(template.name(), jobHandle2.jobTemplateName());
+
+    List<JobHandle> jobs = metalake.listJobs(template.name());
+    Assertions.assertEquals(2, jobs.size());
+    List<String> resultJobIds = 
jobs.stream().map(JobHandle::jobId).collect(Collectors.toList());
+    Assertions.assertTrue(resultJobIds.contains(jobHandle1.jobId()));
+    Assertions.assertTrue(resultJobIds.contains(jobHandle2.jobId()));
+
+    Awaitility.await()
+        .atMost(3, TimeUnit.MINUTES)
+        .until(
+            () -> {
+              JobHandle updatedJob1 = metalake.getJob(jobHandle1.jobId());
+              return updatedJob1.jobStatus() == JobHandle.Status.SUCCEEDED;
+            });
+
+    Awaitility.await()
+        .atMost(3, TimeUnit.MINUTES)
+        .until(
+            () -> {
+              JobHandle updatedJob2 = metalake.getJob(jobHandle2.jobId());
+              return updatedJob2.jobStatus() == JobHandle.Status.SUCCEEDED;
+            });
+
+    List<JobHandle> updatedJobs = metalake.listJobs(template.name());
+    Assertions.assertEquals(2, updatedJobs.size());
+    Set<JobHandle.Status> jobStatuses =
+        
updatedJobs.stream().map(JobHandle::jobStatus).collect(Collectors.toSet());
+    Assertions.assertEquals(1, jobStatuses.size());
+    Assertions.assertTrue(jobStatuses.contains(JobHandle.Status.SUCCEEDED));
+  }
+
+  @Test
+  public void testRunAndGetJob() {
+    JobTemplate template = builder.withName("test_run_get").build();
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template));
+
+    // Submit a job with success status
+    JobHandle jobHandle =
+        metalake.runJob(
+            template.name(),
+            ImmutableMap.of("arg1", "value1", "arg2", "success", "env_var", 
"value2"));
+    Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle.jobStatus());
+    Assertions.assertEquals(template.name(), jobHandle.jobTemplateName());
+
+    Awaitility.await()
+        .atMost(3, TimeUnit.MINUTES)
+        .until(
+            () -> {
+              JobHandle updatedJob = metalake.getJob(jobHandle.jobId());
+              return updatedJob.jobStatus() == JobHandle.Status.SUCCEEDED;
+            });
+
+    JobHandle retrievedJob = metalake.getJob(jobHandle.jobId());
+    Assertions.assertEquals(jobHandle.jobId(), retrievedJob.jobId());
+    Assertions.assertEquals(JobHandle.Status.SUCCEEDED, 
retrievedJob.jobStatus());
+
+    // Test run a failed job
+    JobHandle failedJobHandle =
+        metalake.runJob(
+            template.name(),
+            ImmutableMap.of("arg1", "value1", "arg2", "fail", "env_var", 
"value2"));
+    Assertions.assertEquals(JobHandle.Status.QUEUED, 
failedJobHandle.jobStatus());
+
+    Awaitility.await()
+        .atMost(3, TimeUnit.MINUTES)
+        .until(
+            () -> {
+              JobHandle updatedFailedJob = 
metalake.getJob(failedJobHandle.jobId());
+              return updatedFailedJob.jobStatus() == JobHandle.Status.FAILED;
+            });
+
+    JobHandle retrievedFailedJob = metalake.getJob(failedJobHandle.jobId());
+    Assertions.assertEquals(failedJobHandle.jobId(), 
retrievedFailedJob.jobId());
+    Assertions.assertEquals(JobHandle.Status.FAILED, 
retrievedFailedJob.jobStatus());
+
+    // Test get a non-existent job
+    Assertions.assertThrows(NoSuchJobException.class, () -> 
metalake.getJob("non_existent_job_id"));
+  }
+
+  @Test
+  public void testRunAndCancelJob() {
+    JobTemplate template = builder.withName("test_run_cancel").build();
+    Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template));
+
+    // Submit a job with success status
+    JobHandle jobHandle =
+        metalake.runJob(
+            template.name(),
+            ImmutableMap.of("arg1", "value1", "arg2", "success", "env_var", 
"value2"));
+    Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle.jobStatus());
+    Assertions.assertEquals(template.name(), jobHandle.jobTemplateName());
+
+    // Cancel the job
+    metalake.cancelJob(jobHandle.jobId());
+
+    Awaitility.await()
+        .atMost(3, TimeUnit.MINUTES)
+        .until(
+            () -> {
+              JobHandle updatedJob = metalake.getJob(jobHandle.jobId());
+              return updatedJob.jobStatus() == JobHandle.Status.CANCELLED;
+            });
+
+    JobHandle retrievedJob = metalake.getJob(jobHandle.jobId());
+    Assertions.assertEquals(jobHandle.jobId(), retrievedJob.jobId());
+    Assertions.assertEquals(JobHandle.Status.CANCELLED, 
retrievedJob.jobStatus());
+
+    // Test cancel a non-existent job
+    Assertions.assertThrows(
+        NoSuchJobException.class, () -> 
metalake.cancelJob("non_existent_job_id"));
+  }
+
+  private String generateTestEntryScript() {
+    String content =
+        "#!/bin/bash\n"
+            + "echo \"starting test test job\"\n\n"
+            + "bin=\"$(dirname \"${BASH_SOURCE-$0}\")\"\n"
+            + "bin=\"$(cd \"${bin}\">/dev/null; pwd)\"\n\n"
+            + ". \"${bin}/common.sh\"\n\n"
+            + "sleep 3\n\n"
+            + "JOB_NAME=\"test_job-$(date +%s)-$1\"\n\n"
+            + "echo \"Submitting job with name: $JOB_NAME\"\n\n"
+            + "echo \"$1\"\n\n"
+            + "echo \"$2\"\n\n"
+            + "echo \"$ENV_VAR\"\n\n"
+            + "if [[ \"$2\" == \"success\" ]]; then\n"
+            + "  exit 0\n"
+            + "elif [[ \"$2\" == \"fail\" ]]; then\n"
+            + "  exit 1\n"
+            + "else\n"
+            + "  exit 2\n"
+            + "fi\n";
+
+    // save the script to a file
+    try {
+      File scriptFile = new File(testStagingDir, "test-job.sh");
+      Files.writeString(scriptFile.toPath(), content);
+      scriptFile.setExecutable(true);
+      return scriptFile.getAbsolutePath();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create test entry script", e);
+    }
+  }
+
+  private String generateTestLibScript() {
+    String content = "#!/bin/bash\necho \"in common script\"\n";
+
+    // save the script to a file
+    try {
+      File scriptFile = new File(testStagingDir, "common.sh");
+      Files.writeString(scriptFile.toPath(), content);
+      scriptFile.setExecutable(true);
+      return scriptFile.getAbsolutePath();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create test lib script", e);
+    }
+  }
+}
diff --git a/clients/client-python/gravitino/client/gravitino_metalake.py 
b/clients/client-python/gravitino/client/gravitino_metalake.py
index b85fa41c7b..3d7d7f4cfa 100644
--- a/clients/client-python/gravitino/client/gravitino_metalake.py
+++ b/clients/client-python/gravitino/client/gravitino_metalake.py
@@ -322,7 +322,7 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
             raise ValueError("Job template name cannot be null or empty")
 
         url = (
-            
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+            
f"{self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))}/"
             f"{encode_string(job_template_name)}"
         )
         response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
@@ -355,7 +355,7 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
             raise ValueError("Job template name cannot be null or empty")
 
         url = (
-            
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+            
f"{self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))}/"
             f"{encode_string(job_template_name)}"
         )
         response = self.rest_client.delete(url, 
error_handler=JOB_ERROR_HANDLER)
@@ -403,7 +403,10 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
         if not job_id or not job_id.strip():
             raise ValueError("Job ID cannot be null or empty")
 
-        url = 
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+        url = (
+            
f"{self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))}"
+            f"/{encode_string(job_id)}"
+        )
         response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
         resp = JobResponse.from_json(response.body, infer_missing=True)
         resp.validate()
@@ -423,7 +426,7 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
         Raises:
             NoSuchJobTemplateException: If no job template with the specified 
name exists.
         """
-        url = 
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}"
+        url = 
f"{self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))}"
         request = JobRunRequest(job_template_name, job_conf)
 
         response = self.rest_client.post(
@@ -449,7 +452,10 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
         if not job_id or not job_id.strip():
             raise ValueError("Job ID cannot be null or empty")
 
-        url = 
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+        url = (
+            
f"{self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))}"
+            f"/{encode_string(job_id)}"
+        )
         response = self.rest_client.post(url, error_handler=JOB_ERROR_HANDLER)
         resp = JobResponse.from_json(response.body, infer_missing=True)
         resp.validate()
diff --git a/clients/client-python/tests/integration/test_supports_jobs.py 
b/clients/client-python/tests/integration/test_supports_jobs.py
new file mode 100644
index 0000000000..2235050b58
--- /dev/null
+++ b/clients/client-python/tests/integration/test_supports_jobs.py
@@ -0,0 +1,327 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import shutil
+import tempfile
+import time
+from pathlib import Path
+from random import randint
+from time import sleep
+
+from gravitino import GravitinoAdminClient, GravitinoMetalake
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.shell_job_template import ShellJobTemplate
+from gravitino.exceptions.base import (
+    JobTemplateAlreadyExistsException,
+    NoSuchJobTemplateException,
+    NoSuchJobException,
+)
+from tests.integration.integration_test_env import IntegrationTestEnv
+
+
+class TestSupportsJobs(IntegrationTestEnv):
+    _metalake_name: str = "job_it_metalake" + str(randint(0, 1000))
+
+    _gravitino_admin_client: GravitinoAdminClient = None
+    _metalake: GravitinoMetalake = None
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        cls.test_staging_dir = tempfile.mkdtemp()
+        cls.test_entry_script_path = cls.generate_test_entry_script()
+        cls.test_lib_script_path = cls.generate_test_lib_script()
+
+        cls.builder = (
+            ShellJobTemplate.builder()
+            .with_comment("Test shell job template")
+            .with_executable(cls.test_entry_script_path)
+            .with_arguments(["{{arg1}}", "{{arg2}}"])
+            .with_environments({"ENV_VAR": "{{env_var}}"})
+            .with_scripts([cls.test_lib_script_path])
+            .with_custom_fields({})
+        )
+        cls.configs = {
+            "gravitino.job.stagingDir": cls.test_staging_dir,
+            "gravitino.job.statusPullIntervalInMs": "3000",
+        }
+
+        cls._get_gravitino_home()
+        cls._append_conf(cls.configs, 
f"{cls.gravitino_home}/conf/gravitino.conf")
+        cls.restart_server()
+
+        cls._gravitino_admin_client = 
GravitinoAdminClient(uri="http://localhost:8090";)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.test_staging_dir)
+
+    def setUp(self):
+        self._metalake = self._gravitino_admin_client.create_metalake(
+            self._metalake_name, comment="test metalake", properties={}
+        )
+
+    def tearDown(self):
+        self._gravitino_admin_client.drop_metalake(self._metalake_name, 
force=True)
+
+    def test_register_and_list_job_templates(self):
+        template_1 = self.builder.with_name("test_1").build()
+        template_2 = self.builder.with_name("test_2").build()
+
+        # Assert no exceptions are raised when registering job templates
+        self.assertIsNone(self._metalake.register_job_template(template_1))
+        self.assertIsNone(self._metalake.register_job_template(template_2))
+
+        # List registered job templates
+        registered_templates = self._metalake.list_job_templates()
+        self.assertEqual(len(registered_templates), 2)
+        self.assertIn(template_1, registered_templates)
+        self.assertIn(template_2, registered_templates)
+
+        # Test registering a duplicate job template
+        with self.assertRaises(JobTemplateAlreadyExistsException):
+            self._metalake.register_job_template(template_1)
+
+    def test_register_and_get_job_template(self):
+        template = self.builder.with_name("test_get").build()
+
+        # Assert no exceptions are raised when registering the job template
+        self.assertIsNone(self._metalake.register_job_template(template))
+
+        # Retrieve the registered job template
+        retrieved_template = self._metalake.get_job_template(template.name)
+        self.assertEqual(template, retrieved_template)
+
+        # Test retrieving a non-existent job template
+        with self.assertRaises(NoSuchJobTemplateException):
+            self._metalake.get_job_template("non_existent_template")
+
+    def test_register_and_delete_job_template(self):
+        template1 = self.builder.with_name("test_1").build()
+        template2 = self.builder.with_name("test_2").build()
+
+        # Assert no exceptions are raised when registering job templates
+        self.assertIsNone(self._metalake.register_job_template(template1))
+        self.assertIsNone(self._metalake.register_job_template(template2))
+
+        # List registered job templates
+        registered_templates = self._metalake.list_job_templates()
+        self.assertEqual(len(registered_templates), 2)
+        self.assertIn(template1, registered_templates)
+        self.assertIn(template2, registered_templates)
+
+        # Retrieve and verify the registered job templates
+        result1 = self._metalake.get_job_template(template1.name)
+        result2 = self._metalake.get_job_template(template2.name)
+        self.assertEqual(template1, result1)
+        self.assertEqual(template2, result2)
+
+        # Delete the first job template
+        self.assertTrue(self._metalake.delete_job_template(template1.name))
+
+        # Verify the first job template is deleted
+        with self.assertRaises(NoSuchJobTemplateException):
+            self._metalake.get_job_template(template1.name)
+
+        # Verify the second job template still exists
+        remaining_template = self._metalake.get_job_template(template2.name)
+        self.assertEqual(template2, remaining_template)
+
+        # Verify the list of job templates after deletion
+        registered_templates = self._metalake.list_job_templates()
+        self.assertEqual(len(registered_templates), 1)
+        self.assertIn(template2, registered_templates)
+
+        # Test deleting a non-existent job template
+        self.assertFalse(self._metalake.delete_job_template(template1.name))
+
+        # Delete the second job template
+        self.assertTrue(self._metalake.delete_job_template(template2.name))
+
+        # Verify the second job template is deleted
+        with self.assertRaises(NoSuchJobTemplateException):
+            self._metalake.get_job_template(template2.name)
+
+        # Verify the list of job templates is empty after deleting both
+        registered_templates = self._metalake.list_job_templates()
+        self.assertTrue(len(registered_templates) == 0)
+
+    def test_run_and_list_jobs(self):
+        template = self.builder.with_name("test_run").build()
+        self._metalake.register_job_template(template)
+
+        # Submit jobs
+        job_handle1 = self._metalake.run_job(
+            template.name, {"arg1": "value1", "arg2": "success", "env_var": 
"value2"}
+        )
+        self.assertEqual(job_handle1.job_status(), JobHandle.Status.QUEUED)
+        self.assertEqual(job_handle1.job_template_name(), template.name)
+
+        job_handle2 = self._metalake.run_job(
+            template.name, {"arg1": "value3", "arg2": "success", "env_var": 
"value4"}
+        )
+        self.assertEqual(job_handle2.job_status(), JobHandle.Status.QUEUED)
+        self.assertEqual(job_handle2.job_template_name(), template.name)
+
+        # List jobs
+        jobs = self._metalake.list_jobs(template.name)
+        self.assertEqual(len(jobs), 2)
+        job_ids = [job.job_id() for job in jobs]
+        self.assertIn(job_handle1.job_id(), job_ids)
+        self.assertIn(job_handle2.job_id(), job_ids)
+
+        # Wait for jobs to complete
+        self._wait_until(
+            lambda: self._metalake.get_job(job_handle1.job_id()).job_status()
+            == JobHandle.Status.SUCCEEDED,
+            timeout=180,
+        )
+
+        self._wait_until(
+            lambda: self._metalake.get_job(job_handle2.job_id()).job_status()
+            == JobHandle.Status.SUCCEEDED,
+            timeout=180,
+        )
+
+        updated_jobs = self._metalake.list_jobs(template.name)
+        self.assertEqual(len(updated_jobs), 2)
+        job_statuses = {job.job_status() for job in updated_jobs}
+        self.assertEqual(len(job_statuses), 1)
+        self.assertIn(JobHandle.Status.SUCCEEDED, job_statuses)
+
+    def test_run_and_get_job(self):
+        template = self.builder.with_name("test_run_get").build()
+        self._metalake.register_job_template(template)
+
+        # Submit a job
+        job_handle = self._metalake.run_job(
+            template.name, {"arg1": "value1", "arg2": "success", "env_var": 
"value2"}
+        )
+        self.assertEqual(job_handle.job_status(), JobHandle.Status.QUEUED)
+        self.assertEqual(job_handle.job_template_name(), template.name)
+
+        # Wait for job to complete
+        self._wait_until(
+            lambda: self._metalake.get_job(job_handle.job_id()).job_status()
+            == JobHandle.Status.SUCCEEDED,
+            timeout=180,
+        )
+        retrieved_job = self._metalake.get_job(job_handle.job_id())
+        self.assertEqual(job_handle.job_id(), retrieved_job.job_id())
+        self.assertEqual(JobHandle.Status.SUCCEEDED, 
retrieved_job.job_status())
+
+        # Test failed job
+        failed_job_handle = self._metalake.run_job(
+            template.name, {"arg1": "value1", "arg2": "fail", "env_var": 
"value2"}
+        )
+        self.assertEqual(failed_job_handle.job_status(), 
JobHandle.Status.QUEUED)
+
+        self._wait_until(
+            lambda: 
self._metalake.get_job(failed_job_handle.job_id()).job_status()
+            == JobHandle.Status.FAILED,
+            timeout=180,
+        )
+        retrieved_failed_job = 
self._metalake.get_job(failed_job_handle.job_id())
+        self.assertEqual(failed_job_handle.job_id(), 
retrieved_failed_job.job_id())
+        self.assertEqual(JobHandle.Status.FAILED, 
retrieved_failed_job.job_status())
+
+        # Test non-existent job
+        with self.assertRaises(NoSuchJobException):
+            self._metalake.get_job("non_existent_job_id")
+
+    def test_run_and_cancel_job(self):
+        template = self.builder.with_name("test_run_cancel").build()
+        self._metalake.register_job_template(template)
+
+        # Submit a job
+        job_handle = self._metalake.run_job(
+            template.name, {"arg1": "value1", "arg2": "success", "env_var": 
"value2"}
+        )
+        self.assertEqual(job_handle.job_status(), JobHandle.Status.QUEUED)
+        self.assertEqual(job_handle.job_template_name(), template.name)
+
+        sleep(1)
+
+        # Cancel the job
+        self._metalake.cancel_job(job_handle.job_id())
+
+        self._wait_until(
+            lambda: self._metalake.get_job(job_handle.job_id()).job_status()
+            == JobHandle.Status.CANCELLED,
+            timeout=180,
+        )
+
+        retrieved_job = self._metalake.get_job(job_handle.job_id())
+        self.assertEqual(job_handle.job_id(), retrieved_job.job_id())
+        self.assertEqual(JobHandle.Status.CANCELLED, 
retrieved_job.job_status())
+
+        # Test cancel non-existent job
+        with self.assertRaises(NoSuchJobException):
+            self._metalake.cancel_job("non_existent_job_id")
+
+    def _wait_until(self, condition, timeout=180, interval=0.5):
+        end_time = time.time() + timeout
+        while time.time() < end_time:
+            if condition():
+                return True
+            time.sleep(interval)
+        raise TimeoutError(f"Condition not met within {timeout} seconds")
+
+    @classmethod
+    def generate_test_entry_script(cls):
+        content = """#!/bin/bash
+echo "starting test test job"
+
+bin="$(dirname "${BASH_SOURCE-$0}")"
+bin="$(cd "${bin}">/dev/null; pwd)"
+
+. "${bin}/common.sh"
+
+sleep 3
+
+JOB_NAME="test_job-$(date +%s)-$1"
+
+echo "Submitting job with name: $JOB_NAME"
+
+echo "$1"
+
+echo "$2"
+
+echo "$ENV_VAR"
+
+if [[ "$2" == "success" ]]; then
+  exit 0
+elif [[ "$2" == "fail" ]]; then
+  exit 1
+else
+  exit 2
+fi
+"""
+        script_path = Path(cls.test_staging_dir) / "test-job.sh"
+        script_path.write_text(content, encoding="utf-8")
+        script_path.chmod(0o755)
+        return str(script_path)
+
+    @classmethod
+    def generate_test_lib_script(cls):
+        content = """#!/bin/bash
+echo "in common script"
+"""
+        script_path = Path(cls.test_staging_dir) / "common.sh"
+        script_path.write_text(content, encoding="utf-8")
+        script_path.chmod(0o755)
+        return str(script_path)
diff --git a/conf/log4j2.properties.template b/conf/log4j2.properties.template
index 074b11fd7d..c3f31f5268 100644
--- a/conf/log4j2.properties.template
+++ b/conf/log4j2.properties.template
@@ -48,7 +48,7 @@ appender.rolling.strategy.delete.ifLastModified.type = 
IfLastModified
 appender.rolling.strategy.delete.ifLastModified.age = 30d
 
 
-## use seperate file for lineage log
+## use separate file for lineage log
 appender.lineage_file.type=RollingFile
 appender.lineage_file.name=lineage_file
 appender.lineage_file.fileName=${basePath}/gravitino_lineage.log
diff --git 
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java 
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 8aea5b2384..f64b60123d 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -38,8 +38,6 @@ public interface SupportsRelationOperations {
     ROLE_USER_REL,
     /** Role and group relationship */
     ROLE_GROUP_REL,
-    /** Job template and job relationship */
-    JOB_TEMPLATE_JOB_REL,
     /** Policy and metadata object relationship */
     POLICY_METADATA_OBJECT_REL,
   }
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index e32bb3752c..331afb43f9 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -39,6 +39,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
@@ -47,7 +48,6 @@ import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
-import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.connector.job.JobExecutor;
 import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
@@ -252,7 +252,14 @@ public class JobManager implements JobOperationDispatcher {
   public boolean deleteJobTemplate(String metalake, String jobTemplateName) 
throws InUseException {
     checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
 
-    List<JobEntity> jobs = listJobs(metalake, Optional.of(jobTemplateName));
+    List<JobEntity> jobs;
+    try {
+      jobs = listJobs(metalake, Optional.of(jobTemplateName));
+    } catch (NoSuchJobTemplateException e) {
+      // If the job template does not exist, we can safely return false.
+      return false;
+    }
+
     boolean hasActiveJobs =
         jobs.stream()
             .anyMatch(
@@ -313,6 +320,13 @@ public class JobManager implements JobOperationDispatcher {
               NameIdentifier jobTemplateIdent =
                   NameIdentifierUtil.ofJobTemplate(metalake, 
jobTemplateName.get());
 
+              // If jobTemplateName is present, we need to list the jobs 
associated with the job.
+              // Using a mock namespace from job template identifier to get 
the jobs associated
+              // with job template.
+              String[] elements =
+                  ArrayUtils.add(jobTemplateIdent.namespace().levels(), 
jobTemplateIdent.name());
+              Namespace jobTemplateIdentNs = Namespace.of(elements);
+
               // Lock the job template to ensure no concurrent 
modifications/deletions
               jobEntities =
                   TreeLockUtils.doWithTreeLock(
@@ -320,12 +334,8 @@ public class JobManager implements JobOperationDispatcher {
                       LockType.READ,
                       () ->
                           // List all the jobs associated with the job template
-                          entityStore
-                              .relationOperations()
-                              .listEntitiesByRelation(
-                                  
SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
-                                  jobTemplateIdent,
-                                  Entity.EntityType.JOB_TEMPLATE));
+                          entityStore.list(
+                              jobTemplateIdentNs, JobEntity.class, 
Entity.EntityType.JOB));
             } else {
               jobEntities = entityStore.list(jobNs, JobEntity.class, 
Entity.EntityType.JOB);
             }
@@ -527,6 +537,12 @@ public class JobManager implements JobOperationDispatcher {
                           e);
                     }
                   });
+
+              LOG.info(
+                  "Updated the job {} with execution id {} status to {}",
+                  job.name(),
+                  job.jobExecutionId(),
+                  newStatus);
             }
           });
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index bfe3ac72a9..a72182d5df 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -508,14 +508,6 @@ public class JDBCBackend implements RelationalBackend {
               String.format("ROLE_USER_REL doesn't support type %s", 
identType.name()));
         }
 
-      case JOB_TEMPLATE_JOB_REL:
-        if (identType == Entity.EntityType.JOB_TEMPLATE) {
-          return (List<E>) 
JobMetaService.getInstance().listJobsByTemplateIdent(nameIdentifier);
-        } else {
-          throw new IllegalArgumentException(
-              String.format("JOB_TEMPLATE_JOB_REL doesn't support type %s", 
identType.name()));
-        }
-
       case POLICY_METADATA_OBJECT_REL:
         if (identType == Entity.EntityType.POLICY) {
           return (List<E>)
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
index 485dba11b7..86af63c1fa 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.IllegalNamespaceException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.meta.JobEntity;
@@ -48,22 +49,29 @@ public class JobMetaService {
 
   public List<JobEntity> listJobsByNamespace(Namespace ns) {
     String metalakeName = ns.level(0);
-    List<JobPO> jobPOs =
-        SessionUtils.getWithoutCommit(
-            JobMetaMapper.class, mapper -> 
mapper.listJobPOsByMetalake(metalakeName));
-    return jobPOs.stream().map(po -> JobPO.fromJobPO(po, 
ns)).collect(Collectors.toList());
-  }
-
-  public List<JobEntity> listJobsByTemplateIdent(NameIdentifier 
jobTemplateIdent) {
-    String metalakeName = jobTemplateIdent.namespace().level(0);
-    String jobTemplateName = jobTemplateIdent.name();
-    List<JobPO> jobPOs =
-        SessionUtils.getWithoutCommit(
-            JobMetaMapper.class,
-            mapper -> mapper.listJobPOsByMetalakeAndTemplate(metalakeName, 
jobTemplateName));
-    return jobPOs.stream()
-        .map(po -> JobPO.fromJobPO(po, NamespaceUtil.ofJob(metalakeName)))
-        .collect(Collectors.toList());
+    if (ns.length() == 3) {
+      // If the namespace is job namespace, we will list all the jobs in the 
metalake.
+      List<JobPO> jobPOs =
+          SessionUtils.getWithoutCommit(
+              JobMetaMapper.class, mapper -> 
mapper.listJobPOsByMetalake(metalakeName));
+      return jobPOs.stream().map(po -> JobPO.fromJobPO(po, 
ns)).collect(Collectors.toList());
+
+    } else if (ns.length() == 4) {
+      // If the namespace is generated by a job template identifier, we will 
list all the jobs
+      // associate with the job template.
+      NameIdentifier jobTemplateIdent = NameIdentifier.of(ns.levels());
+      String jobTemplateName = jobTemplateIdent.name();
+      List<JobPO> jobPOs =
+          SessionUtils.getWithoutCommit(
+              JobMetaMapper.class,
+              mapper -> mapper.listJobPOsByMetalakeAndTemplate(metalakeName, 
jobTemplateName));
+      return jobPOs.stream()
+          .map(po -> JobPO.fromJobPO(po, NamespaceUtil.ofJob(metalakeName)))
+          .collect(Collectors.toList());
+
+    } else {
+      throw new IllegalNamespaceException("Invalid namespace for listing jobs: 
%s", ns);
+    }
   }
 
   public JobEntity getJobByIdentifier(NameIdentifier ident) {
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java 
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 1070af6bbd..a95cec8466 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -40,6 +40,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
@@ -49,7 +50,6 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
-import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.connector.job.JobExecutor;
 import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
@@ -361,18 +361,11 @@ public class TestJobManager {
     JobEntity job1 = newJobEntity("shell_job", JobHandle.Status.QUEUED);
     JobEntity job2 = newJobEntity("spark_job", JobHandle.Status.QUEUED);
 
-    SupportsRelationOperations supportsRelationOperations =
-        Mockito.mock(SupportsRelationOperations.class);
-    when(supportsRelationOperations.listEntitiesByRelation(
-            SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
-            NameIdentifierUtil.ofJobTemplate(metalake, 
shellJobTemplate.name()),
-            Entity.EntityType.JOB_TEMPLATE))
+    String[] levels =
+        ArrayUtils.add(shellJobTemplate.namespace().levels(), 
shellJobTemplate.name());
+    Namespace jobTemplateIdentNs = Namespace.of(levels);
+    when(entityStore.list(jobTemplateIdentNs, JobEntity.class, 
Entity.EntityType.JOB))
         .thenReturn(Lists.newArrayList(job1));
-    
when(entityStore.relationOperations()).thenReturn(supportsRelationOperations);
-
-    // Mock the listJobs method to return a list of jobs associated with the 
job template
-    when(entityStore.list(NamespaceUtil.ofJob(metalake), JobEntity.class, 
Entity.EntityType.JOB))
-        .thenReturn(Lists.newArrayList(job1, job2));
 
     List<JobEntity> jobs = jobManager.listJobs(metalake, 
Optional.of(shellJobTemplate.name()));
     Assertions.assertEquals(1, jobs.size());
@@ -380,6 +373,10 @@ public class TestJobManager {
     Assertions.assertFalse(jobs.contains(job2));
 
     // List all jobs without filtering by job template
+    // Mock the listJobs method to return a list of jobs associated with the 
job template
+    when(entityStore.list(NamespaceUtil.ofJob(metalake), JobEntity.class, 
Entity.EntityType.JOB))
+        .thenReturn(Lists.newArrayList(job1, job2));
+
     jobs = jobManager.listJobs(metalake, Optional.empty());
     Assertions.assertEquals(2, jobs.size());
     Assertions.assertTrue(jobs.contains(job1));
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
index e47cb50b6b..4fd59ef4df 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
@@ -21,7 +21,9 @@ package org.apache.gravitino.storage.relational.service;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.meta.AuditInfo;
@@ -70,17 +72,18 @@ public class TestJobMetaService extends TestJDBCBackend {
     Assertions.assertTrue(jobs.contains(job2));
 
     // Test listing jobs by job template identifier
+    String[] levels = ArrayUtils.add(jobTemplate.namespace().levels(), 
jobTemplate.name());
+    Namespace jobTemplateIdentNs = Namespace.of(levels);
     List<JobEntity> jobsByTemplate =
-        
JobMetaService.getInstance().listJobsByTemplateIdent(jobTemplate.nameIdentifier());
+        JobMetaService.getInstance().listJobsByNamespace(jobTemplateIdentNs);
     Assertions.assertEquals(2, jobsByTemplate.size());
     Assertions.assertTrue(jobsByTemplate.contains(job1));
     Assertions.assertTrue(jobsByTemplate.contains(job2));
 
     // Test listing jobs by non-existing template identifier
+    levels = ArrayUtils.add(jobTemplate.namespace().levels(), 
"non_existing_template");
     List<JobEntity> emptyJobs =
-        JobMetaService.getInstance()
-            .listJobsByTemplateIdent(
-                NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"non_existing_template"));
+        JobMetaService.getInstance().listJobsByNamespace(Namespace.of(levels));
     Assertions.assertTrue(emptyJobs.isEmpty());
   }
 
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
index d1daed2a77..54ea3c1365 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
@@ -122,7 +122,10 @@ public class JobOperations {
   @ResponseMetered(name = "register-job-template", absolute = true)
   public Response registerJobTemplate(
       @PathParam("metalake") String metalake, JobTemplateRegisterRequest 
request) {
-    LOG.info("Received request to register job template in metalake: {}", 
metalake);
+    LOG.info(
+        "Received request to register job template {} in metalake: {}",
+        request.getJobTemplate().name(),
+        metalake);
 
     try {
       return Utils.doAs(
@@ -257,7 +260,8 @@ public class JobOperations {
   @Timed(name = "run-job." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "run-job", absolute = true)
   public Response runJob(@PathParam("metalake") String metalake, JobRunRequest 
request) {
-    LOG.info("Received request to run job in metalake: {}", metalake);
+    LOG.info(
+        "Received request to run job {} in metalake: {}", 
request.getJobTemplateName(), metalake);
 
     try {
       return Utils.doAs(
@@ -270,7 +274,11 @@ public class JobOperations {
             JobEntity jobEntity =
                 jobOperationDispatcher.runJob(metalake, 
request.getJobTemplateName(), jobConf);
 
-            LOG.info("Run job {} in metalake: {}", jobEntity.name(), metalake);
+            LOG.info(
+                "Run job[{}] {} in metalake: {}",
+                jobEntity.jobTemplateName(),
+                jobEntity.name(),
+                metalake);
             return Utils.ok(new JobResponse(toDTO(jobEntity)));
           });
 


Reply via email to