valepakh commented on code in PR #4776: URL: https://github.com/apache/ignite-3/pull/4776#discussion_r1853854147
##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -95,7 +97,16 @@ public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> descr
         Objects.requireNonNull(target);
         Objects.requireNonNull(descriptor);
-        return new ClientJobExecution<>(ch, submit0(target, descriptor, arg), descriptor.resultMarshaller(), descriptor.resultClass());
+        ClientJobExecution<R> execution = new ClientJobExecution<>(ch, submit0(target, descriptor, arg), descriptor.resultMarshaller(),
+                descriptor.resultClass());
+
+        CancellationToken cancellationToken = descriptor.options().cancellationToken();
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
+        }

Review Comment:
   Looks like this could be moved to the `ClientJobExecution` constructor to eliminate duplicate code in the `submitBroadcast`

##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -186,11 +203,19 @@ public <T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(Set<ClusterNode>
     public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
         Objects.requireNonNull(taskDescriptor);
-        return new ClientTaskExecution<>(ch,
+        ClientTaskExecution<R> clientExecution = new ClientTaskExecution<>(ch,
                 doExecuteMapReduceAsync(taskDescriptor, arg),
                 taskDescriptor.reduceJobResultMarshaller(),
                 taskDescriptor.reduceJobResultClass()
         );
+
+        CancellationToken cancellationToken = taskDescriptor.cancellationToken();
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, clientExecution::cancelAsync, clientExecution.resultAsync());
+        }

Review Comment:
   Same here for completeness.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java:
##########
@@ -229,4 +238,29 @@ public String getVal() {
             return val;
         }
     }
+
+    static class InfiniteMapReduceTask implements MapReduceTask<Void, Void, Void, Void> {

Review Comment:
   Looks like this could be moved to the actual test.

##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -145,6 +150,14 @@ public <I, R> JobExecution<R> executeLocally(
             inFlightFutures.registerFuture(future);
             JobExecution<R> result = new DelegatingJobExecution<>(future);
+
+            if (cancellationToken != null) {
+                CancelHandleHelper.addCancelAction(cancellationToken, () -> classLoaderFut
+                        .cancel(true), classLoaderFut);

Review Comment:
   Should we also cancel `future`? BTW, looks like a good candidate for a shorthand method `addCancelFuture`.

##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java:
##########
@@ -41,7 +41,12 @@ class RemoteExecutionContext<T, R> {
     private final AtomicReference<FailSafeJobExecution<R>> jobExecution;
-    RemoteExecutionContext(List<DeploymentUnit> units, String jobClassName, ExecutionOptions executionOptions, T arg) {
+    RemoteExecutionContext(
+            List<DeploymentUnit> units,
+            String jobClassName,
+            ExecutionOptions executionOptions,
+            T arg
+    ) {

Review Comment:
   This change seems redundant.
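
A minimal sketch of the `addCancelFuture` shorthand suggested above in the `ComputeComponentImpl` comment, using only the JDK. `SketchToken` and `CancelSketch` are hypothetical stand-ins, not the Ignite `CancellationToken` or `CancelHandleHelper`, and the completion-future argument that `addCancelAction` takes in the diff is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a cancellation token; not the Ignite API. */
final class SketchToken {
    private final List<Runnable> actions = new ArrayList<>();
    private boolean cancelled;

    /** Registers an action; runs it right away if the token is already cancelled. */
    synchronized void onCancel(Runnable action) {
        if (cancelled) {
            action.run();
        } else {
            actions.add(action);
        }
    }

    /** Cancels the token and runs every registered action exactly once. */
    void cancel() {
        List<Runnable> toRun;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true;
            toRun = new ArrayList<>(actions);
            actions.clear();
        }
        toRun.forEach(Runnable::run);
    }
}

/** Hypothetical helper showing the shape of the proposed shorthand. */
final class CancelSketch {
    /** Shorthand: cancelling the token cancels the given future. */
    static void addCancelFuture(SketchToken token, CompletableFuture<?> future) {
        token.onCancel(() -> future.cancel(true));
    }
}
```

With a helper of this shape, registering both `classLoaderFut` and `future` would be two one-line calls, which also covers the question of whether `future` should be cancelled too.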
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java:
##########
@@ -227,11 +233,10 @@ private <T, R> JobExecution<R> executeOnOneNodeWithFailover(
         ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
         if (isLocal(targetNode)) {
-            return computeComponent.executeLocally(options, units, jobClassName, arg);
+            return computeComponent.executeLocally(options, units, jobClassName, jobExecutionOptions.cancellationToken(), arg);

Review Comment:
   Same here, let's pass this token through the `ExecutionOptions`

##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java:
##########
@@ -84,6 +86,9 @@ class ComputeJobFailover<R> {
      */
     private final RemoteExecutionContext<?, R> jobContext;
+    /** Cancellation token. */
+    @Nullable private final CancellationToken cancellationToken;

Review Comment:
   Can we pass this token through the `ExecutionOptions` and get it later from the job context?

##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java:
##########
@@ -243,19 +268,24 @@ private void respondWhenAskForPrimaryReplica() {
     }
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq("a")))
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(null), eq("a")))
                 .thenReturn(completedExecution("jobResponse"));
     }
+    private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions, CancellationToken cancellationToken) {
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME),
+                eq(cancellationToken), eq("a"))).thenReturn(completedExecution("jobResponse"));

Review Comment:
   Also not needed.
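
One possible shape for carrying the token inside the options, as requested above for `IgniteComputeImpl` and `ComputeJobFailover`. This is only a sketch: `SketchOptions` and `SketchExecutor` are hypothetical, the real `ExecutionOptions` has a different shape, and a plain `CompletableFuture<Void>` stands in for the cancellation token.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical options holder; the real ExecutionOptions differs. */
final class SketchOptions {
    private final int priority;
    /** Stand-in for the cancellation token; null means "no cancellation requested". */
    private final CompletableFuture<Void> cancelSignal;

    SketchOptions(int priority, CompletableFuture<Void> cancelSignal) {
        this.priority = priority;
        this.cancelSignal = cancelSignal;
    }

    int priority() {
        return priority;
    }

    CompletableFuture<Void> cancelSignal() {
        return cancelSignal;
    }
}

/** Hypothetical executor: no separate token parameter in the signature. */
final class SketchExecutor {
    /** Returns a pending result; the job itself is not run in this sketch. */
    static CompletableFuture<String> executeLocally(SketchOptions options, String jobClassName, String arg) {
        CompletableFuture<String> result = new CompletableFuture<>();

        // The callee reads the token from the options it already receives.
        CompletableFuture<Void> signal = options.cancelSignal();
        if (signal != null) {
            signal.thenRun(() -> result.cancel(true));
        }

        return result;
    }
}
```

Keeping the token in the options leaves the `executeLocally` signature unchanged, so existing call sites and test stubs would not need an extra argument.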
##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java:
##########
@@ -243,19 +268,24 @@ private void respondWhenAskForPrimaryReplica() {
     }
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq("a")))
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(null), eq("a")))

Review Comment:
   Looks like `eq` is not needed here, since every argument is wrapped in `eq`; the plain values can be passed directly.

##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java:
##########
@@ -243,19 +268,24 @@ private void respondWhenAskForPrimaryReplica() {
     }
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq("a")))
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(null), eq("a")))
                 .thenReturn(completedExecution("jobResponse"));
     }
+    private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions, CancellationToken cancellationToken) {
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME),
+                eq(cancellationToken), eq("a"))).thenReturn(completedExecution("jobResponse"));
+    }
+
     private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions options) {
         when(computeComponent.executeRemotelyWithFailover(
-                eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), eq("a")
+                eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), eq(null), eq("a")

Review Comment:
   ```suggestion
                   eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), isNull(), eq("a")
   ```
   Here and below,
   `org.mockito.ArgumentMatchers#isNull()`