StephanEwen commented on a change in pull request #14239: URL: https://github.com/apache/flink/pull/14239#discussion_r531155457
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##########

@@ -170,6 +178,70 @@ public void testSnapshotAndRestore() throws Exception {
     }
 
+    @Test
+    public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {

Review comment:

This test looks to be testing against a specific implementation, rather than against a contract: it is only meaningful if you assume the exact way the `close()` method is implemented, because it re-engineers how `close()` works to produce a special race. That usually causes issues: as soon as the `close()` implementation changes, either the test breaks (despite everything being okay) or the test becomes meaningless (it no longer reproduces the critical situation).

Testing such concurrent race situations is always a bit tricky. I think the "manual executors" are a decent trade-off between reliably producing the situation and not assuming too much about the implementation. You could take a look at `ContinuousFileSplitEnumeratorTest` and the `ManuallyTriggeredScheduledExecutorService` in the `TestingSplitEnumeratorContext`. Using that, you can probably replace all the complex code in the test by:

```java
// async action before the enumerator closes
testingContext.callAsync(actionThatThrowsException);
asyncExecutor.triggerAll();

// callback after the enumerator closes
testingContext.close();
enumeratorExecutor.triggerAll();

assertFalse(operatorCoordinatorContext.isJobFailed());
```

This ties the test only to the implementation aspect that there are two executors, not to any specific structure of the `close()` method.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########

@@ -321,4 +328,37 @@ private void registerReader() {
     ListState<SplitT> getReaderState() {
         return readerState;
     }
+
+    // ----------- private class --------------
+
+    private static class CountingDataOutput<T> implements DataOutput<T> {

Review comment:

Can we put the counting in `SourceOperatorStreamTask`? That way we can
- avoid yet another wrapping layer for the output
- make record counting the responsibility of the task and operator chain, which looks like the level on which it is solved in the remaining cases

(A sketch of that direction follows below.)
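For illustration, a minimal self-contained sketch of that direction: the task owns the record counter and the output it feeds into the chain, so no `CountingDataOutput` wrapper is needed inside the operator. The types below are trimmed-down, hypothetical stand-ins, not the real Flink `DataOutput`, `OperatorChain`, or `SourceOperatorStreamTask` APIs.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified stand-ins for the real Flink interfaces.
interface DataOutput<T> {
    void emitRecord(T record) throws Exception;
}

interface OperatorChain<T> {
    void pushToChain(T record) throws Exception;
}

/** Sketch: record counting lives in the task, not in an operator-level wrapper. */
final class SourceOperatorStreamTaskSketch<T> implements DataOutput<T> {

    private final LongAdder numRecordsIn = new LongAdder(); // task-level metric
    private final OperatorChain<T> chain;

    SourceOperatorStreamTaskSketch(OperatorChain<T> chain) {
        this.chain = chain;
    }

    @Override
    public void emitRecord(T record) throws Exception {
        numRecordsIn.increment();  // count once, at the task/chain level
        chain.pushToChain(record); // then hand the record to the operator chain
    }
}
```

Since the task already sits between the source and the chain, counting here avoids the extra wrapping layer and keeps the metric at the level where the remaining cases maintain it.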
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########

@@ -241,6 +251,14 @@ void failJob(Throwable cause) {
     }
 
     void handleUncaughtExceptionFromAsyncCall(Throwable t) {
+        final boolean tempClosed = closed;

Review comment:

Leftover code from debugging?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########

@@ -131,7 +131,14 @@ public void start() {
             LOG.info("Starting the KafkaSourceEnumerator for consumer group {} "
                     + "without periodic partition discovery.", consumerGroupId);
             context.callAsync(
-                    this::discoverAndInitializePartitionSplit,
+                    () -> {
+                        try {
+                            return discoverAndInitializePartitionSplit();
+                        } finally {
+                            // Close the admin client early because we won't use it anymore.
+                            adminClient.close();

Review comment:

Is the admin client created in the `start()` method and also closed there? Should it be a local variable then? Having it as a class field suggests that other methods can use it.

Edit: It seems it is used in other places as well, which is a bit hard to understand now. What is the life-cycle and use of that field?
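For illustration, a sketch of the local-variable alternative, assuming the client really is only needed for the one-shot discovery. It simplifies away the `context.callAsync(...)` indirection of the actual code, and `discoverSplitsOnce` is a hypothetical helper standing in for `discoverAndInitializePartitionSplit()`.

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;

final class OneShotDiscoverySketch {

    void start(Properties adminProps) {
        // try-with-resources scopes the client to this single discovery call:
        // the life-cycle is obvious, and no field is left around for other
        // methods to depend on.
        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            discoverSplitsOnce(adminClient);
        }
    }

    private void discoverSplitsOnce(AdminClient adminClient) {
        // one-shot partition discovery against the cluster would go here
    }
}
```

If the client must outlive `start()` because the async callable runs later, the field is unavoidable, but then closing it should arguably belong to whatever owns the enumerator's shutdown rather than to the discovery callable.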