StephanEwen commented on a change in pull request #14239:
URL: https://github.com/apache/flink/pull/14239#discussion_r531155457



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##########
@@ -170,6 +178,70 @@ public void testSnapshotAndRestore() throws Exception {
 
        }
 
+       @Test
+       public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {

Review comment:
       This test appears to test against a specific implementation rather than against a contract.
   You need to assume the exact way in which the `close()` method is implemented for this test to be meaningful. The test re-engineers how `close()` works to produce a special race.
   
   That usually causes issues: as soon as the `close()` implementation changes,
     - either the test breaks (despite everything being okay),
     - or the test becomes meaningless (it no longer reproduces the critical situation).
   
   Testing such concurrent race situations is always a bit tricky. I think the "manual executors" offer a decent trade-off between reliably producing the situation and not assuming too much about the implementation.
   You could take a look at `ContinuousFileSplitEnumeratorTest` and the `ManuallyTriggeredScheduledExecutorService` in the `TestingSplitEnumeratorContext`.
   
   Using that, you can probably replace all the complex code in the test by:
   ```java
   // async action before the enumerator closes
   testingContext.callAsync(actionThatThrowsException);
   asyncExecutor.triggerAll();
   
   // callback after the enumerator closes
   testingContext.close();
   enumeratorExecutor.triggerAll();
   
   assertFalse(operatorCoordinatorContext.isJobFailed());
   ```
   
   This ties the test only to the implementation aspect that there are two executors, not to any specific structure of the `close()` method.
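   For context, the core idea behind such "manual executors" can be shown in a minimal, self-contained sketch. This is not Flink's actual `ManuallyTriggeredScheduledExecutorService`, just an illustration of the pattern it implements: submitted tasks are queued instead of run immediately, so the test decides exactly when (and in what order) async actions execute.
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.Executor;

   // Minimal illustration of the "manually triggered executor" pattern:
   // submitted tasks are deferred and only run when the test calls
   // triggerAll(), giving the test full control over the interleaving.
   class ManuallyTriggeredExecutor implements Executor {

       private final Queue<Runnable> queuedTasks = new ArrayDeque<>();

       @Override
       public void execute(Runnable task) {
           queuedTasks.add(task); // defer instead of running immediately
       }

       /** Runs all currently queued tasks, in submission order. */
       public void triggerAll() {
           Runnable task;
           while ((task = queuedTasks.poll()) != null) {
               task.run();
           }
       }

       public int numQueuedTasks() {
           return queuedTasks.size();
       }
   }
   ```
   With two such executors (one for async calls, one for the enumerator), a test can deterministically order "async action", "close", and "callback" without re-engineering `close()` internals.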
   

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -321,4 +328,37 @@ private void registerReader() {
        ListState<SplitT> getReaderState() {
                return readerState;
        }
+
+       // ----------- private class --------------
+
+       private static class CountingDataOutput<T> implements DataOutput<T> {

Review comment:
       Can we put the counting in `SourceOperatorStreamTask`?
   That way we can
     - avoid yet another wrapping layer for the output
     - make record counting the responsibility of the task and operator chain, which looks like the level on which it is solved in the remaining cases
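   For illustration, the "extra wrapping layer" in question looks roughly like the following hypothetical sketch (`Consumer<T>` stands in here for the operator's `DataOutput<T>` interface; the names are invented): every emitted record takes one more indirection purely to bump a counter.
   ```java
   import java.util.function.Consumer;

   // Hypothetical sketch of a counting output wrapper: mixes the counting
   // concern into the per-record data path via an extra delegation layer.
   class CountingOutput<T> implements Consumer<T> {

       private final Consumer<T> inner;
       private long numRecordsOut;

       CountingOutput(Consumer<T> inner) {
           this.inner = inner;
       }

       @Override
       public void accept(T record) {
           numRecordsOut++;      // counting happens on every record
           inner.accept(record); // then delegate to the real output
       }

       long getNumRecordsOut() {
           return numRecordsOut;
       }
   }
   ```
   Counting at the task / operator-chain level instead would keep the record path free of this wrapper.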

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -241,6 +251,14 @@ void failJob(Throwable cause) {
        }
 
        void handleUncaughtExceptionFromAsyncCall(Throwable t) {
+               final boolean tempClosed = closed;

Review comment:
       Leftover code from debugging?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -131,7 +131,14 @@ public void start() {
                        LOG.info("Starting the KafkaSourceEnumerator for consumer group {} " +
                                "without periodic partition discovery.", consumerGroupId);
                        context.callAsync(
-                                       this::discoverAndInitializePartitionSplit,
+                                       () -> {
+                                               try {
+                                                       return discoverAndInitializePartitionSplit();
+                                               } finally {
+                                                       // Close the admin client early because we won't use it anymore.
+                                                       adminClient.close();

Review comment:
       Is the admin client created in the `start()` method and also closed in the `start()` method? Should it be a local variable then? Having it as a class field suggests that other methods can use it.
   
   Edit: It seems it is used in other places, which is now a bit hard to understand. What is the life-cycle and use of that field?
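   As a hypothetical sketch of the suggested scoping (all names below are invented for illustration; the `AdminClient` stub is not Kafka's real admin client): a resource used only inside `start()` can be a local variable managed by try-with-resources, which makes its life-cycle visible in one place.
   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch: create the client where it is used, close it
   // deterministically, and never expose it as a class field.
   class HypotheticalEnumerator {

       // Stand-in for a real admin client; only models create/use/close.
       static class AdminClient implements AutoCloseable {
           List<String> discoverPartitions() {
               return Arrays.asList("partition-0", "partition-1");
           }

           @Override
           public void close() {
               // release connections, threads, etc.
           }
       }

       private List<String> partitions;

       public void start() {
           // local variable: no other method can use a (possibly closed) client
           try (AdminClient client = new AdminClient()) {
               partitions = client.discoverPartitions();
           }
       }

       public List<String> getPartitions() {
           return partitions;
       }
   }
   ```
   If other methods genuinely need the client, a field is fine, but then the close-in-`start()` call above becomes the surprising part.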




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

