It seems there might be an issue with state recovery, or some kind of concurrency issue, in the GlobalCommitterOperator that SinkV1Adapter creates through the StandardSinkTopologies.addGlobalCommitter method, in scenarios with cluster failover.

I've tried to locate where the issue is. From what I can tell, in a setup with two writers, a SubtaskCommittableManager for each writer task is registered in CheckpointCommittableManagerImpl through the upsertSummary method. CommittableWithLineage objects are then processed in CheckpointCommittableManagerImpl.addCommittable by the SubtaskCommittableManager that corresponds to the subtask ID returned by CommittableWithLineage.getSubtaskId().

In the failover scenario (a Task Manager crash where, for example, an exception is thrown in the source), CheckpointCommittableManagerImpl is recreated without any SubtaskCommittableManager, although that might actually be fine. More importantly, during recovery CheckpointCommittableManagerImpl.addCommittable is called twice, but for a CommittableSummary with the same subtask ID: for example 0 and 0, instead of 0 and 1 as it was before the deliberate TM crash. The second CommittableSummary fails with an exception pointing to FLINK-25920.

After that, or sometimes before the second CommittableSummary arrives, CheckpointCommittableManagerImpl.addCommittable is called for subtask ID 1, which is not registered in CheckpointCommittableManagerImpl. That causes the NPE.

My streaming environment has the following configuration:

    Configuration config = new Configuration();
    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
    env.configure(config, getClass().getClassLoader());
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

For now, I don't have a more compact reproducer than the testFileSink test from [1].

[1] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java

Wed, 7 Sep 2022 at 13:22 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

> A small update:
> when I change the number of Sinks from 3 to 1, the test passes.
>
> Wed, 7 Sep 2022 at 12:18 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> I'm a co-author of the open-source Delta-Flink connector hosted on [1].
>> The connector was originally written for Flink 1.12, and we have since
>> migrated to 1.14.
>> Both the sink and the source use the new Unified API from Flink 1.12.
>>
>> I'm evaluating a migration to Flink 1.15, where Sink v1 was marked as
>> deprecated.
>> After the migration, one of our integration tests for the Sink started to
>> fail in the cluster failover scenario [2].
>> The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3],
>> but since we use JUnit 5, we do not extend that Flink class.
>>
>> For our 1.15 test setup I'm using `SinkV1Adapter.wrap(...)` to wrap our
>> V1 Sink instance.
>>
>> The test fails in one of two ways:
>>
>> Caused by: java.lang.NullPointerException: Unknown subtask for 1
>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
>>     at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> OR
>>
>> Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
>>     at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> The test passes on Flink 1.12, 1.13 and 1.14.
>>
>> I would appreciate any suggestions as to what might be causing this.
>>
>> Thanks,
>> Krzysztof Chmielewski
>>
>>
>> [1] https://github.com/delta-io/connectors/tree/master/flink
>> [2] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
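
The bookkeeping described at the top of this message can be modelled with a small self-contained sketch. This is not Flink's code: CommittableBookkeepingSketch, CheckpointManagerModel, received and outcome are hypothetical names, upsertSummary and addCommittable only borrow their names from the stack traces quoted above, and the exception message for the duplicate summary is shortened. The model assumes one map entry per writer subtask per checkpoint, and it only shows how a duplicate summary for subtask 0 followed by a committable for an unregistered subtask 1 would yield the two exception types seen in the test.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the per-checkpoint bookkeeping described in this thread.
 * NOT Flink code: every class here is a hypothetical stand-in.
 */
public class CommittableBookkeepingSketch {

    /** Stand-in for a per-checkpoint manager holding one entry per writer subtask. */
    static final class CheckpointManagerModel {
        // subtaskId -> number of committables received so far for that subtask
        private final Map<Integer, Integer> subtasks = new HashMap<>();

        /** Registers a summary; a second summary from the same subtask is rejected. */
        void upsertSummary(int subtaskId) {
            if (subtasks.putIfAbsent(subtaskId, 0) != null) {
                throw new UnsupportedOperationException(
                        "Duplicate summary for subtask " + subtaskId);
            }
        }

        /** Routes a committable to its subtask entry; fails if no summary registered it. */
        void addCommittable(int subtaskId) {
            Integer count = subtasks.get(subtaskId);
            if (count == null) {
                throw new NullPointerException("Unknown subtask for " + subtaskId);
            }
            subtasks.put(subtaskId, count + 1);
        }

        /** Number of committables recorded for the given subtask (0 if unknown). */
        int received(int subtaskId) {
            return subtasks.getOrDefault(subtaskId, 0);
        }
    }

    /** Returns the simple class name of the exception thrown by the action, or "none". */
    static String outcome(Runnable action) {
        try {
            action.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Before the crash: two writers register summaries 0 and 1, then send committables.
        CheckpointManagerModel normal = new CheckpointManagerModel();
        normal.upsertSummary(0);
        normal.upsertSummary(1);
        normal.addCommittable(0);
        normal.addCommittable(1);
        System.out.println("normal run: ok");

        // After failover, as observed: both summaries carry subtask ID 0.
        CheckpointManagerModel recovered = new CheckpointManagerModel();
        recovered.upsertSummary(0);
        System.out.println("second summary for 0: "
                + outcome(() -> recovered.upsertSummary(0)));
        System.out.println("committable for 1: "
                + outcome(() -> recovered.addCommittable(1)));
    }
}
```

Running main prints "normal run: ok", then reports UnsupportedOperationException for the second summary and NullPointerException for the committable of subtask 1. Which of the two surfaces first in the real test would depend on arrival order, which matches the test failing in one of two ways.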