It seems that there might be an issue with state recovery, or some kind of
concurrency issue, in the GlobalCommitterOperator created by SinkV1Adapter,
which uses the StandardSinkTopologies.addGlobalCommitter method, in scenarios
with cluster failover.

I've tried to locate where the issue is, and from what I can tell, in a
setup with two writers the SubtaskCommittableManagers for both writer tasks
are registered in CheckpointCommittableManagerImpl via the upsertSummary
method.

Then, CommittableWithLineage objects are processed in
CheckpointCommittableManagerImpl.addCommittable by the
SubtaskCommittableManager corresponding to the subtaskId taken from
CommittableWithLineage.getSubtaskId().
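To make the registration and routing described above concrete, here is a minimal, self-contained Java model of that bookkeeping. It is only a sketch under assumptions, not Flink's actual code: the class `CheckpointBookkeeping` and its methods are hypothetical stand-ins, committables are plain strings, and only the two checks visible in the stack traces quoted below (a repeated summary from one subtask, and a committable for an unregistered subtask) are reproduced.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, heavily simplified model of the bookkeeping described above;
// NOT the real Flink CheckpointCommittableManagerImpl.
class CheckpointBookkeeping {

    // One entry per writer subtask, created when that subtask's summary arrives.
    private final Map<Integer, List<String>> subtaskManagers = new HashMap<>();

    // Models upsertSummary: registers the sending subtask and rejects a second
    // summary from the same subtask, like the FLINK-25920 check.
    void upsertSummary(int subtaskId) {
        if (subtaskManagers.containsKey(subtaskId)) {
            throw new UnsupportedOperationException(
                    "Duplicate CommittableSummary from subtask " + subtaskId);
        }
        subtaskManagers.put(subtaskId, new ArrayList<>());
    }

    // Models addCommittable: routes by subtask id; an unregistered id fails
    // with an NPE carrying the same message as the first stack trace.
    void addCommittable(int subtaskId, String committable) {
        List<String> manager = Objects.requireNonNull(
                subtaskManagers.get(subtaskId), "Unknown subtask for " + subtaskId);
        manager.add(committable);
    }

    // Number of committables routed to the given subtask (0 if unregistered).
    int committableCount(int subtaskId) {
        List<String> manager = subtaskManagers.get(subtaskId);
        return manager == null ? 0 : manager.size();
    }
}
```

With two writers, calling `upsertSummary(0)` and `upsertSummary(1)` first lets `addCommittable(0, ...)` and `addCommittable(1, ...)` succeed. If the summary for subtask 1 never arrives, `addCommittable(1, ...)` fails with "Unknown subtask for 1", and a repeated `upsertSummary(0)` is rejected.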

However, in a failover scenario (Task Manager crash), for example when an
exception is thrown in the source, CheckpointCommittableManagerImpl is
recreated without any SubtaskCommittableManager, although that by itself
could be fine. More importantly, during recovery
CheckpointCommittableManagerImpl.upsertSummary is called twice, but for
CommittableSummary objects with the same subtaskId, for example 0 and 0
instead of 0 and 1 as it was before the deliberately triggered TM crash.
The latter CommittableSummary fails with the exception pointing to FLINK-25920.

After that, or sometimes before the second CommittableSummary arrives,
CheckpointCommittableManagerImpl.addCommittable is called for subtaskId 1,
which is not registered in CheckpointCommittableManagerImpl. That causes the
NPE.
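The recovery sequence described above can be replayed against a stripped-down model of the same two checks. Again, this is a hypothetical sketch and not Flink code: `FailoverReplay`, `summary`, `committable` and `replay` are illustrative names, only the set of registered subtasks is tracked, and the message order (which the real runtime does not fix, since the NPE sometimes comes before the second summary) is passed in explicitly.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical replay of the message order observed after failover; NOT Flink code.
// Only the set of registered writer subtasks is modelled, payloads are ignored.
class FailoverReplay {

    // A message arriving at the global committer: a summary or a committable.
    static final class Message {
        final boolean isSummary;
        final int subtaskId;

        Message(boolean isSummary, int subtaskId) {
            this.isSummary = isSummary;
            this.subtaskId = subtaskId;
        }
    }

    static Message summary(int subtaskId) {
        return new Message(true, subtaskId);
    }

    static Message committable(int subtaskId) {
        return new Message(false, subtaskId);
    }

    // Returns the simple name of the first exception the model would raise,
    // or "OK" if every message in the sequence is accepted.
    static String replay(List<Message> messages) {
        Set<Integer> registered = new HashSet<>();
        for (Message m : messages) {
            if (m.isSummary) {
                // Second summary from the same subtask: the FLINK-25920 case.
                if (!registered.add(m.subtaskId)) {
                    return "UnsupportedOperationException";
                }
            } else if (!registered.contains(m.subtaskId)) {
                // Committable for a subtask that never sent a summary: the NPE case.
                return "NullPointerException";
            }
        }
        return "OK";
    }
}
```

Before the crash the order is summary(0), summary(1), committable(0), committable(1), which the model accepts. After recovery, summary(0), summary(0), committable(1) yields UnsupportedOperationException, while summary(0), committable(1) yields NullPointerException, matching the two failure modes in the stack traces quoted below.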

My streaming environment has the following configuration:

    Configuration config = new Configuration();
    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
    env.configure(config, getClass().getClassLoader());
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

For now, I don't have a more compact reproducer than the testFileSink
test from [1].

[1]
https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java



On Wed, 7 Sep 2022 at 13:22, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> A small update:
> When I change the number of sinks from 3 to 1, the test passes.
>
> On Wed, 7 Sep 2022 at 12:18, Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> I'm a co-author of the open-source Delta-Flink connector hosted at [1].
>> The connector was originally written for Flink 1.12, and we have since
>> migrated it to 1.14.
>> Both the sink and the source use the new Unified API from Flink 1.12.
>>
>> I'm evaluating a migration to Flink 1.15, where Sink V1 was marked as
>> deprecated.
>> After the migration, one of our integration tests for the Sink started to
>> fail in a cluster failover scenario [2].
>> The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3],
>> but since we use JUnit 5, we do not extend that Flink class.
>>
>> For our 1.15 test setup I'm using `SinkV1Adapter.wrap(...)` to wrap our
>> V1 Sink instance.
>>
>> The test fails in one of two ways:
>>
>> Caused by: java.lang.NullPointerException: Unknown subtask for 1
>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
>>     at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>>
>> OR
>>
>> Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
>>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
>>     at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> The test passes on Flink 1.12, 1.13, and 1.14.
>>
>> I would appreciate any suggestions as to what might be causing this.
>>
>> Thanks,
>> Krzysztof Chmielewski
>>
>>
>> [1] https://github.com/delta-io/connectors/tree/master/flink
>> [2]
>> https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
>> [3]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
>>
>
