zstan commented on code in PR #4898: URL: https://github.com/apache/ignite-3/pull/4898#discussion_r1887943749
########## modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java: ########## @@ -157,4 +283,21 @@ private static AsyncSqlCursor<InternalSqlRow> executeQueryInternal(Ignite node, return await(fut); } + + private static class InfiniteJob implements ComputeJob<Void, Void> { Review Comment: Can we avoid the copy-paste? Could ItComputeSystemViewTest.InfiniteJob be made public and reused here? ########## modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java: ########## @@ -95,46 +126,141 @@ public void killQueryFromLocal() { assertThat(queries.size(), is(1)); UUID targetQueryId = queries.get(0).id(); - checkKillQuery(node, targetQueryId, true); + assertThat(executeKillSqlQuery(node, targetQueryId), is(true)); assertThat(runningQueries(), is(empty())); expectQueryCancelled(new DrainCursor(cursor)); - checkKillQuery(node, targetQueryId, false); - checkKillQuery(node, targetQueryId, true, true); + assertThat(executeKillSqlQuery(node, targetQueryId), is(false)); + assertThat(executeKill(node, QUERY, targetQueryId, true), is(true)); + } + + @Test + public void killComputeJobFromLocal() { + Ignite node = CLUSTER.aliveNode(); + JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build(); + JobExecution<Void> execution = node.compute().submit(JobTarget.node(clusterNode(node)), job, null); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); Review Comment: Why do we need to wait for the status here? Let's change this test to kill the job immediately. 
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/kill/KillCommandHandler.java: ########## @@ -102,7 +102,7 @@ public CompletableFuture<Boolean> handle(KillCommand cmd) { CompletableFuture<Boolean> killFut = invokeCancel(handler, cmd.operationId()); - if (killFut.isDone() || !cmd.noWait()) { + if (killFut.isCompletedExceptionally() || !cmd.noWait()) { Review Comment: I am missing something here: if noWait == true, we must return immediately, i.e. return CompletableFuture.completedFuture(true); otherwise we need to return killFut, don't we? ########## modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java: ########## @@ -95,46 +126,141 @@ public void killQueryFromLocal() { assertThat(queries.size(), is(1)); UUID targetQueryId = queries.get(0).id(); - checkKillQuery(node, targetQueryId, true); + assertThat(executeKillSqlQuery(node, targetQueryId), is(true)); assertThat(runningQueries(), is(empty())); expectQueryCancelled(new DrainCursor(cursor)); - checkKillQuery(node, targetQueryId, false); - checkKillQuery(node, targetQueryId, true, true); + assertThat(executeKillSqlQuery(node, targetQueryId), is(false)); + assertThat(executeKill(node, QUERY, targetQueryId, true), is(true)); + } + + @Test + public void killComputeJobFromLocal() { + Ignite node = CLUSTER.aliveNode(); + JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build(); + JobExecution<Void> execution = node.compute().submit(JobTarget.node(clusterNode(node)), job, null); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); + + UUID jobId = await(execution.idAsync()); + assertThat(jobId, not(nullValue())); + assertThat(executeKillJob(node, jobId), is(true)); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); + + assertThat(executeKillJob(node, jobId), is(false)); + assertThat(executeKill(node, COMPUTE, jobId, 
true), is(true)); } @Test public void killQueryFromRemote() { Review Comment: It seems we have no "real" tests for killing long-running queries. WDYT, do we need such tests? We already have a real infinite job in compute, but nothing comparable is tested in SQL. ########## modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java: ########## @@ -95,46 +126,141 @@ public void killQueryFromLocal() { assertThat(queries.size(), is(1)); UUID targetQueryId = queries.get(0).id(); - checkKillQuery(node, targetQueryId, true); + assertThat(executeKillSqlQuery(node, targetQueryId), is(true)); assertThat(runningQueries(), is(empty())); expectQueryCancelled(new DrainCursor(cursor)); - checkKillQuery(node, targetQueryId, false); - checkKillQuery(node, targetQueryId, true, true); + assertThat(executeKillSqlQuery(node, targetQueryId), is(false)); + assertThat(executeKill(node, QUERY, targetQueryId, true), is(true)); + } + + @Test + public void killComputeJobFromLocal() { + Ignite node = CLUSTER.aliveNode(); + JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build(); + JobExecution<Void> execution = node.compute().submit(JobTarget.node(clusterNode(node)), job, null); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); + + UUID jobId = await(execution.idAsync()); + assertThat(jobId, not(nullValue())); + assertThat(executeKillJob(node, jobId), is(true)); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); + + assertThat(executeKillJob(node, jobId), is(false)); + assertThat(executeKill(node, COMPUTE, jobId, true), is(true)); } @Test public void killQueryFromRemote() { Ignite local = CLUSTER.node(0); Ignite remote = CLUSTER.node(2); - AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1"); + { + AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1"); - List<QueryInfo> 
queries = runningQueries(); - assertThat(queries.size(), is(1)); - UUID targetQueryId = queries.get(0).id(); + List<QueryInfo> queries = runningQueries(); + assertThat(queries.size(), is(1)); + UUID targetQueryId = queries.get(0).id(); - checkKillQuery(remote, targetQueryId, true); + assertThat(executeKillSqlQuery(remote, targetQueryId), is(true)); - assertThat(runningQueries(), is(empty())); - expectQueryCancelled(new DrainCursor(cursor)); + assertThat(runningQueries(), is(empty())); + expectQueryCancelled(new DrainCursor(cursor)); + + assertThat(executeKillSqlQuery(remote, targetQueryId), is(false)); + assertThat(executeKillSqlQuery(local, targetQueryId), is(false)); + } + + // No wait. + { + AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1"); - checkKillQuery(remote, targetQueryId, false); - checkKillQuery(local, targetQueryId, false); + List<QueryInfo> queries = runningQueries(); + assertThat(queries.size(), is(1)); + UUID targetQueryId = queries.get(0).id(); + + assertThat(executeKill(remote, QUERY, targetQueryId, true), is(true)); + + Awaitility.await().untilAsserted(() -> assertThat(runningQueries(), is(empty()))); + expectQueryCancelled(new DrainCursor(cursor)); + } } - private static void checkKillQuery(Ignite node, UUID queryId, boolean expectedResult) { - checkKillQuery(node, queryId, expectedResult, false); + @Test + public void killComputeJobFromRemote() { + Ignite local = CLUSTER.node(0); + Ignite remote = CLUSTER.node(2); + + // Single execution. 
+ { + JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build(); + JobExecution<Void> execution = local.compute().submit(JobTarget.node(clusterNode(local)), job, null); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); + + UUID jobId = await(execution.idAsync()); + assertThat(jobId, not(nullValue())); + assertThat(executeKillJob(remote, jobId), is(true)); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); + + assertThat(executeKillJob(remote, jobId), is(false)); + assertThat(executeKillJob(local, jobId), is(false)); + } + + // Single execution with nowait. + { + JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build(); + JobExecution<Void> execution = local.compute().submit(JobTarget.node(clusterNode(local)), job, null); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); + + UUID jobId = await(execution.idAsync()); + assertThat(jobId, not(nullValue())); + assertThat(executeKill(remote, COMPUTE, jobId, true), is(true)); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); + + assertThat(executeKill(remote, COMPUTE, jobId, true), is(true)); + } + + // Multiple executions. 
+ { + JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build(); + Map<ClusterNode, JobExecution<Void>> executions = local.compute().submitBroadcast( + Set.of(clusterNode(CLUSTER.node(0)), clusterNode(CLUSTER.node(1))), job, null); + + executions.forEach((node, execution) -> { + UUID jobId = await(execution.idAsync()); + assertThat(jobId, not(nullValue())); + assertThat("Node=" + node.name(), executeKillJob(remote, jobId), is(true)); + + Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); + + assertThat("Node=" + node.name(), executeKillJob(remote, jobId), is(false)); + assertThat("Node=" + node.name(), executeKillJob(local, jobId), is(false)); + }); + } } - private static void checkKillQuery(Ignite node, UUID queryId, boolean expectedResult, boolean noWait) { - String query = IgniteStringFormatter - .format("KILL QUERY '{}'{}", queryId, noWait ? " NO WAIT" : ""); + private static boolean executeKillSqlQuery(Ignite node, UUID queryId) { + return executeKill(node, QUERY, queryId, false); + } + + private static boolean executeKillJob(Ignite node, UUID jonId) { + return executeKill(node, COMPUTE, jonId, false); Review Comment: ```suggestion private static boolean executeKillJob(Ignite node, UUID jobId) { return executeKill(node, COMPUTE, jobId, false); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org