zstan commented on code in PR #4898:
URL: https://github.com/apache/ignite-3/pull/4898#discussion_r1887943749


##########
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java:
##########
@@ -157,4 +283,21 @@ private static AsyncSqlCursor<InternalSqlRow> 
executeQueryInternal(Ignite node,
 
         return await(fut);
     }
+
+    private static class InfiniteJob implements ComputeJob<Void, Void> {

Review Comment:
   Can we avoid the copy-paste? Could ItComputeSystemViewTest.InfiniteJob be 
made public and reused here instead?



##########
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java:
##########
@@ -95,46 +126,141 @@ public void killQueryFromLocal() {
         assertThat(queries.size(), is(1));
         UUID targetQueryId = queries.get(0).id();
 
-        checkKillQuery(node, targetQueryId, true);
+        assertThat(executeKillSqlQuery(node, targetQueryId), is(true));
 
         assertThat(runningQueries(), is(empty()));
         expectQueryCancelled(new DrainCursor(cursor));
 
-        checkKillQuery(node, targetQueryId, false);
-        checkKillQuery(node, targetQueryId, true, true);
+        assertThat(executeKillSqlQuery(node, targetQueryId), is(false));
+        assertThat(executeKill(node, QUERY, targetQueryId, true), is(true));
+    }
+
+    @Test
+    public void killComputeJobFromLocal() {
+        Ignite node = CLUSTER.aliveNode();
+        JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+        JobExecution<Void> execution = 
node.compute().submit(JobTarget.node(clusterNode(node)), job, null);
+
+        Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));

Review Comment:
   Why do we need to wait for the status here? Let's change this test to kill 
the job immediately.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/kill/KillCommandHandler.java:
##########
@@ -102,7 +102,7 @@ public CompletableFuture<Boolean> handle(KillCommand cmd) {
 
         CompletableFuture<Boolean> killFut = invokeCancel(handler, 
cmd.operationId());
 
-        if (killFut.isDone() || !cmd.noWait()) {
+        if (killFut.isCompletedExceptionally() || !cmd.noWait()) {

Review Comment:
   I am missing something here: if noWait == true, we must return immediately, 
i.e. return CompletableFuture.completedFuture(true); otherwise we need to 
return killFut, don't we?



##########
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java:
##########
@@ -95,46 +126,141 @@ public void killQueryFromLocal() {
         assertThat(queries.size(), is(1));
         UUID targetQueryId = queries.get(0).id();
 
-        checkKillQuery(node, targetQueryId, true);
+        assertThat(executeKillSqlQuery(node, targetQueryId), is(true));
 
         assertThat(runningQueries(), is(empty()));
         expectQueryCancelled(new DrainCursor(cursor));
 
-        checkKillQuery(node, targetQueryId, false);
-        checkKillQuery(node, targetQueryId, true, true);
+        assertThat(executeKillSqlQuery(node, targetQueryId), is(false));
+        assertThat(executeKill(node, QUERY, targetQueryId, true), is(true));
+    }
+
+    @Test
+    public void killComputeJobFromLocal() {
+        Ignite node = CLUSTER.aliveNode();
+        JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+        JobExecution<Void> execution = 
node.compute().submit(JobTarget.node(clusterNode(node)), job, null);
+
+        Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        UUID jobId = await(execution.idAsync());
+        assertThat(jobId, not(nullValue()));
+        assertThat(executeKillJob(node, jobId), is(true));
+
+        Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+
+        assertThat(executeKillJob(node, jobId), is(false));
+        assertThat(executeKill(node, COMPUTE, jobId, true), is(true));
     }
 
     @Test
     public void killQueryFromRemote() {

Review Comment:
   It seems we have no "real" tests for killing long-running queries. WDYT, do 
we need such tests? We already have a real infinite job in compute, but nothing 
equivalent is tested in SQL.



##########
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java:
##########
@@ -95,46 +126,141 @@ public void killQueryFromLocal() {
         assertThat(queries.size(), is(1));
         UUID targetQueryId = queries.get(0).id();
 
-        checkKillQuery(node, targetQueryId, true);
+        assertThat(executeKillSqlQuery(node, targetQueryId), is(true));
 
         assertThat(runningQueries(), is(empty()));
         expectQueryCancelled(new DrainCursor(cursor));
 
-        checkKillQuery(node, targetQueryId, false);
-        checkKillQuery(node, targetQueryId, true, true);
+        assertThat(executeKillSqlQuery(node, targetQueryId), is(false));
+        assertThat(executeKill(node, QUERY, targetQueryId, true), is(true));
+    }
+
+    @Test
+    public void killComputeJobFromLocal() {
+        Ignite node = CLUSTER.aliveNode();
+        JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+        JobExecution<Void> execution = 
node.compute().submit(JobTarget.node(clusterNode(node)), job, null);
+
+        Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        UUID jobId = await(execution.idAsync());
+        assertThat(jobId, not(nullValue()));
+        assertThat(executeKillJob(node, jobId), is(true));
+
+        Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+
+        assertThat(executeKillJob(node, jobId), is(false));
+        assertThat(executeKill(node, COMPUTE, jobId, true), is(true));
     }
 
     @Test
     public void killQueryFromRemote() {
         Ignite local = CLUSTER.node(0);
         Ignite remote = CLUSTER.node(2);
 
-        AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, 
"SELECT 1");
+        {
+            AsyncSqlCursor<InternalSqlRow> cursor = 
executeQueryInternal(local, "SELECT 1");
 
-        List<QueryInfo> queries = runningQueries();
-        assertThat(queries.size(), is(1));
-        UUID targetQueryId = queries.get(0).id();
+            List<QueryInfo> queries = runningQueries();
+            assertThat(queries.size(), is(1));
+            UUID targetQueryId = queries.get(0).id();
 
-        checkKillQuery(remote, targetQueryId, true);
+            assertThat(executeKillSqlQuery(remote, targetQueryId), is(true));
 
-        assertThat(runningQueries(), is(empty()));
-        expectQueryCancelled(new DrainCursor(cursor));
+            assertThat(runningQueries(), is(empty()));
+            expectQueryCancelled(new DrainCursor(cursor));
+
+            assertThat(executeKillSqlQuery(remote, targetQueryId), is(false));
+            assertThat(executeKillSqlQuery(local, targetQueryId), is(false));
+        }
+
+        // No wait.
+        {
+            AsyncSqlCursor<InternalSqlRow> cursor = 
executeQueryInternal(local, "SELECT 1");
 
-        checkKillQuery(remote, targetQueryId, false);
-        checkKillQuery(local, targetQueryId, false);
+            List<QueryInfo> queries = runningQueries();
+            assertThat(queries.size(), is(1));
+            UUID targetQueryId = queries.get(0).id();
+
+            assertThat(executeKill(remote, QUERY, targetQueryId, true), 
is(true));
+
+            Awaitility.await().untilAsserted(() -> 
assertThat(runningQueries(), is(empty())));
+            expectQueryCancelled(new DrainCursor(cursor));
+        }
     }
 
-    private static void checkKillQuery(Ignite node, UUID queryId, boolean 
expectedResult) {
-        checkKillQuery(node, queryId, expectedResult, false);
+    @Test
+    public void killComputeJobFromRemote() {
+        Ignite local = CLUSTER.node(0);
+        Ignite remote = CLUSTER.node(2);
+
+        // Single execution.
+        {
+            JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+            JobExecution<Void> execution = 
local.compute().submit(JobTarget.node(clusterNode(local)), job, null);
+
+            Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+            UUID jobId = await(execution.idAsync());
+            assertThat(jobId, not(nullValue()));
+            assertThat(executeKillJob(remote, jobId), is(true));
+
+            Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+
+            assertThat(executeKillJob(remote, jobId), is(false));
+            assertThat(executeKillJob(local, jobId), is(false));
+        }
+
+        // Single execution with nowait.
+        {
+            JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+            JobExecution<Void> execution = 
local.compute().submit(JobTarget.node(clusterNode(local)), job, null);
+
+            Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+            UUID jobId = await(execution.idAsync());
+            assertThat(jobId, not(nullValue()));
+            assertThat(executeKill(remote, COMPUTE, jobId, true), is(true));
+
+            Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+
+            assertThat(executeKill(remote, COMPUTE, jobId, true), is(true));
+        }
+
+        // Multiple executions.
+        {
+            JobDescriptor<Void, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+            Map<ClusterNode, JobExecution<Void>> executions = 
local.compute().submitBroadcast(
+                    Set.of(clusterNode(CLUSTER.node(0)), 
clusterNode(CLUSTER.node(1))), job, null);
+
+            executions.forEach((node, execution) -> {
+                UUID jobId = await(execution.idAsync());
+                assertThat(jobId, not(nullValue()));
+                assertThat("Node=" + node.name(), executeKillJob(remote, 
jobId), is(true));
+
+                Awaitility.await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+
+                assertThat("Node=" + node.name(), executeKillJob(remote, 
jobId), is(false));
+                assertThat("Node=" + node.name(), executeKillJob(local, 
jobId), is(false));
+            });
+        }
     }
 
-    private static void checkKillQuery(Ignite node, UUID queryId, boolean 
expectedResult, boolean noWait) {
-        String query = IgniteStringFormatter
-                .format("KILL QUERY '{}'{}", queryId, noWait ? " NO WAIT" : 
"");
+    private static boolean executeKillSqlQuery(Ignite node, UUID queryId) {
+        return executeKill(node, QUERY, queryId, false);
+    }
+
+    private static boolean executeKillJob(Ignite node, UUID jonId) {
+        return executeKill(node, COMPUTE, jonId, false);

Review Comment:
   ```suggestion
       private static boolean executeKillJob(Ignite node, UUID jobId) {
           return executeKill(node, COMPUTE, jobId, false);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to