Hi Piotr,
the option you mention applies only to the deprecated
FlinkKafkaProducer. Is there an equivalent for the newer KafkaSink? I found
this article comparing the behavior of the two:
https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs
It suggests that the default behavior of KafkaSink would be: "The
recovery continues with an ERROR message like the following is
logged:". However, this is not what I observe: instead, the job fails. I
am attaching the relevant part of the log. The error occurs when
trying to recover from a one-month-old savepoint.
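As we understand it, the option Piotr mentions (`ignoreFailuresAfterTransactionTimeout` on the legacy FlinkKafkaProducer) is a policy applied during recovery. If committing a restored transaction fails and that transaction has been open longer than the transaction timeout, an ERROR is logged and recovery continues. Otherwise, the failure is rethrown and fails the job.

The sketch below is a hypothetical, stdlib-only model of that decision. None of the class or method names are Flink APIs. It contrasts the "continue with an ERROR" behavior described in the article with the job failure observed here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, stdlib-only model of "ignore commit failures after the transaction timeout".
 * None of these names are Flink classes; they only illustrate the recovery decision.
 */
public class RecoveryPolicyModel {

    /** A pending transaction restored from a savepoint (hypothetical type). */
    public static final class PendingTxn {
        final String transactionalId;
        final Instant startTime;

        public PendingTxn(String transactionalId, Instant startTime) {
            this.transactionalId = transactionalId;
            this.startTime = startTime;
        }
    }

    /** Stands in for whatever error the Kafka client raises when a commit fails. */
    public static class CommitFailedException extends RuntimeException {
        public CommitFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical commit callback; a real sink would talk to the Kafka broker here. */
    public interface Committer {
        void commit(PendingTxn txn);
    }

    /**
     * Tries to commit every restored transaction. A failed commit is logged and skipped only
     * if ignoreAfterTimeout is set AND the transaction has been open longer than txnTimeout;
     * in every other case the failure is rethrown, which is what fails the job.
     * Returns the ids of transactions that were given up on.
     */
    public static List<String> recoverAndCommit(List<PendingTxn> restored, Committer committer,
                                                Duration txnTimeout, boolean ignoreAfterTimeout,
                                                Instant now) {
        List<String> skipped = new ArrayList<>();
        for (PendingTxn txn : restored) {
            try {
                committer.commit(txn);
            } catch (CommitFailedException e) {
                Duration openFor = Duration.between(txn.startTime, now);
                if (ignoreAfterTimeout && openFor.compareTo(txnTimeout) > 0) {
                    System.err.println("ERROR: giving up on transaction " + txn.transactionalId
                            + " (open for " + openFor + "), data loss is possible: " + e.getMessage());
                    skipped.add(txn.transactionalId);
                } else {
                    throw e;
                }
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2022-11-16T10:00:00Z");
        List<PendingTxn> restored =
                List.of(new PendingTxn("example-txn-0-65435", now.minus(Duration.ofDays(30))));
        Committer alwaysFails = txn -> {
            throw new CommitFailedException("producer id not assigned to transactional id");
        };

        // Option enabled: the month-old transaction is logged and skipped, recovery continues.
        System.out.println("skipped: " + recoverAndCommit(restored, alwaysFails, Duration.ofHours(1), true, now));

        // Option disabled: the same failure propagates and recovery fails.
        try {
            recoverAndCommit(restored, alwaysFails, Duration.ofHours(1), false, now);
        } catch (CommitFailedException e) {
            System.out.println("recovery failed: " + e.getMessage());
        }
    }
}
```

In the legacy sink the timeout corresponds to the producer's `transaction.timeout.ms`, and the commit goes to the Kafka broker. The model captures only the keep-going versus fail decision.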
Regards,
Yordan
On Tue, 15 Nov 2022 at 18:53, Piotr Nowojski wrote:
>
> Hi Yordan,
>
> I don't understand where the problem is. Why do you think savepoints are
> unusable? If you recover with `ignoreFailuresAfterTransactionTimeout`
> enabled, the current Flink behaviour shouldn't cause any problems (except
> perhaps for some logged errors).
>
> Best,
> Piotrek
>
> On Tue, 15 Nov 2022 at 15:36, Yordan Pavlov wrote:
>>
>> Hi,
>> we are using Flink savepoints as a recovery tool and want to keep
>> several of them covering the past months. However, because our
>> KafkaSink uses Kafka transactions, our savepoints effectively expire:
>> we can only restore from a savepoint that is no older than our Kafka
>> transaction timeout. The problem is explained in this issue:
>> https://issues.apache.org/jira/browse/FLINK-16419
>> The relevant comment is this one:
>> "FlinkKafkaProducer or KafkaSink do not know during recovery if they
>> have to recover and commit or if it has already happened. Due to that,
>> they are always attempting to recover and commit transactions during
>> startup."
>> I'm surprised that more people are not hitting this problem, as it
>> makes savepoints pretty much unusable as a recovery mechanism.
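The expiry constraint described above amounts to an age check on the savepoint. This is a minimal sketch. It assumes that the savepoint's pending transactions were opened no later than the savepoint itself, so the savepoint's age is a lower bound on their age.

The sketch compares that age with two settings:

* the producer's `transaction.timeout.ms`
* the broker's `transactional.id.expiration.ms`, which is 7 days by default in Kafka

The class, the enum, and the 15-minute timeout in `main` are illustrative assumptions, not values taken from this job.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: classifies a savepoint by its age relative to two Kafka timeouts. */
public class SavepointAgeCheck {

    public enum Verdict { OK, TRANSACTIONS_TIMED_OUT, TRANSACTIONAL_ID_MAY_BE_EXPIRED }

    /**
     * savepointTime:      when the savepoint (and its pending transactions) was taken
     * restoreTime:        when we try to restore from it
     * transactionTimeout: the producer's transaction.timeout.ms used by the sink
     * idExpiration:       the broker's transactional.id.expiration.ms
     * The savepoint's age is a lower bound on the age of its pending transactions.
     */
    public static Verdict classify(Instant savepointTime, Instant restoreTime,
                                   Duration transactionTimeout, Duration idExpiration) {
        Duration age = Duration.between(savepointTime, restoreTime);
        if (age.compareTo(idExpiration) > 0) {
            // The broker may have forgotten the producer id for this transactional id altogether.
            return Verdict.TRANSACTIONAL_ID_MAY_BE_EXPIRED;
        }
        if (age.compareTo(transactionTimeout) > 0) {
            // The broker has most likely aborted the pending transactions already.
            return Verdict.TRANSACTIONS_TIMED_OUT;
        }
        return Verdict.OK;
    }

    public static void main(String[] args) {
        Instant restore = Instant.parse("2022-11-16T10:00:00Z");
        Duration txnTimeout = Duration.ofMinutes(15);   // example value, not the job's real setting
        Duration idExpiration = Duration.ofDays(7);     // Kafka broker default for transactional.id.expiration.ms
        Duration[] ages = {Duration.ofMinutes(5), Duration.ofDays(1), Duration.ofDays(30)};
        for (Duration age : ages) {
            Verdict v = classify(restore.minus(age), restore, txnTimeout, idExpiration);
            System.out.println(age + " old -> " + v);
        }
    }
}
```

Under these assumptions, a one-month-old savepoint falls past both limits. That distinction matters for interpreting the error in the log.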
2022-11-16 10:01:07.168 [flink-akka.actor.default-dispatcher-13] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Balances aggreagation
ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> Save to Kafka
realtime ETH: Committer, Filter -> Map -> Save to Kafka daily ETH: Writer ->
Save to Kafka daily ETH: Committer) (4/5) (6d4d91ab8657bba830695b9a011f7db6)
switched from INITIALIZING to RUNNING.
2022-11-16 10:01:37.222 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 65436 (type=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1668592897201 for job
00000000000000000000000000000000.
2022-11-16 10:01:39.082 [flink-akka.actor.default-dispatcher-13] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Balances aggreagation
ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> Save to Kafka
realtime ETH: Committer, Filter -> Map -> Save to Kafka daily ETH: Writer ->
Save to Kafka daily ETH: Committer) (1/5) (cfaca46e7f4dc89629cdcaed5b48c059)
switched from RUNNING to FAILED on 10.42.145.181:33297-efc328 @
eth-top-holders-v2-flink-taskmanager-0.eth-top-holders-v2-flink-taskmanager.flink.svc.cluster.local
(dataPort=43125).
java.io.IOException: Could not perform checkpoint 65436 for operator Balances
aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer ->
Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily
ETH: Writer -> Save to Kafka daily ETH: Committer) (1/5)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:234)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:204)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:300)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
	... 22 common frames omitted
Caused by: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=6640191, epoch=0, transactionalId=eth_top_holders_daily_v11-0-65435}
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
	at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:199)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	... 35 common frames omitted
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
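The root cause at the bottom of the trace is `InvalidPidMappingException`. As far as we understand Kafka's transaction coordinator, the broker raises it when the producer id stored in the savepoint is no longer mapped to the transactional id. That is consistent with the id having expired (`transactional.id.expiration.ms`, 7 days by default) during the month between taking the savepoint and restoring it. This is a different situation from a plain transaction timeout.

The failing transactional id `eth_top_holders_daily_v11-0-65435` also appears to end in checkpoint 65435, the one just before checkpoint 65436. If so, it belongs to the restored state.

When reproducing this, it can help to locate the root cause programmatically. The following stdlib-only helper is hypothetical and not part of Flink or Kafka. It walks a `Throwable`'s cause chain and matches on the simple class name, so it does not need kafka-clients on the classpath.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper (not part of Flink or Kafka) for locating a root cause in a stack trace. */
public class CauseChain {

    /**
     * Walks t, t.getCause(), t.getCause().getCause(), ... and returns the first throwable whose
     * simple class name equals simpleName. Matching by name avoids a compile-time dependency on
     * kafka-clients; the identity set guards against (unusual) cycles in the cause chain.
     */
    public static Optional<Throwable> findBySimpleName(Throwable t, String simpleName) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur.getClass().getSimpleName().equals(simpleName)) {
                return Optional.of(cur);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Local stand-in that mirrors the shape of the trace without needing Kafka on the classpath.
        class InvalidPidMappingException extends RuntimeException {
            InvalidPidMappingException(String message) {
                super(message);
            }
        }
        Throwable root = new InvalidPidMappingException("producer id not assigned to its transactional id");
        Throwable trace = new java.io.IOException("Could not perform checkpoint",
                new IllegalStateException("Failed to commit KafkaCommittable", root));

        System.out.println(findBySimpleName(trace, "InvalidPidMappingException")
                .map(Throwable::getMessage)
                .orElse("root cause not found"));
    }
}
```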