zstan commented on code in PR #4738:
URL: https://github.com/apache/ignite-3/pull/4738#discussion_r1853363887


##########
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java:
##########
@@ -379,6 +389,168 @@ void 
executeColocatedThrowsTableNotFoundExceptionWhenTableDoesNotExist() {
         assertThat(ex.getCause().getMessage(), containsString("The table does 
not exist [name=\"PUBLIC\".\"bad-table\"]"));
     }
 
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeSubmitWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobExecutionOptions executionOptions = 
JobExecutionOptions.builder().cancellationToken(cancelHandle.token()).build();
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SilentSleepJob.class)
+                .options(executionOptions).units(units()).build();
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, 100L);
+
+        cancelHandle.cancel();
+
+        assertThat(execution.stateAsync(), 
willBe(jobStateWithStatus(CANCELED)));
+        assertThat(execution.resultAsync(), willBe(nullValue()));
+
+        IgniteTestUtils.await(execution.cancelAsync());
+
+        assertThat(execution.cancelAsync(), willBe(false));
+    }
+
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteAsyncWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobExecutionOptions executionOptions = 
JobExecutionOptions.builder().cancellationToken(cancelHandle.token()).build();
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SilentSleepJob.class)
+                .options(executionOptions).units(units()).build();
+        CompletableFuture<Void> execution = entryNode.compute()
+                .executeAsync(JobTarget.node(clusterNode(executeNode)), job, 
100L);
+
+        cancelHandle.cancel();
+
+        assertThat(execution, willBe(nullValue()));
+        assertThat(execution, willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobExecutionOptions executionOptions = 
JobExecutionOptions.builder().cancellationToken(cancelHandle.token()).build();
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SilentSleepJob.class)
+                .options(executionOptions).units(units()).build();
+
+        CompletableFuture<Void> runFut = IgniteTestUtils.runAsync(() -> 
entryNode.compute()
+                .execute(JobTarget.node(clusterNode(executeNode)), job, 100L));
+
+        cancelHandle.cancel();
+
+        await().atMost(10, TimeUnit.SECONDS).until(runFut::isDone);
+    }
+
+    @ParameterizedTest(name = "withLocal: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeSubmitBroadcastWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Set<ClusterNode> executeNodes =
+                local ? Set.of(clusterNode(entryNode), clusterNode(node(2))) : 
Set.of(clusterNode(node(1)), clusterNode(node(2)));
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobExecutionOptions executionOptions = 
JobExecutionOptions.builder().cancellationToken(cancelHandle.token()).build();
+
+        Map<ClusterNode, JobExecution<Object>> executions = 
entryNode.compute().submitBroadcast(
+                executeNodes,
+                
JobDescriptor.builder(SilentSleepJob.class.getName()).units(units()).options(executionOptions).build(),
 100L);
+
+        cancelHandle.cancel();
+
+        CompletableFuture<Void> all = 
allOf(executions.values().stream().map(JobExecution::resultAsync).toArray(CompletableFuture[]::new));
+
+        await().atMost(10, TimeUnit.SECONDS).until(all::isDone);

Review Comment:
   Changed it, using a slightly different call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to