Yun Gao,
Thank you very much for this.

Do you expect this ticket to be picked up any time soon?
For a moment I was thinking about taking my chance with it, but I am
guessing it would be rather hard without a deeper understanding of Flink's
internals, plus it's marked as "major".

Regards,
Krzysztof Chmielewski

On Wed, 28 Sep 2022 at 17:22, Yun Gao <yungao...@aliyun.com.invalid> wrote:

> Hi Krzysztof,
> Very sorry for the long delay; it took a bit of time to do a full
> investigation.
> The issue appears to be caused by implementation bugs, and I have filed an
> issue [1] for them.
> For follow-up actions, let's move to the thread at [2] so that the
> discussion stays in one place.
> Very sorry for the inconvenience.
> Best,
> Yun Gao
> [1] https://issues.apache.org/jira/browse/FLINK-29459
> [2] https://lists.apache.org/thread/wzwkqd08qkcmmf5m2xroxpxnzzwfphc9
> ------------------------------------------------------------------
> From:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
> Send Time:2022 Sep. 10 (Sat.) 04:04
> To:dev <dev@flink.apache.org>
> Subject:Re: Migrating Sink v1 to v2
> It seems that there might be an issue with state recovery, or some kind of
> concurrency issue, for the GlobalCommitterOperator created by SinkV1Adapter
> that uses the StandardSinkTopologies.addGlobalCommitter method, in scenarios
> with cluster failover.
> I've tried to locate where the issue is. From what I can tell, for a setup
> with two writers, the SubtaskCommittableManager instances from both writer
> tasks are registered in CheckpointCommittableManagerImpl using the
> upsertSummary method.
> Then, CommittableWithLineage objects are processed in
> CheckpointCommittableManagerImpl.addCommittable by the
> SubtaskCommittableManager corresponding to the subtaskId taken from
> CommittableWithLineage.getSubtaskId().
> However, in a failover scenario (Task Manager crash), where for example
> an exception is thrown in the source, CheckpointCommittableManagerImpl
> is recreated without any SubtaskCommittableManager, although that could
> actually be OK. What is more important is that during recovery,
> CheckpointCommittableManagerImpl.addCommittable is called twice, but
> for a CommittableSummary with the same subtaskId, for example 0, instead of
> 0 and 1 as was the case before the deliberate TM crash.
> The latter CommittableSummary fails with an exception pointing to FLINK-25920.
> After that, or sometimes before the second CommittableSummary arrives,
> CheckpointCommittableManagerImpl.addCommittable is called for subtaskId 1,
> which is not registered in CheckpointCommittableManagerImpl. That causes
> an NPE.
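
To make the two failure paths described above concrete, here is a minimal, self-contained sketch in plain Java. It does not use Flink at all: ToyCheckpointManager, CommittableFailoverSketch and replay are hypothetical names, and the logic is only an assumption about the behaviour, inferred from the exception types and messages in the stack traces quoted in this thread. It is not the actual CheckpointCommittableManagerImpl code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy stand-in for a per-checkpoint committable manager. NOT Flink code;
// it only mimics the two checks suggested by the stack traces in this thread.
class ToyCheckpointManager {
    // subtaskId -> number of committables still expected from that subtask
    private final Map<Integer, Integer> pendingBySubtask = new HashMap<>();

    // Registers a summary for a subtask; a second summary for the same
    // subtask is rejected (assumed to correspond to the FLINK-25920 message).
    void upsertSummary(int subtaskId, int numberOfCommittables) {
        if (pendingBySubtask.containsKey(subtaskId)) {
            throw new UnsupportedOperationException(
                    "Summary already registered for subtask " + subtaskId);
        }
        pendingBySubtask.put(subtaskId, numberOfCommittables);
    }

    // Accepts a committable only if its subtask was registered before;
    // otherwise fails with a NullPointerException ("Unknown subtask for N").
    void addCommittable(int subtaskId) {
        Integer pending = Objects.requireNonNull(
                pendingBySubtask.get(subtaskId), "Unknown subtask for " + subtaskId);
        pendingBySubtask.put(subtaskId, pending - 1);
    }
}

class CommittableFailoverSketch {
    // Feeds the given summaries, then one committable, into a fresh manager.
    // Returns "ok" or the simple class name of the exception that was thrown.
    static String replay(int[] summarySubtaskIds, int committableSubtaskId) {
        ToyCheckpointManager manager = new ToyCheckpointManager();
        try {
            for (int subtaskId : summarySubtaskIds) {
                manager.upsertSummary(subtaskId, 1);
            }
            manager.addCommittable(committableSubtaskId);
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Before the crash: summaries arrive from subtasks 0 and 1.
        System.out.println(replay(new int[] {0, 1}, 1));
        // After recovery: two summaries both claim subtask 0.
        System.out.println(replay(new int[] {0, 0}, 1));
        // Committable for subtask 1 arrives before any summary for subtask 1.
        System.out.println(replay(new int[] {0}, 1));
    }
}
```

Under these assumptions, which of the two exceptions shows up first depends only on whether the duplicate summary or the committable for the unregistered subtask is processed first, which would match the test failing "in one of the two ways".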
> My streaming environment has the following configuration:
> Configuration config = new Configuration();
> config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
> env.configure(config, getClass().getClassLoader());
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> For now, I don't have any other, more compact reproducer than testFileSink
> test from [1].
> [1] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
> On Wed, 7 Sep 2022 at 13:22, Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
> > A small update:
> > when I change the number of sinks from 3 to 1, the test passes.
> >
> > On Wed, 7 Sep 2022 at 12:18, Krzysztof Chmielewski <
> > krzysiek.chmielew...@gmail.com> wrote:
> >
> >> Hi,
> >> I'm a co-author of the open-source Delta-Flink connector hosted at [1].
> >> The connector was originally written for Flink 1.12, and we have since
> >> migrated it to 1.14.
> >> Both sink and source use the new Unified API from Flink 1.12.
> >>
> >> I'm evaluating a migration to Flink 1.15, where Sink v1 was marked as
> >> deprecated.
> >> After the migration, one of our integration tests for the sink started to
> >> fail in the cluster failover scenario [2].
> >> The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3],
> >> but since we use JUnit 5, we do not extend that Flink class.
> >>
> >> For our 1.15 test setup I'm using `SinkV1Adapter.wrap(...)` to wrap our
> >> V1 Sink instance.
> >>
> >> The test fails in one of the two ways:
> >>
> >> Caused by: java.lang.NullPointerException: Unknown subtask for 1
> >> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
> >> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> >> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> >> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >>
> >> OR
> >>
> >> Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
> >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
> >> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> >> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> >> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> The test passes on Flink 1.12, 1.13 and 1.14.
> >>
> >> I would appreciate any suggestions as to what might be causing this.
> >>
> >> Thanks,
> >> Krzysztof Chmielewski
> >>
> >>
> >> [1] https://github.com/delta-io/connectors/tree/master/flink
> >> [2] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
> >> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
> >>
> >
>
