A small update:
when I change the number of sinks from 3 to 1, the test passes.

On Wed, 7 Sep 2022 at 12:18, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> I'm a co-author of the open-source Delta-Flink connector hosted at [1].
> The connector was originally written for Flink 1.12, and we have since
> migrated to 1.14.
> Both the sink and the source use the new Unified API from Flink 1.12.
>
> I'm evaluating a migration to Flink 1.15, where Sink v1 was marked as
> deprecated.
> After the migration, one of our integration tests for the Sink started to
> fail in the cluster failover scenario [2].
> The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3],
> but since we use JUnit 5, we do not extend that Flink class.
>
> For our 1.15 test setup I'm using `SinkV1Adapter.wrap(...)` to wrap our V1
> Sink instance.
>
> The test fails in one of two ways:
>
> Caused by: java.lang.NullPointerException: Unknown subtask for 1
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96)
> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90)
> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
>
>
> OR
>
> Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
>
> The test passes on Flink 1.12, 1.13 and 1.14.
>
> I would appreciate any suggestions as to what might be causing this.
>
> Thanks,
> Krzysztof Chmielewski
>
>
> [1] https://github.com/delta-io/connectors/tree/master/flink
> [2] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
>
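
For readers following the two stack traces in the quoted message, here is a minimal, self-contained sketch of the kind of per-checkpoint bookkeeping that would produce those two failure modes. It is a simplified model inferred only from the exception messages and method names in the traces (`upsertSummary`, `addCommittable`), not Flink's actual implementation. The class `CommittableBookkeepingSketch`, the nested `CheckpointBookkeeping`, the helper `describe`, and the shortened wording of the second exception message are all hypothetical. The sketch does not explain why the test passes with a single sink.

```java
import java.util.HashMap;
import java.util.Map;

public class CommittableBookkeepingSketch {

    /** Hypothetical stand-in for the bookkeeping kept for ONE checkpoint. */
    static final class CheckpointBookkeeping {
        // subtask id -> number of committables that subtask announced in its summary
        private final Map<Integer, Integer> announced = new HashMap<>();
        // subtask id -> number of committables received so far
        private final Map<Integer, Integer> received = new HashMap<>();

        /** Registers a summary; a second summary from the same subtask is rejected. */
        void upsertSummary(int subtaskId, int numberOfCommittables) {
            Integer existing = announced.putIfAbsent(subtaskId, numberOfCommittables);
            if (existing != null) {
                // Assumption: message shortened for the sketch.
                throw new UnsupportedOperationException(
                        "Summary already registered for subtask " + subtaskId);
            }
        }

        /** Accepts a committable only if its subtask has already sent a summary. */
        void addCommittable(int subtaskId) {
            if (!announced.containsKey(subtaskId)) {
                throw new NullPointerException("Unknown subtask for " + subtaskId);
            }
            received.merge(subtaskId, 1, Integer::sum);
        }
    }

    /** Runs an action and reports either "ok" or the exception it raised. */
    static String describe(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        CheckpointBookkeeping checkpoint = new CheckpointBookkeeping();
        // Normal order: summary first, then the committable it announced.
        System.out.println(describe(() -> checkpoint.upsertSummary(0, 1)));
        System.out.println(describe(() -> checkpoint.addCommittable(0)));
        // A committable from a subtask whose summary was never seen.
        System.out.println(describe(() -> checkpoint.addCommittable(1)));
        // A second summary from the same subtask for the same checkpoint.
        System.out.println(describe(() -> checkpoint.upsertSummary(0, 1)));
    }
}
```

In this model the first failure corresponds to a committable arriving from a subtask that has not registered a summary for the checkpoint, and the second to the same subtask sending a summary twice for the same checkpoint. Whether either condition is what the failover test actually triggers would need to be checked against the Flink 1.15 sources and FLINK-25920.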
