AlanConfluent commented on code in PR #26122:
URL: https://github.com/apache/flink/pull/26122#discussion_r1990580234


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
@@ -1321,6 +1327,123 @@ private void testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.Ou
         }
     }
 
+    @Test
+    public void testProcessingTimeWithMailboxThreadOrdered() throws Exception {
+        testProcessingTimeWithCallThread(AsyncDataStream.OutputMode.ORDERED, NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadUnordered() throws Exception {
+        testProcessingTimeWithCallThread(AsyncDataStream.OutputMode.UNORDERED, NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadOrderedWithRetry() throws Exception {
+        testProcessingTimeWithCallThread(
+                AsyncDataStream.OutputMode.ORDERED, exceptionRetryStrategy);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadUnorderedWithRetry() throws Exception {
+        testProcessingTimeWithCallThread(
+                AsyncDataStream.OutputMode.UNORDERED, exceptionRetryStrategy);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadError() throws Exception {
+        testProcessingTimeWithMailboxThreadError(NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadErrorWithRetry() throws Exception {
+        testProcessingTimeWithMailboxThreadError(exceptionRetryStrategy);
+    }
+
+    public void testProcessingTimeWithMailboxThreadError(
+            @Nullable AsyncRetryStrategy<Integer> asyncRetryStrategy) throws Exception {
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new AsyncWaitOperatorFactory<>(
+                                        new CallThreadAsyncFunctionError(),
+                                        TIMEOUT,
+                                        1,
+                                        AsyncDataStream.OutputMode.UNORDERED,
+                                        asyncRetryStrategy))
+                        .build()) {
+            final long initialTime = 0L;
+            AtomicReference<Throwable> error = new AtomicReference<>();
+            testHarness.getStreamMockEnvironment().setExternalExceptionHandler(error::set);
+
+            // Sometimes, processElement invoke the async function immediately, so we should catch
+            // any exception.
+            try {
+                testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+                while (error.get() == null) {
+                    testHarness.processAll();
+                }
+            } catch (Exception e) {
+                // This simulates a mailbox failure failing the job
+                error.set(e);
+            }
+
+            ExceptionUtils.assertThrowable(error.get(), ExpectedTestException.class);
+
+            testHarness.endInput();
+        }
+    }
+
+    private void testProcessingTimeWithCallThread(

Review Comment:
   I had intended "call thread" to mean simply the original thread that calls eval, but that is a little misleading for the reasons you mention.
   
   >The only difference between testProcessingTimeWithMailboxThreadError vs testProcessingTimeWithCallThread is that the former one throws an error, the latter process the records?
   
   Yes, that's right.
   
   >testProcessingTimeWithCallThread -> testProcessingTimeWithCollectFromMailboxThread
   >testProcessingTimeWithMailboxThreadError -> testProcessingTimeWithErrorFromMailboxThread
   
   Sounds good to me.  Changed.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
@@ -1321,6 +1327,123 @@ private void testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.Ou
         }
     }
 
+    @Test
+    public void testProcessingTimeWithMailboxThreadOrdered() throws Exception {
+        testProcessingTimeWithCallThread(AsyncDataStream.OutputMode.ORDERED, NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadUnordered() throws Exception {
+        testProcessingTimeWithCallThread(AsyncDataStream.OutputMode.UNORDERED, NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadOrderedWithRetry() throws Exception {
+        testProcessingTimeWithCallThread(
+                AsyncDataStream.OutputMode.ORDERED, exceptionRetryStrategy);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadUnorderedWithRetry() throws Exception {
+        testProcessingTimeWithCallThread(
+                AsyncDataStream.OutputMode.UNORDERED, exceptionRetryStrategy);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadError() throws Exception {
+        testProcessingTimeWithMailboxThreadError(NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testProcessingTimeWithMailboxThreadErrorWithRetry() throws Exception {
+        testProcessingTimeWithMailboxThreadError(exceptionRetryStrategy);
+    }
+
+    public void testProcessingTimeWithMailboxThreadError(

Review Comment:
   Done



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java:
##########
@@ -47,4 +48,13 @@ public interface ResultFuture<OUT> {
      * @param error A Throwable object.
      */
     void completeExceptionally(Throwable error);
+
+    /**
+     * The same as complete, but will execute the supplier on the Mailbox thread which initiated the
+     * asynchronous process.
+     *
+     * <p>Note that if an exception is thrown while executing the supplier, the result should be the
+     * same as calling {@link ResultFuture#completeExceptionally(Throwable)}.
+     */
+    void complete(SupplierWithException<Collection<OUT>, Exception> supplier);

Review Comment:
   Sounds good.  I defined this class in the `org.apache.flink.streaming.api.functions.async` package.
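
To make the contract in the Javadoc above concrete, here is a minimal sketch in plain Java. It is not Flink code: `SketchResultFuture`, the single-thread `mailbox` executor standing in for the task's mailbox, and the use of `Callable` in place of `SupplierWithException` are all assumptions made for illustration. It shows the two properties the Javadoc describes: the supplier runs on the mailbox thread rather than on the worker thread that calls `complete`, and an exception thrown by the supplier ends up on the same path as `completeExceptionally`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class MailboxCompleteSketch {

    /** Hypothetical stand-in for a ResultFuture backed by a mailbox executor. */
    static final class SketchResultFuture<OUT> {
        private final Executor mailbox;
        final CompletableFuture<Collection<OUT>> result = new CompletableFuture<>();

        SketchResultFuture(Executor mailbox) {
            this.mailbox = mailbox;
        }

        void complete(Collection<OUT> value) {
            result.complete(value);
        }

        void completeExceptionally(Throwable error) {
            result.completeExceptionally(error);
        }

        /** Defers the supplier to the mailbox thread; a failure is treated like completeExceptionally. */
        void complete(Callable<Collection<OUT>> supplier) {
            mailbox.execute(
                    () -> {
                        try {
                            complete(supplier.call());
                        } catch (Throwable t) {
                            completeExceptionally(t);
                        }
                    });
        }
    }

    /** Returns true if the supplier ran on the mailbox thread and produced the expected result. */
    static boolean supplierRunsOnMailboxThread() throws Exception {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Learn which thread plays the role of the mailbox thread.
            Thread mailboxThread = CompletableFuture.supplyAsync(Thread::currentThread, mailbox).get();
            SketchResultFuture<Integer> future = new SketchResultFuture<>(mailbox);
            AtomicReference<Thread> seen = new AtomicReference<>();
            // The worker thread only hands over the supplier; it does not run it.
            worker.execute(
                    () ->
                            future.complete(
                                    () -> {
                                        seen.set(Thread.currentThread());
                                        return Collections.singletonList(21 * 2);
                                    }));
            Collection<Integer> out = future.result.get(5, TimeUnit.SECONDS);
            return out.equals(Collections.singletonList(42)) && seen.get() == mailboxThread;
        } finally {
            worker.shutdownNow();
            mailbox.shutdownNow();
        }
    }

    /** Returns true if a throwing supplier completes the future exceptionally. */
    static boolean supplierFailureCompletesExceptionally() throws Exception {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            SketchResultFuture<Integer> future = new SketchResultFuture<>(mailbox);
            future.complete(
                    () -> {
                        throw new IllegalStateException("boom");
                    });
            try {
                future.result.get(5, TimeUnit.SECONDS);
                return false;
            } catch (ExecutionException e) {
                return e.getCause() instanceof IllegalStateException;
            }
        } finally {
            mailbox.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("runs on mailbox thread: " + supplierRunsOnMailboxThread());
        System.out.println("failure propagates: " + supplierFailureCompletesExceptionally());
    }
}
```

The overload is unambiguous at call sites because a lambda can only target the functional-interface parameter, never `Collection`; the real interface in the diff relies on the same property with `SupplierWithException`.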



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
@@ -1428,4 +1551,36 @@ public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
             resultFuture.complete(Collections.singletonList(-1));
         }
     }
+
+    private static class CallThreadAsyncFunction extends MyAbstractAsyncFunction<Integer> {
+        private static final long serialVersionUID = -1504699677704123889L;
+
+        @Override
+        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture)
+                throws Exception {
+            final Thread callThread = Thread.currentThread();
+            executorService.submit(
+                    () ->
+                            resultFuture.complete(
+                                    () -> {
+                                        assertEquals(callThread, Thread.currentThread());
+                                        return Collections.singletonList(input * 2);
+                                    }));
+        }
+    }
+
+    private static class CallThreadAsyncFunctionError extends MyAbstractAsyncFunction<Integer> {
+        private static final long serialVersionUID = -1504699677704123889L;
+
+        @Override
+        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture)
+                throws Exception {
+            executorService.submit(
+                    () ->
+                            resultFuture.complete(
+                                    () -> {

Review Comment:
   May as well check there too.  Done.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/TableFunctionResultFuture.java:
##########
@@ -60,4 +63,9 @@ public ResultFuture<?> getResultFuture() {
     public void completeExceptionally(Throwable error) {
         this.resultFuture.completeExceptionally(error);
     }
+
+    @Override
+    public void complete(SupplierWithException<Collection<T>, Exception> supplier) {
+        throw new UnsupportedOperationException();

Review Comment:
   I added:
   ```
   /**
    * Unsupported, because the containing classes are AsyncFunctions which don't have
    * access to the mailbox to invoke from the caller thread.
    */
   ```
   
   Such a thing could be added to the context and supported, but I didn't want to do that for this small change when it wouldn't be used.
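
The shape of that choice can be sketched in plain Java. This is an illustration only, not the code in the PR: `SketchResultFuture`, `RecordingFuture`, and `DelegatingFuture` are hypothetical names, and `Callable` stands in for `SupplierWithException`. The wrapper forwards the two existing methods to its delegate and rejects the supplier overload because it has no mailbox on which to run the supplier.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

final class UnsupportedSupplierSketch {

    /** Hypothetical, simplified mirror of the three-method interface shown in the diff. */
    interface SketchResultFuture<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);

        void complete(Callable<Collection<OUT>> supplier);
    }

    /** Test double that records whatever it is completed with. */
    static final class RecordingFuture<OUT> implements SketchResultFuture<OUT> {
        final List<OUT> results = new ArrayList<>();
        Throwable error;

        @Override
        public void complete(Collection<OUT> result) {
            results.addAll(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            this.error = error;
        }

        @Override
        public void complete(Callable<Collection<OUT>> supplier) {
            throw new UnsupportedOperationException();
        }
    }

    /** Wrapper without mailbox access: forwards plain completion, rejects the supplier overload. */
    static final class DelegatingFuture<OUT> implements SketchResultFuture<OUT> {
        private final SketchResultFuture<OUT> delegate;

        DelegatingFuture(SketchResultFuture<OUT> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void complete(Collection<OUT> result) {
            delegate.complete(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            delegate.completeExceptionally(error);
        }

        /** Unsupported, because this wrapper has no mailbox to run the supplier on. */
        @Override
        public void complete(Callable<Collection<OUT>> supplier) {
            throw new UnsupportedOperationException("no mailbox available in this wrapper");
        }
    }

    /** Returns true if plain completion reaches the delegate unchanged. */
    static boolean plainCompletionIsForwarded() {
        RecordingFuture<Integer> recorder = new RecordingFuture<Integer>();
        new DelegatingFuture<Integer>(recorder).complete(Collections.singletonList(7));
        return recorder.results.equals(Collections.singletonList(7)) && recorder.error == null;
    }

    /** Returns true if the supplier overload is rejected with UnsupportedOperationException. */
    static boolean supplierOverloadIsRejected() {
        DelegatingFuture<Integer> future =
                new DelegatingFuture<Integer>(new RecordingFuture<Integer>());
        try {
            future.complete(() -> Collections.singletonList(1));
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("forwarded: " + plainCompletionIsForwarded());
        System.out.println("rejected: " + supplierOverloadIsRejected());
    }
}
```

Failing fast keeps the wrapper honest: a caller that needs mailbox-thread execution finds out immediately instead of silently running the supplier on the wrong thread.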



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinRunner.java:
##########
@@ -274,6 +275,11 @@ public void completeExceptionally(Throwable error) {
             realOutput.completeExceptionally(error);
         }
 
+        @Override
+        public void complete(SupplierWithException<Collection<Object>, Exception> supplier) {
+            throw new UnsupportedOperationException();

Review Comment:
   I added:
   
   ```
   /**
    * Unsupported, because the containing classes are AsyncFunctions which don't have
    * access to the mailbox to invoke from the caller thread.
    */
   ```


