This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 0a7a77f8ea [#7937] test(core): Add ITs for job system (#8015)
0a7a77f8ea is described below
commit 0a7a77f8ea42ff6652137f76fae9b59776a6258b
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Aug 13 19:34:28 2025 +0800
[#7937] test(core): Add ITs for job system (#8015)
### What changes were proposed in this pull request?
Add integration tests for the job system, and fix the issues found while
running those tests.
### Why are the changes needed?
Fix: #7937
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
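Covered by the new integration tests (JobIT.java, test_supports_jobs.py)
and the updated unit tests (TestJobManager.java, TestJobMetaService.java).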
---
.../gravitino/client/integration/test/JobIT.java | 356 +++++++++++++++++++++
.../gravitino/client/gravitino_metalake.py | 16 +-
.../tests/integration/test_supports_jobs.py | 327 +++++++++++++++++++
conf/log4j2.properties.template | 2 +-
.../gravitino/SupportsRelationOperations.java | 2 -
.../java/org/apache/gravitino/job/JobManager.java | 32 +-
.../gravitino/storage/relational/JDBCBackend.java | 8 -
.../storage/relational/service/JobMetaService.java | 40 ++-
.../org/apache/gravitino/job/TestJobManager.java | 21 +-
.../relational/service/TestJobMetaService.java | 11 +-
.../gravitino/server/web/rest/JobOperations.java | 14 +-
11 files changed, 770 insertions(+), 59 deletions(-)
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
new file mode 100644
index 0000000000..59043422aa
--- /dev/null
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+public class JobIT extends BaseIT {
+
+ private static final String METALAKE_NAME =
GravitinoITUtils.genRandomName("job_it_metalake");
+
+ private File testStagingDir;
+ private String testEntryScriptPath;
+ private String testLibScriptPath;
+ private ShellJobTemplate.Builder builder;
+ private GravitinoMetalake metalake;
+
+ @BeforeAll
+ @Override
+ public void startIntegrationTest() throws Exception {
+ testStagingDir = Files.createTempDirectory("test_staging_dir").toFile();
+ testEntryScriptPath = generateTestEntryScript();
+ testLibScriptPath = generateTestLibScript();
+
+ builder =
+ ShellJobTemplate.builder()
+ .withComment("Test shell job template")
+ .withExecutable(testEntryScriptPath)
+ .withArguments(Lists.newArrayList("{{arg1}}", "{{arg2}}"))
+ .withEnvironments(ImmutableMap.of("ENV_VAR", "{{env_var}}"))
+ .withScripts(Lists.newArrayList(testLibScriptPath))
+ .withCustomFields(Collections.emptyMap());
+
+ Map<String, String> configs =
+ ImmutableMap.of(
+ "gravitino.job.stagingDir",
+ testStagingDir.getAbsolutePath(),
+ "gravitino.job.statusPullIntervalInMs",
+ "3000");
+ registerCustomConfigs(configs);
+ super.startIntegrationTest();
+ }
+
+ @AfterAll
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(testStagingDir);
+ }
+
+ @BeforeEach
+ public void setUp() {
+ // Create a metalake for testing jobs
+ metalake =
+ client.createMetalake(METALAKE_NAME, "metalake test for job",
Collections.emptyMap());
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ // Drop the metalake after each test
+ client.dropMetalake(METALAKE_NAME, true);
+ }
+
+ @Test
+ public void testRegisterAndListJobTemplates() {
+ JobTemplate template1 = builder.withName("test_1").build();
+ JobTemplate template2 = builder.withName("test_2").build();
+
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template1));
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template2));
+
+ List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
+ Assertions.assertEquals(2, registeredTemplates.size());
+ Assertions.assertTrue(registeredTemplates.contains(template1));
+ Assertions.assertTrue(registeredTemplates.contains(template2));
+
+ // Test registering a duplicate job template
+ Assertions.assertThrows(
+ JobTemplateAlreadyExistsException.class, () ->
metalake.registerJobTemplate(template1));
+ }
+
+ @Test
+ public void testRegisterAndGetJobTemplate() {
+ JobTemplate template = builder.withName("test_get").build();
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template));
+
+ JobTemplate retrievedTemplate = metalake.getJobTemplate(template.name());
+ Assertions.assertEquals(template, retrievedTemplate);
+
+ // Test getting a non-existent job template
+ Assertions.assertThrows(
+ NoSuchJobTemplateException.class, () ->
metalake.getJobTemplate("non_existent_template"));
+ }
+
+ @Test
+ public void testRegisterAndDeleteJobTemplate() {
+ JobTemplate template1 = builder.withName("test_1").build();
+ JobTemplate template2 = builder.withName("test_2").build();
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template1));
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template2));
+
+ List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
+ Assertions.assertEquals(2, registeredTemplates.size());
+ Assertions.assertTrue(registeredTemplates.contains(template1));
+ Assertions.assertTrue(registeredTemplates.contains(template2));
+
+ JobTemplate result1 = metalake.getJobTemplate(template1.name());
+ JobTemplate result2 = metalake.getJobTemplate(template2.name());
+ Assertions.assertEquals(template1, result1);
+ Assertions.assertEquals(template2, result2);
+
+ // Delete the first job template
+ Assertions.assertTrue(metalake.deleteJobTemplate(template1.name()));
+ // Verify the first job template is deleted
+ Assertions.assertThrows(
+ NoSuchJobTemplateException.class, () ->
metalake.getJobTemplate(template1.name()));
+ // Verify the second job template still exists
+ JobTemplate remainingTemplate = metalake.getJobTemplate(template2.name());
+ Assertions.assertEquals(template2, remainingTemplate);
+
+ // Verify the list of job templates after deletion
+ registeredTemplates = metalake.listJobTemplates();
+ Assertions.assertEquals(1, registeredTemplates.size());
+ Assertions.assertTrue(registeredTemplates.contains(template2));
+
+ // Test deleting a non-existent job template
+ Assertions.assertFalse(metalake.deleteJobTemplate(template1.name()));
+
+ // Delete the second job template
+ Assertions.assertTrue(metalake.deleteJobTemplate(template2.name()));
+
+ // Verify the second job template is deleted
+ Assertions.assertThrows(
+ NoSuchJobTemplateException.class, () ->
metalake.getJobTemplate(template2.name()));
+
+ // Verify the list of job templates is empty after deleting both
+ registeredTemplates = metalake.listJobTemplates();
+ Assertions.assertTrue(registeredTemplates.isEmpty());
+ }
+
+ @Test
+ public void testRunAndListJobs() {
+ JobTemplate template = builder.withName("test_run").build();
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template));
+
+ // Submit two jobs that are both expected to succeed
+ JobHandle jobHandle1 =
+ metalake.runJob(
+ template.name(),
+ ImmutableMap.of("arg1", "value1", "arg2", "success", "env_var",
"value2"));
+ Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle1.jobStatus());
+ Assertions.assertEquals(template.name(), jobHandle1.jobTemplateName());
+
+ JobHandle jobHandle2 =
+ metalake.runJob(
+ template.name(),
+ ImmutableMap.of("arg1", "value3", "arg2", "success", "env_var",
"value4"));
+ Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle2.jobStatus());
+ Assertions.assertEquals(template.name(), jobHandle2.jobTemplateName());
+
+ List<JobHandle> jobs = metalake.listJobs(template.name());
+ Assertions.assertEquals(2, jobs.size());
+ List<String> resultJobIds =
jobs.stream().map(JobHandle::jobId).collect(Collectors.toList());
+ Assertions.assertTrue(resultJobIds.contains(jobHandle1.jobId()));
+ Assertions.assertTrue(resultJobIds.contains(jobHandle2.jobId()));
+
+ Awaitility.await()
+ .atMost(3, TimeUnit.MINUTES)
+ .until(
+ () -> {
+ JobHandle updatedJob1 = metalake.getJob(jobHandle1.jobId());
+ return updatedJob1.jobStatus() == JobHandle.Status.SUCCEEDED;
+ });
+
+ Awaitility.await()
+ .atMost(3, TimeUnit.MINUTES)
+ .until(
+ () -> {
+ JobHandle updatedJob2 = metalake.getJob(jobHandle2.jobId());
+ return updatedJob2.jobStatus() == JobHandle.Status.SUCCEEDED;
+ });
+
+ List<JobHandle> updatedJobs = metalake.listJobs(template.name());
+ Assertions.assertEquals(2, updatedJobs.size());
+ Set<JobHandle.Status> jobStatuses =
+
updatedJobs.stream().map(JobHandle::jobStatus).collect(Collectors.toSet());
+ Assertions.assertEquals(1, jobStatuses.size());
+ Assertions.assertTrue(jobStatuses.contains(JobHandle.Status.SUCCEEDED));
+ }
+
+ @Test
+ public void testRunAndGetJob() {
+ JobTemplate template = builder.withName("test_run_get").build();
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template));
+
+ // Submit a job with success status
+ JobHandle jobHandle =
+ metalake.runJob(
+ template.name(),
+ ImmutableMap.of("arg1", "value1", "arg2", "success", "env_var",
"value2"));
+ Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle.jobStatus());
+ Assertions.assertEquals(template.name(), jobHandle.jobTemplateName());
+
+ Awaitility.await()
+ .atMost(3, TimeUnit.MINUTES)
+ .until(
+ () -> {
+ JobHandle updatedJob = metalake.getJob(jobHandle.jobId());
+ return updatedJob.jobStatus() == JobHandle.Status.SUCCEEDED;
+ });
+
+ JobHandle retrievedJob = metalake.getJob(jobHandle.jobId());
+ Assertions.assertEquals(jobHandle.jobId(), retrievedJob.jobId());
+ Assertions.assertEquals(JobHandle.Status.SUCCEEDED,
retrievedJob.jobStatus());
+
+ // Test running a job that is expected to fail
+ JobHandle failedJobHandle =
+ metalake.runJob(
+ template.name(),
+ ImmutableMap.of("arg1", "value1", "arg2", "fail", "env_var",
"value2"));
+ Assertions.assertEquals(JobHandle.Status.QUEUED,
failedJobHandle.jobStatus());
+
+ Awaitility.await()
+ .atMost(3, TimeUnit.MINUTES)
+ .until(
+ () -> {
+ JobHandle updatedFailedJob =
metalake.getJob(failedJobHandle.jobId());
+ return updatedFailedJob.jobStatus() == JobHandle.Status.FAILED;
+ });
+
+ JobHandle retrievedFailedJob = metalake.getJob(failedJobHandle.jobId());
+ Assertions.assertEquals(failedJobHandle.jobId(),
retrievedFailedJob.jobId());
+ Assertions.assertEquals(JobHandle.Status.FAILED,
retrievedFailedJob.jobStatus());
+
+ // Test getting a non-existent job
+ Assertions.assertThrows(NoSuchJobException.class, () ->
metalake.getJob("non_existent_job_id"));
+ }
+
+ @Test
+ public void testRunAndCancelJob() {
+ JobTemplate template = builder.withName("test_run_cancel").build();
+ Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template));
+
+ // Submit a job that would succeed if left to run; it is cancelled below
+ JobHandle jobHandle =
+ metalake.runJob(
+ template.name(),
+ ImmutableMap.of("arg1", "value1", "arg2", "success", "env_var",
"value2"));
+ Assertions.assertEquals(JobHandle.Status.QUEUED, jobHandle.jobStatus());
+ Assertions.assertEquals(template.name(), jobHandle.jobTemplateName());
+
+ // Cancel the job
+ metalake.cancelJob(jobHandle.jobId());
+
+ Awaitility.await()
+ .atMost(3, TimeUnit.MINUTES)
+ .until(
+ () -> {
+ JobHandle updatedJob = metalake.getJob(jobHandle.jobId());
+ return updatedJob.jobStatus() == JobHandle.Status.CANCELLED;
+ });
+
+ JobHandle retrievedJob = metalake.getJob(jobHandle.jobId());
+ Assertions.assertEquals(jobHandle.jobId(), retrievedJob.jobId());
+ Assertions.assertEquals(JobHandle.Status.CANCELLED,
retrievedJob.jobStatus());
+
+ // Test cancelling a non-existent job
+ Assertions.assertThrows(
+ NoSuchJobException.class, () ->
metalake.cancelJob("non_existent_job_id"));
+ }
+
+ private String generateTestEntryScript() {
+ String content =
+ "#!/bin/bash\n"
+ + "echo \"starting test test job\"\n\n"
+ + "bin=\"$(dirname \"${BASH_SOURCE-$0}\")\"\n"
+ + "bin=\"$(cd \"${bin}\">/dev/null; pwd)\"\n\n"
+ + ". \"${bin}/common.sh\"\n\n"
+ + "sleep 3\n\n"
+ + "JOB_NAME=\"test_job-$(date +%s)-$1\"\n\n"
+ + "echo \"Submitting job with name: $JOB_NAME\"\n\n"
+ + "echo \"$1\"\n\n"
+ + "echo \"$2\"\n\n"
+ + "echo \"$ENV_VAR\"\n\n"
+ + "if [[ \"$2\" == \"success\" ]]; then\n"
+ + " exit 0\n"
+ + "elif [[ \"$2\" == \"fail\" ]]; then\n"
+ + " exit 1\n"
+ + "else\n"
+ + " exit 2\n"
+ + "fi\n";
+
+ // save the script to a file
+ try {
+ File scriptFile = new File(testStagingDir, "test-job.sh");
+ Files.writeString(scriptFile.toPath(), content);
+ scriptFile.setExecutable(true);
+ return scriptFile.getAbsolutePath();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create test entry script", e);
+ }
+ }
+
+ private String generateTestLibScript() {
+ String content = "#!/bin/bash\necho \"in common script\"\n";
+
+ // save the script to a file
+ try {
+ File scriptFile = new File(testStagingDir, "common.sh");
+ Files.writeString(scriptFile.toPath(), content);
+ scriptFile.setExecutable(true);
+ return scriptFile.getAbsolutePath();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create test lib script", e);
+ }
+ }
+}
diff --git a/clients/client-python/gravitino/client/gravitino_metalake.py
b/clients/client-python/gravitino/client/gravitino_metalake.py
index b85fa41c7b..3d7d7f4cfa 100644
--- a/clients/client-python/gravitino/client/gravitino_metalake.py
+++ b/clients/client-python/gravitino/client/gravitino_metalake.py
@@ -322,7 +322,7 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
raise ValueError("Job template name cannot be null or empty")
url = (
-
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+
f"{self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))}/"
f"{encode_string(job_template_name)}"
)
response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
@@ -355,7 +355,7 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
raise ValueError("Job template name cannot be null or empty")
url = (
-
f"{self.API_METALAKES_JOB_TEMPLATES_PATH}/{encode_string(self.name())}/"
+
f"{self.API_METALAKES_JOB_TEMPLATES_PATH.format(encode_string(self.name()))}/"
f"{encode_string(job_template_name)}"
)
response = self.rest_client.delete(url,
error_handler=JOB_ERROR_HANDLER)
@@ -403,7 +403,10 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
if not job_id or not job_id.strip():
raise ValueError("Job ID cannot be null or empty")
- url =
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+ url = (
+
f"{self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))}"
+ f"/{encode_string(job_id)}"
+ )
response = self.rest_client.get(url, error_handler=JOB_ERROR_HANDLER)
resp = JobResponse.from_json(response.body, infer_missing=True)
resp.validate()
@@ -423,7 +426,7 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
Raises:
NoSuchJobTemplateException: If no job template with the specified
name exists.
"""
- url =
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}"
+ url =
f"{self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))}"
request = JobRunRequest(job_template_name, job_conf)
response = self.rest_client.post(
@@ -449,7 +452,10 @@ class GravitinoMetalake(MetalakeDTO, SupportsJobs):
if not job_id or not job_id.strip():
raise ValueError("Job ID cannot be null or empty")
- url =
f"{self.API_METALAKES_JOB_RUNS_PATH}/{encode_string(self.name())}/{encode_string(job_id)}"
+ url = (
+
f"{self.API_METALAKES_JOB_RUNS_PATH.format(encode_string(self.name()))}"
+ f"/{encode_string(job_id)}"
+ )
response = self.rest_client.post(url, error_handler=JOB_ERROR_HANDLER)
resp = JobResponse.from_json(response.body, infer_missing=True)
resp.validate()
diff --git a/clients/client-python/tests/integration/test_supports_jobs.py
b/clients/client-python/tests/integration/test_supports_jobs.py
new file mode 100644
index 0000000000..2235050b58
--- /dev/null
+++ b/clients/client-python/tests/integration/test_supports_jobs.py
@@ -0,0 +1,327 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import shutil
+import tempfile
+import time
+from pathlib import Path
+from random import randint
+from time import sleep
+
+from gravitino import GravitinoAdminClient, GravitinoMetalake
+from gravitino.api.job.job_handle import JobHandle
+from gravitino.api.job.shell_job_template import ShellJobTemplate
+from gravitino.exceptions.base import (
+ JobTemplateAlreadyExistsException,
+ NoSuchJobTemplateException,
+ NoSuchJobException,
+)
+from tests.integration.integration_test_env import IntegrationTestEnv
+
+
+class TestSupportsJobs(IntegrationTestEnv):
+ _metalake_name: str = "job_it_metalake" + str(randint(0, 1000))
+
+ _gravitino_admin_client: GravitinoAdminClient = None
+ _metalake: GravitinoMetalake = None
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.test_staging_dir = tempfile.mkdtemp()
+ cls.test_entry_script_path = cls.generate_test_entry_script()
+ cls.test_lib_script_path = cls.generate_test_lib_script()
+
+ cls.builder = (
+ ShellJobTemplate.builder()
+ .with_comment("Test shell job template")
+ .with_executable(cls.test_entry_script_path)
+ .with_arguments(["{{arg1}}", "{{arg2}}"])
+ .with_environments({"ENV_VAR": "{{env_var}}"})
+ .with_scripts([cls.test_lib_script_path])
+ .with_custom_fields({})
+ )
+ cls.configs = {
+ "gravitino.job.stagingDir": cls.test_staging_dir,
+ "gravitino.job.statusPullIntervalInMs": "3000",
+ }
+
+ cls._get_gravitino_home()
+ cls._append_conf(cls.configs,
f"{cls.gravitino_home}/conf/gravitino.conf")
+ cls.restart_server()
+
+ cls._gravitino_admin_client =
GravitinoAdminClient(uri="http://localhost:8090")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.test_staging_dir)
+
+ def setUp(self):
+ self._metalake = self._gravitino_admin_client.create_metalake(
+ self._metalake_name, comment="test metalake", properties={}
+ )
+
+ def tearDown(self):
+ self._gravitino_admin_client.drop_metalake(self._metalake_name,
force=True)
+
+ def test_register_and_list_job_templates(self):
+ template_1 = self.builder.with_name("test_1").build()
+ template_2 = self.builder.with_name("test_2").build()
+
+ # Assert no exceptions are raised when registering job templates
+ self.assertIsNone(self._metalake.register_job_template(template_1))
+ self.assertIsNone(self._metalake.register_job_template(template_2))
+
+ # List registered job templates
+ registered_templates = self._metalake.list_job_templates()
+ self.assertEqual(len(registered_templates), 2)
+ self.assertIn(template_1, registered_templates)
+ self.assertIn(template_2, registered_templates)
+
+ # Test registering a duplicate job template
+ with self.assertRaises(JobTemplateAlreadyExistsException):
+ self._metalake.register_job_template(template_1)
+
+ def test_register_and_get_job_template(self):
+ template = self.builder.with_name("test_get").build()
+
+ # Assert no exceptions are raised when registering the job template
+ self.assertIsNone(self._metalake.register_job_template(template))
+
+ # Retrieve the registered job template
+ retrieved_template = self._metalake.get_job_template(template.name)
+ self.assertEqual(template, retrieved_template)
+
+ # Test retrieving a non-existent job template
+ with self.assertRaises(NoSuchJobTemplateException):
+ self._metalake.get_job_template("non_existent_template")
+
+ def test_register_and_delete_job_template(self):
+ template1 = self.builder.with_name("test_1").build()
+ template2 = self.builder.with_name("test_2").build()
+
+ # Assert no exceptions are raised when registering job templates
+ self.assertIsNone(self._metalake.register_job_template(template1))
+ self.assertIsNone(self._metalake.register_job_template(template2))
+
+ # List registered job templates
+ registered_templates = self._metalake.list_job_templates()
+ self.assertEqual(len(registered_templates), 2)
+ self.assertIn(template1, registered_templates)
+ self.assertIn(template2, registered_templates)
+
+ # Retrieve and verify the registered job templates
+ result1 = self._metalake.get_job_template(template1.name)
+ result2 = self._metalake.get_job_template(template2.name)
+ self.assertEqual(template1, result1)
+ self.assertEqual(template2, result2)
+
+ # Delete the first job template
+ self.assertTrue(self._metalake.delete_job_template(template1.name))
+
+ # Verify the first job template is deleted
+ with self.assertRaises(NoSuchJobTemplateException):
+ self._metalake.get_job_template(template1.name)
+
+ # Verify the second job template still exists
+ remaining_template = self._metalake.get_job_template(template2.name)
+ self.assertEqual(template2, remaining_template)
+
+ # Verify the list of job templates after deletion
+ registered_templates = self._metalake.list_job_templates()
+ self.assertEqual(len(registered_templates), 1)
+ self.assertIn(template2, registered_templates)
+
+ # Test deleting a non-existent job template
+ self.assertFalse(self._metalake.delete_job_template(template1.name))
+
+ # Delete the second job template
+ self.assertTrue(self._metalake.delete_job_template(template2.name))
+
+ # Verify the second job template is deleted
+ with self.assertRaises(NoSuchJobTemplateException):
+ self._metalake.get_job_template(template2.name)
+
+ # Verify the list of job templates is empty after deleting both
+ registered_templates = self._metalake.list_job_templates()
+ self.assertTrue(len(registered_templates) == 0)
+
+ def test_run_and_list_jobs(self):
+ template = self.builder.with_name("test_run").build()
+ self._metalake.register_job_template(template)
+
+ # Submit jobs
+ job_handle1 = self._metalake.run_job(
+ template.name, {"arg1": "value1", "arg2": "success", "env_var":
"value2"}
+ )
+ self.assertEqual(job_handle1.job_status(), JobHandle.Status.QUEUED)
+ self.assertEqual(job_handle1.job_template_name(), template.name)
+
+ job_handle2 = self._metalake.run_job(
+ template.name, {"arg1": "value3", "arg2": "success", "env_var":
"value4"}
+ )
+ self.assertEqual(job_handle2.job_status(), JobHandle.Status.QUEUED)
+ self.assertEqual(job_handle2.job_template_name(), template.name)
+
+ # List jobs
+ jobs = self._metalake.list_jobs(template.name)
+ self.assertEqual(len(jobs), 2)
+ job_ids = [job.job_id() for job in jobs]
+ self.assertIn(job_handle1.job_id(), job_ids)
+ self.assertIn(job_handle2.job_id(), job_ids)
+
+ # Wait for jobs to complete
+ self._wait_until(
+ lambda: self._metalake.get_job(job_handle1.job_id()).job_status()
+ == JobHandle.Status.SUCCEEDED,
+ timeout=180,
+ )
+
+ self._wait_until(
+ lambda: self._metalake.get_job(job_handle2.job_id()).job_status()
+ == JobHandle.Status.SUCCEEDED,
+ timeout=180,
+ )
+
+ updated_jobs = self._metalake.list_jobs(template.name)
+ self.assertEqual(len(updated_jobs), 2)
+ job_statuses = {job.job_status() for job in updated_jobs}
+ self.assertEqual(len(job_statuses), 1)
+ self.assertIn(JobHandle.Status.SUCCEEDED, job_statuses)
+
+ def test_run_and_get_job(self):
+ template = self.builder.with_name("test_run_get").build()
+ self._metalake.register_job_template(template)
+
+ # Submit a job
+ job_handle = self._metalake.run_job(
+ template.name, {"arg1": "value1", "arg2": "success", "env_var":
"value2"}
+ )
+ self.assertEqual(job_handle.job_status(), JobHandle.Status.QUEUED)
+ self.assertEqual(job_handle.job_template_name(), template.name)
+
+ # Wait for job to complete
+ self._wait_until(
+ lambda: self._metalake.get_job(job_handle.job_id()).job_status()
+ == JobHandle.Status.SUCCEEDED,
+ timeout=180,
+ )
+ retrieved_job = self._metalake.get_job(job_handle.job_id())
+ self.assertEqual(job_handle.job_id(), retrieved_job.job_id())
+ self.assertEqual(JobHandle.Status.SUCCEEDED,
retrieved_job.job_status())
+
+ # Test failed job
+ failed_job_handle = self._metalake.run_job(
+ template.name, {"arg1": "value1", "arg2": "fail", "env_var":
"value2"}
+ )
+ self.assertEqual(failed_job_handle.job_status(),
JobHandle.Status.QUEUED)
+
+ self._wait_until(
+ lambda:
self._metalake.get_job(failed_job_handle.job_id()).job_status()
+ == JobHandle.Status.FAILED,
+ timeout=180,
+ )
+ retrieved_failed_job =
self._metalake.get_job(failed_job_handle.job_id())
+ self.assertEqual(failed_job_handle.job_id(),
retrieved_failed_job.job_id())
+ self.assertEqual(JobHandle.Status.FAILED,
retrieved_failed_job.job_status())
+
+ # Test non-existent job
+ with self.assertRaises(NoSuchJobException):
+ self._metalake.get_job("non_existent_job_id")
+
+ def test_run_and_cancel_job(self):
+ template = self.builder.with_name("test_run_cancel").build()
+ self._metalake.register_job_template(template)
+
+ # Submit a job
+ job_handle = self._metalake.run_job(
+ template.name, {"arg1": "value1", "arg2": "success", "env_var":
"value2"}
+ )
+ self.assertEqual(job_handle.job_status(), JobHandle.Status.QUEUED)
+ self.assertEqual(job_handle.job_template_name(), template.name)
+
+ sleep(1)
+
+ # Cancel the job
+ self._metalake.cancel_job(job_handle.job_id())
+
+ self._wait_until(
+ lambda: self._metalake.get_job(job_handle.job_id()).job_status()
+ == JobHandle.Status.CANCELLED,
+ timeout=180,
+ )
+
+ retrieved_job = self._metalake.get_job(job_handle.job_id())
+ self.assertEqual(job_handle.job_id(), retrieved_job.job_id())
+ self.assertEqual(JobHandle.Status.CANCELLED,
retrieved_job.job_status())
+
+ # Test cancel non-existent job
+ with self.assertRaises(NoSuchJobException):
+ self._metalake.cancel_job("non_existent_job_id")
+
+ def _wait_until(self, condition, timeout=180, interval=0.5):
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ if condition():
+ return True
+ time.sleep(interval)
+ raise TimeoutError(f"Condition not met within {timeout} seconds")
+
+ @classmethod
+ def generate_test_entry_script(cls):
+ content = """#!/bin/bash
+echo "starting test test job"
+
+bin="$(dirname "${BASH_SOURCE-$0}")"
+bin="$(cd "${bin}">/dev/null; pwd)"
+
+. "${bin}/common.sh"
+
+sleep 3
+
+JOB_NAME="test_job-$(date +%s)-$1"
+
+echo "Submitting job with name: $JOB_NAME"
+
+echo "$1"
+
+echo "$2"
+
+echo "$ENV_VAR"
+
+if [[ "$2" == "success" ]]; then
+ exit 0
+elif [[ "$2" == "fail" ]]; then
+ exit 1
+else
+ exit 2
+fi
+"""
+ script_path = Path(cls.test_staging_dir) / "test-job.sh"
+ script_path.write_text(content, encoding="utf-8")
+ script_path.chmod(0o755)
+ return str(script_path)
+
+ @classmethod
+ def generate_test_lib_script(cls):
+ content = """#!/bin/bash
+echo "in common script"
+"""
+ script_path = Path(cls.test_staging_dir) / "common.sh"
+ script_path.write_text(content, encoding="utf-8")
+ script_path.chmod(0o755)
+ return str(script_path)
diff --git a/conf/log4j2.properties.template b/conf/log4j2.properties.template
index 074b11fd7d..c3f31f5268 100644
--- a/conf/log4j2.properties.template
+++ b/conf/log4j2.properties.template
@@ -48,7 +48,7 @@ appender.rolling.strategy.delete.ifLastModified.type =
IfLastModified
appender.rolling.strategy.delete.ifLastModified.age = 30d
-## use seperate file for lineage log
+## use separate file for lineage log
appender.lineage_file.type=RollingFile
appender.lineage_file.name=lineage_file
appender.lineage_file.fileName=${basePath}/gravitino_lineage.log
diff --git
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 8aea5b2384..f64b60123d 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -38,8 +38,6 @@ public interface SupportsRelationOperations {
ROLE_USER_REL,
/** Role and group relationship */
ROLE_GROUP_REL,
- /** Job template and job relationship */
- JOB_TEMPLATE_JOB_REL,
/** Policy and metadata object relationship */
POLICY_METADATA_OBJECT_REL,
}
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index e32bb3752c..331afb43f9 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -39,6 +39,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
@@ -47,7 +48,6 @@ import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
-import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.connector.job.JobExecutor;
import org.apache.gravitino.exceptions.InUseException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
@@ -252,7 +252,14 @@ public class JobManager implements JobOperationDispatcher {
public boolean deleteJobTemplate(String metalake, String jobTemplateName)
throws InUseException {
checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
- List<JobEntity> jobs = listJobs(metalake, Optional.of(jobTemplateName));
+ List<JobEntity> jobs;
+ try {
+ jobs = listJobs(metalake, Optional.of(jobTemplateName));
+ } catch (NoSuchJobTemplateException e) {
+ // If the job template does not exist, we can safely return false.
+ return false;
+ }
+
boolean hasActiveJobs =
jobs.stream()
.anyMatch(
@@ -313,6 +320,13 @@ public class JobManager implements JobOperationDispatcher {
NameIdentifier jobTemplateIdent =
NameIdentifierUtil.ofJobTemplate(metalake,
jobTemplateName.get());
+ // If jobTemplateName is present, we need to list the jobs
associated with the job template.
+ // Use a mock namespace built from the job template identifier to get
the jobs associated
+ // with the job template.
+ String[] elements =
+ ArrayUtils.add(jobTemplateIdent.namespace().levels(),
jobTemplateIdent.name());
+ Namespace jobTemplateIdentNs = Namespace.of(elements);
+
// Lock the job template to ensure no concurrent
modifications/deletions
jobEntities =
TreeLockUtils.doWithTreeLock(
@@ -320,12 +334,8 @@ public class JobManager implements JobOperationDispatcher {
LockType.READ,
() ->
// List all the jobs associated with the job template
- entityStore
- .relationOperations()
- .listEntitiesByRelation(
-
SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
- jobTemplateIdent,
- Entity.EntityType.JOB_TEMPLATE));
+ entityStore.list(
+ jobTemplateIdentNs, JobEntity.class,
Entity.EntityType.JOB));
} else {
jobEntities = entityStore.list(jobNs, JobEntity.class,
Entity.EntityType.JOB);
}
@@ -527,6 +537,12 @@ public class JobManager implements JobOperationDispatcher {
e);
}
});
+
+ LOG.info(
+ "Updated the job {} with execution id {} status to {}",
+ job.name(),
+ job.jobExecutionId(),
+ newStatus);
}
});
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index bfe3ac72a9..a72182d5df 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -508,14 +508,6 @@ public class JDBCBackend implements RelationalBackend {
String.format("ROLE_USER_REL doesn't support type %s",
identType.name()));
}
- case JOB_TEMPLATE_JOB_REL:
- if (identType == Entity.EntityType.JOB_TEMPLATE) {
- return (List<E>)
JobMetaService.getInstance().listJobsByTemplateIdent(nameIdentifier);
- } else {
- throw new IllegalArgumentException(
- String.format("JOB_TEMPLATE_JOB_REL doesn't support type %s",
identType.name()));
- }
-
case POLICY_METADATA_OBJECT_REL:
if (identType == Entity.EntityType.POLICY) {
return (List<E>)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
index 485dba11b7..86af63c1fa 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.IllegalNamespaceException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.job.JobHandle;
import org.apache.gravitino.meta.JobEntity;
@@ -48,22 +49,29 @@ public class JobMetaService {
public List<JobEntity> listJobsByNamespace(Namespace ns) {
String metalakeName = ns.level(0);
- List<JobPO> jobPOs =
- SessionUtils.getWithoutCommit(
- JobMetaMapper.class, mapper ->
mapper.listJobPOsByMetalake(metalakeName));
- return jobPOs.stream().map(po -> JobPO.fromJobPO(po,
ns)).collect(Collectors.toList());
- }
-
- public List<JobEntity> listJobsByTemplateIdent(NameIdentifier
jobTemplateIdent) {
- String metalakeName = jobTemplateIdent.namespace().level(0);
- String jobTemplateName = jobTemplateIdent.name();
- List<JobPO> jobPOs =
- SessionUtils.getWithoutCommit(
- JobMetaMapper.class,
- mapper -> mapper.listJobPOsByMetalakeAndTemplate(metalakeName,
jobTemplateName));
- return jobPOs.stream()
- .map(po -> JobPO.fromJobPO(po, NamespaceUtil.ofJob(metalakeName)))
- .collect(Collectors.toList());
+ if (ns.length() == 3) {
+ // If the namespace is a job namespace, we will list all the jobs in the
metalake.
+ List<JobPO> jobPOs =
+ SessionUtils.getWithoutCommit(
+ JobMetaMapper.class, mapper ->
mapper.listJobPOsByMetalake(metalakeName));
+ return jobPOs.stream().map(po -> JobPO.fromJobPO(po,
ns)).collect(Collectors.toList());
+
+ } else if (ns.length() == 4) {
+ // If the namespace is generated by a job template identifier, we will
list all the jobs
+ // associated with the job template.
+ NameIdentifier jobTemplateIdent = NameIdentifier.of(ns.levels());
+ String jobTemplateName = jobTemplateIdent.name();
+ List<JobPO> jobPOs =
+ SessionUtils.getWithoutCommit(
+ JobMetaMapper.class,
+ mapper -> mapper.listJobPOsByMetalakeAndTemplate(metalakeName,
jobTemplateName));
+ return jobPOs.stream()
+ .map(po -> JobPO.fromJobPO(po, NamespaceUtil.ofJob(metalakeName)))
+ .collect(Collectors.toList());
+
+ } else {
+ throw new IllegalNamespaceException("Invalid namespace for listing jobs:
%s", ns);
+ }
}
public JobEntity getJobByIdentifier(NameIdentifier ident) {
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 1070af6bbd..a95cec8466 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -40,6 +40,7 @@ import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
@@ -49,7 +50,6 @@ import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
-import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.connector.job.JobExecutor;
import org.apache.gravitino.exceptions.InUseException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
@@ -361,18 +361,11 @@ public class TestJobManager {
JobEntity job1 = newJobEntity("shell_job", JobHandle.Status.QUEUED);
JobEntity job2 = newJobEntity("spark_job", JobHandle.Status.QUEUED);
- SupportsRelationOperations supportsRelationOperations =
- Mockito.mock(SupportsRelationOperations.class);
- when(supportsRelationOperations.listEntitiesByRelation(
- SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
- NameIdentifierUtil.ofJobTemplate(metalake,
shellJobTemplate.name()),
- Entity.EntityType.JOB_TEMPLATE))
+ String[] levels =
+ ArrayUtils.add(shellJobTemplate.namespace().levels(),
shellJobTemplate.name());
+ Namespace jobTemplateIdentNs = Namespace.of(levels);
+ when(entityStore.list(jobTemplateIdentNs, JobEntity.class,
Entity.EntityType.JOB))
.thenReturn(Lists.newArrayList(job1));
-
when(entityStore.relationOperations()).thenReturn(supportsRelationOperations);
-
- // Mock the listJobs method to return a list of jobs associated with the
job template
- when(entityStore.list(NamespaceUtil.ofJob(metalake), JobEntity.class,
Entity.EntityType.JOB))
- .thenReturn(Lists.newArrayList(job1, job2));
List<JobEntity> jobs = jobManager.listJobs(metalake,
Optional.of(shellJobTemplate.name()));
Assertions.assertEquals(1, jobs.size());
@@ -380,6 +373,10 @@ public class TestJobManager {
Assertions.assertFalse(jobs.contains(job2));
// List all jobs without filtering by job template
+ // Mock the entity store to return all the jobs under the job namespace of the
metalake
+ when(entityStore.list(NamespaceUtil.ofJob(metalake), JobEntity.class,
Entity.EntityType.JOB))
+ .thenReturn(Lists.newArrayList(job1, job2));
+
jobs = jobManager.listJobs(metalake, Optional.empty());
Assertions.assertEquals(2, jobs.size());
Assertions.assertTrue(jobs.contains(job1));
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
index e47cb50b6b..4fd59ef4df 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
@@ -21,7 +21,9 @@ package org.apache.gravitino.storage.relational.service;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.job.JobHandle;
import org.apache.gravitino.meta.AuditInfo;
@@ -70,17 +72,18 @@ public class TestJobMetaService extends TestJDBCBackend {
Assertions.assertTrue(jobs.contains(job2));
// Test listing jobs by job template identifier
+ String[] levels = ArrayUtils.add(jobTemplate.namespace().levels(),
jobTemplate.name());
+ Namespace jobTemplateIdentNs = Namespace.of(levels);
List<JobEntity> jobsByTemplate =
-
JobMetaService.getInstance().listJobsByTemplateIdent(jobTemplate.nameIdentifier());
+ JobMetaService.getInstance().listJobsByNamespace(jobTemplateIdentNs);
Assertions.assertEquals(2, jobsByTemplate.size());
Assertions.assertTrue(jobsByTemplate.contains(job1));
Assertions.assertTrue(jobsByTemplate.contains(job2));
// Test listing jobs by non-existing template identifier
+ levels = ArrayUtils.add(jobTemplate.namespace().levels(),
"non_existing_template");
List<JobEntity> emptyJobs =
- JobMetaService.getInstance()
- .listJobsByTemplateIdent(
- NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"non_existing_template"));
+ JobMetaService.getInstance().listJobsByNamespace(Namespace.of(levels));
Assertions.assertTrue(emptyJobs.isEmpty());
}
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
index d1daed2a77..54ea3c1365 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
@@ -122,7 +122,10 @@ public class JobOperations {
@ResponseMetered(name = "register-job-template", absolute = true)
public Response registerJobTemplate(
@PathParam("metalake") String metalake, JobTemplateRegisterRequest
request) {
- LOG.info("Received request to register job template in metalake: {}",
metalake);
+ LOG.info(
+ "Received request to register job template {} in metalake: {}",
+ request.getJobTemplate().name(),
+ metalake);
try {
return Utils.doAs(
@@ -257,7 +260,8 @@ public class JobOperations {
@Timed(name = "run-job." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
@ResponseMetered(name = "run-job", absolute = true)
public Response runJob(@PathParam("metalake") String metalake, JobRunRequest
request) {
- LOG.info("Received request to run job in metalake: {}", metalake);
+ LOG.info(
+ "Received request to run job {} in metalake: {}",
request.getJobTemplateName(), metalake);
try {
return Utils.doAs(
@@ -270,7 +274,11 @@ public class JobOperations {
JobEntity jobEntity =
jobOperationDispatcher.runJob(metalake,
request.getJobTemplateName(), jobConf);
- LOG.info("Run job {} in metalake: {}", jobEntity.name(), metalake);
+ LOG.info(
+ "Run job[{}] {} in metalake: {}",
+ jobEntity.jobTemplateName(),
+ jobEntity.name(),
+ metalake);
return Utils.ok(new JobResponse(toDTO(jobEntity)));
});