Yun Gao, thank you very much for this. Do you see this ticket being picked up any time soon? For a moment I was thinking about taking my chances with it, but I'm guessing that would be rather hard without a deeper understanding of Flink's internals. Plus, it's marked as "major".
Regards,
Krzysztof Chmielewski

On Wed, 28 Sep 2022 at 17:22, Yun Gao <yungao...@aliyun.com.invalid> wrote:
> Hi Krzysztof,
>
> Very sorry for the long delay; it took some time to complete a full
> investigation. The issue appears to be caused by implementation bugs,
> and I have filed an issue for them [1].
> For the follow-up actions, let's move to the thread in [2] for global
> synchronization.
> Very sorry for the inconvenience caused.
>
> Best,
> Yun Gao
>
> [1] https://issues.apache.org/jira/browse/FLINK-29459
> [2] https://lists.apache.org/thread/wzwkqd08qkcmmf5m2xroxpxnzzwfphc9
>
> ------------------------------------------------------------------
> From: Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
> Send Time: 2022 Sep. 10 (Sat.) 04:04
> To: dev <dev@flink.apache.org>
> Subject: Re: Migrating Sink v1 to v2
>
> It seems that there might be an issue with state recovery, or some kind of
> concurrency issue, in the GlobalCommitterOperator created by SinkV1Adapter
> (which uses the StandardSinkTopologies.addGlobalCommitter method) in
> scenarios with cluster failover.
> I've tried to locate where the issue is, and from what I can tell, in a
> setup with two writers the SubtaskCommittableManagers from both writer
> tasks are registered in CheckpointCommittableManagerImpl using the
> upsertSummary method.
> Then, CommittableWithLineage objects are processed in
> CheckpointCommittableManagerImpl.addCommittable by the
> SubtaskCommittableManager corresponding to the subtaskId taken from
> CommittableWithLineage.getSubtaskId().
> However, in a failover scenario (Task Manager crash), where for example
> an exception is thrown in the source, CheckpointCommittableManagerImpl
> is recreated without any SubtaskCommittableManager, although that could
> actually be ok.
> What is more important is that during recovery,
> CheckpointCommittableManagerImpl.upsertSummary is called twice with a
> CommittableSummary for the same subtaskId, for example 0 and 0 instead
> of 0 and 1 as it was before the designed TM crash.
> The second CommittableSummary fails with an exception pointing to
> FLINK-25920.
> After that, or sometimes before the second CommittableSummary arrives,
> CheckpointCommittableManagerImpl.addCommittable is called for subtaskId 1,
> which is not registered in CheckpointCommittableManagerImpl. That causes
> an NPE.
>
> My streaming environment has the following configuration:
>
> Configuration config = new Configuration();
> config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
> env.configure(config, getClass().getClassLoader());
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>
> For now, I don't have a more compact reproducer than the testFileSink
> test from [1].
>
> [1] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
>
> On Wed, 7 Sep 2022 at 13:22, Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
> > A small update:
> > When I change the number of sinks from 3 to 1, the test passes.
> >
> > On Wed, 7 Sep 2022 at 12:18, Krzysztof Chmielewski <
> > krzysiek.chmielew...@gmail.com> wrote:
> >
> >> Hi,
> >> I'm a co-author of the open-source Delta-Flink connector hosted at [1].
> >> The connector was originally written for Flink 1.12, and we have since
> >> migrated it to 1.14.
> >> Both the sink and the source use the new Unified API introduced in
> >> Flink 1.12.
> >>
> >> I'm evaluating a migration to Flink 1.15, where Sink v1 was marked as
> >> deprecated.
> >> After the migration, one of our integration tests for the sink started
> >> to fail in a cluster failover scenario [2].
> >> The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3],
> >> but since we use JUnit 5, we do not extend that Flink class.
> >>
> >> For our 1.15 test setup, I'm using `SinkV1Adapter.wrap(...)` to wrap our
> >> V1 Sink instance.
> >>
> >> The test fails in one of two ways:
> >>
> >> Caused by: java.lang.NullPointerException: Unknown subtask for 1
> >> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
> >> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> >> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> >> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> OR
> >>
> >> Caused by: java.lang.UnsupportedOperationException: Currently it is not
> >> supported to update the CommittableSummary for a checkpoint coming from the
> >> same subtask. Please check the status of FLINK-25920
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
> >> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> >> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> >> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> The test passes on Flink 1.12, 1.13 and 1.14.
> >>
> >> I would appreciate any suggestions as to what might be causing this.
> >>
> >> Thanks,
> >> Krzysztof Chmielewski
> >>
> >> [1] https://github.com/delta-io/connectors/tree/master/flink
> >> [2] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
> >> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
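To make the two failure modes in this thread easier to follow, here is a deliberately simplified, hypothetical Java model of the per-checkpoint bookkeeping described in the 10 Sep message. It is not Flink's implementation: the class `CommittableBookkeepingSketch`, its `SubtaskState` holder, and its methods are illustrative names that only loosely mirror `upsertSummary` and `addCommittable` in `CheckpointCommittableManagerImpl`. It assumes, based on the two stack traces, that a summary must register a subtask before that subtask's committables arrive, and that a second summary for an already registered subtask is rejected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of per-checkpoint committable bookkeeping.
// NOT Flink code: names only loosely mirror CheckpointCommittableManagerImpl.
public class CommittableBookkeepingSketch {

    // Stand-in for a per-subtask manager: counts committables received.
    static final class SubtaskState {
        int received;
    }

    // subtaskId -> state; entries are created only by summaries.
    private final Map<Integer, SubtaskState> subtasks = new HashMap<>();

    // A summary registers its subtask; a second summary for the same subtask is
    // rejected (assumption modelled on the FLINK-25920 exception in the thread).
    void upsertSummary(int subtaskId) {
        if (subtasks.containsKey(subtaskId)) {
            throw new UnsupportedOperationException(
                    "Summary already registered for subtask " + subtaskId);
        }
        subtasks.put(subtaskId, new SubtaskState());
    }

    // A committable is routed by its subtaskId; an unregistered id fails with an
    // NPE, similar to Preconditions.checkNotNull in the first stack trace.
    void addCommittable(int subtaskId) {
        SubtaskState state = Objects.requireNonNull(
                subtasks.get(subtaskId), "Unknown subtask for " + subtaskId);
        state.received++;
    }

    int registeredSubtasks() {
        return subtasks.size();
    }

    public static void main(String[] args) {
        // Before failover: two writers, each sends its summary, then a committable.
        CommittableBookkeepingSketch normal = new CommittableBookkeepingSketch();
        normal.upsertSummary(0);
        normal.upsertSummary(1);
        normal.addCommittable(0);
        normal.addCommittable(1);
        System.out.println("normal run: " + normal.registeredSubtasks() + " subtasks");

        // After failover, as reported: both summaries carry subtaskId 0,
        // then a committable arrives for subtaskId 1.
        CommittableBookkeepingSketch recovered = new CommittableBookkeepingSketch();
        recovered.upsertSummary(0);
        try {
            recovered.upsertSummary(0);
        } catch (UnsupportedOperationException e) {
            System.out.println("second summary: " + e.getMessage());
        }
        try {
            recovered.addCommittable(1);
        } catch (NullPointerException e) {
            System.out.println("committable: " + e.getMessage());
        }
    }
}
```

In this model, whichever message arrives first after recovery (the duplicate summary or the committable for subtask 1) determines which of the two exceptions surfaces. That is consistent with the report that the test fails "in one of two ways", though the real cause is whatever FLINK-29459 tracks.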