valepakh commented on code in PR #4776:
URL: https://github.com/apache/ignite-3/pull/4776#discussion_r1853854147


##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -95,7 +97,16 @@ public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> descr
         Objects.requireNonNull(target);
         Objects.requireNonNull(descriptor);
 
-        return new ClientJobExecution<>(ch, submit0(target, descriptor, arg), descriptor.resultMarshaller(), descriptor.resultClass());
+        ClientJobExecution<R> execution = new ClientJobExecution<>(ch, submit0(target, descriptor, arg), descriptor.resultMarshaller(),
+                descriptor.resultClass());
+
+        CancellationToken cancellationToken = descriptor.options().cancellationToken();
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
+        }

Review Comment:
   Looks like this could be moved into the `ClientJobExecution` constructor, which would also eliminate the duplicate code in `submitBroadcast`.
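
   A rough sketch of what I mean, using stand-in types only. `DemoExecution` is hypothetical, and the `cancelSignal` future is an assumed substitute for the real `CancellationToken`; the actual `ClientJobExecution` constructor takes different arguments. The point is just that the null check and the registration live in one place:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ClientJobExecution; not the real class. */
final class DemoExecution<R> {
    private final CompletableFuture<R> result;

    /**
     * @param result future that completes with the job result
     * @param cancelSignal assumed token stand-in: completing it requests cancellation; may be null
     */
    DemoExecution(CompletableFuture<R> result, CompletableFuture<Void> cancelSignal) {
        this.result = result;

        // Registration happens once, here, so no call site has to repeat it.
        if (cancelSignal != null) {
            cancelSignal.thenRun(this::cancelAsync);
        }
    }

    /** Cancels the result future and reports whether this call cancelled it. */
    CompletableFuture<Boolean> cancelAsync() {
        return CompletableFuture.completedFuture(result.cancel(true));
    }

    CompletableFuture<R> resultAsync() {
        return result;
    }
}

final class DemoExecutionMain {
    public static void main(String[] args) {
        CompletableFuture<Void> cancelSignal = new CompletableFuture<>();
        DemoExecution<String> execution = new DemoExecution<>(new CompletableFuture<>(), cancelSignal);

        cancelSignal.complete(null); // simulate the caller cancelling the token

        System.out.println(execution.resultAsync().isCancelled());
    }
}
```

   With this shape, `submit` and `submitBroadcast` would only pass the token along and never call the helper themselves.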



##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -186,11 +203,19 @@ public <T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(Set<ClusterNode>
     public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
         Objects.requireNonNull(taskDescriptor);
 
-        return new ClientTaskExecution<>(ch,
+        ClientTaskExecution<R> clientExecution = new ClientTaskExecution<>(ch,
                 doExecuteMapReduceAsync(taskDescriptor, arg),
                 taskDescriptor.reduceJobResultMarshaller(),
                 taskDescriptor.reduceJobResultClass()
         );
+
+        CancellationToken cancellationToken = taskDescriptor.cancellationToken();
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, clientExecution::cancelAsync, clientExecution.resultAsync());
+        }

Review Comment:
   Same here for completeness.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java:
##########
@@ -229,4 +238,29 @@ public String getVal() {
             return val;
         }
     }
+
+    static class InfiniteMapReduceTask implements MapReduceTask<Void, Void, Void, Void> {

Review Comment:
   Looks like this could be moved to the actual test.



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -145,6 +150,14 @@ public <I, R> JobExecution<R> executeLocally(
             inFlightFutures.registerFuture(future);
 
             JobExecution<R> result = new DelegatingJobExecution<>(future);
+
+            if (cancellationToken != null) {
+                CancelHandleHelper.addCancelAction(cancellationToken, () -> classLoaderFut
+                        .cancel(true), classLoaderFut);

Review Comment:
   Should we also cancel `future`?
   BTW, looks like a good candidate for a shorthand method `addCancelFuture`.
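
   Something along these lines is what I have in mind for the shorthand. This is only a sketch: `SketchToken` is a made-up stand-in for the token, and `addCancelFuture` is the proposed name, not an existing `CancelHandleHelper` method. Taking varargs would also let one call cover both `classLoaderFut` and `future`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a cancellation token; not the Ignite CancellationToken. */
final class SketchToken {
    private final List<Runnable> actions = new ArrayList<>();
    private boolean cancelled;

    /** Registers an action, or runs it right away if the token is already cancelled. */
    synchronized void addCancelAction(Runnable action) {
        if (cancelled) {
            action.run();
        } else {
            actions.add(action);
        }
    }

    /** Runs every registered action once; later calls do nothing. */
    synchronized void cancel() {
        if (cancelled) {
            return;
        }
        cancelled = true;
        actions.forEach(Runnable::run);
        actions.clear();
    }
}

final class CancelSketch {
    /** Proposed shorthand (hypothetical): cancelling the token cancels every given future. */
    static void addCancelFuture(SketchToken token, CompletableFuture<?>... futures) {
        for (CompletableFuture<?> f : futures) {
            token.addCancelAction(() -> f.cancel(true));
        }
    }

    public static void main(String[] args) {
        SketchToken token = new SketchToken();
        CompletableFuture<String> classLoaderFut = new CompletableFuture<>();
        CompletableFuture<String> future = new CompletableFuture<>();

        addCancelFuture(token, classLoaderFut, future);
        token.cancel();

        System.out.println(classLoaderFut.isCancelled() + " " + future.isCancelled());
    }
}
```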



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java:
##########
@@ -41,7 +41,12 @@ class RemoteExecutionContext<T, R> {
 
     private final AtomicReference<FailSafeJobExecution<R>> jobExecution;
 
-    RemoteExecutionContext(List<DeploymentUnit> units, String jobClassName, ExecutionOptions executionOptions, T arg) {
+    RemoteExecutionContext(
+            List<DeploymentUnit> units,
+            String jobClassName,
+            ExecutionOptions executionOptions,
+            T arg
+    ) {

Review Comment:
   This change seems redundant.



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java:
##########
@@ -227,11 +233,10 @@ private <T, R> JobExecution<R> executeOnOneNodeWithFailover(
         ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
 
         if (isLocal(targetNode)) {
-            return computeComponent.executeLocally(options, units, jobClassName, arg);
+            return computeComponent.executeLocally(options, units, jobClassName, jobExecutionOptions.cancellationToken(), arg);

Review Comment:
   Same here: let's pass this token through `ExecutionOptions`.



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java:
##########
@@ -84,6 +86,9 @@ class ComputeJobFailover<R> {
      */
     private final RemoteExecutionContext<?, R> jobContext;
 
+    /** Cancellation token. */
+    @Nullable private final CancellationToken cancellationToken;

Review Comment:
   Can we pass this token through `ExecutionOptions` and get it later from the job context?
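
   To illustrate the idea, here is a minimal sketch with made-up types. `DemoOptions`, `DemoJobContext`, and `DemoFailover` are hypothetical and only loosely mirror `ExecutionOptions`, `RemoteExecutionContext`, and `ComputeJobFailover`; the token is typed as `Object` purely to keep the example self-contained:

```java
/** Hypothetical options holder; the real ExecutionOptions has its own fields. */
final class DemoOptions {
    private final int priority;
    private final Object cancellationToken; // stand-in type, may be null

    DemoOptions(int priority, Object cancellationToken) {
        this.priority = priority;
        this.cancellationToken = cancellationToken;
    }

    int priority() {
        return priority;
    }

    Object cancellationToken() {
        return cancellationToken;
    }
}

/** Hypothetical job context that keeps the options it was created with. */
final class DemoJobContext {
    private final String jobClassName;
    private final DemoOptions options;

    DemoJobContext(String jobClassName, DemoOptions options) {
        this.jobClassName = jobClassName;
        this.options = options;
    }

    String jobClassName() {
        return jobClassName;
    }

    DemoOptions options() {
        return options;
    }
}

/** Hypothetical failover helper: it has no token field and asks the context instead. */
final class DemoFailover {
    private final DemoJobContext jobContext;

    DemoFailover(DemoJobContext jobContext) {
        this.jobContext = jobContext;
    }

    Object cancellationToken() {
        return jobContext.options().cancellationToken();
    }
}

final class DemoFailoverMain {
    public static void main(String[] args) {
        Object token = new Object();
        DemoJobContext context = new DemoJobContext("org.example.MyJob", new DemoOptions(0, token));

        System.out.println(new DemoFailover(context).cancellationToken() == token);
    }
}
```

   This way the method signatures stay as they were, and the extra parameter on `executeLocally` would not be needed.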



##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java:
##########
@@ -243,19 +268,24 @@ private void respondWhenAskForPrimaryReplica() {
     }
 
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq("a")))
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(null), eq("a")))
                 .thenReturn(completedExecution("jobResponse"));
     }
 
+    private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions, CancellationToken cancellationToken) {
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME),
+                eq(cancellationToken), eq("a"))).thenReturn(completedExecution("jobResponse"));

Review Comment:
   Also not needed.



##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java:
##########
@@ -243,19 +268,24 @@ private void respondWhenAskForPrimaryReplica() {
     }
 
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq("a")))
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(null), eq("a")))

Review Comment:
   Looks like `eq` is not needed here: every argument is wrapped in the same `eq` matcher, so the raw values can be passed directly.



##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java:
##########
@@ -243,19 +268,24 @@ private void respondWhenAskForPrimaryReplica() {
     }
 
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq("a")))
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(null), eq("a")))
                 .thenReturn(completedExecution("jobResponse"));
     }
 
+    private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions, CancellationToken cancellationToken) {
+        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME),
+                eq(cancellationToken), eq("a"))).thenReturn(completedExecution("jobResponse"));
+    }
+
     private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions options) {
         when(computeComponent.executeRemotelyWithFailover(
-                eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), eq("a")
+                eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), eq(null), eq("a")

Review Comment:
   ```suggestion
                   eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), isNull(), eq("a")
   ```
   Here and below, use `org.mockito.ArgumentMatchers#isNull()` instead of `eq(null)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

