Thanks for the stack traces, Lars. With them I could confirm that the
problem should be fixed by FLINK-20114 [1]. The fix will be contained
in the 1.12.4 and 1.13.0 releases. Sorry for the inconvenience.

[1] https://issues.apache.org/jira/browse/FLINK-20114

Cheers,
Till

On Thu, Apr 29, 2021 at 8:30 PM Lars Skjærven <lars.skjer...@tv2.no> wrote:

> Unfortunately, I only have the truncated stack trace available (from the
> Flink UI).
> L
>
>
> 2021-04-27 16:32:02
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
>       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
>       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>       at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
>       at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
>       at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
>       at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
>       ... 10 more
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>       at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
>       at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:69)
>       at org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96)
>       at org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288)
>       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
>       ... 20 more
>
>
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 29, 2021 18:44
> *To:* Lars Skjærven <lars.skjer...@tv2.no>
> *Cc:* Becket Qin <becket....@gmail.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Thanks for the additional information, Lars. Could you maybe also share the
> full stack traces of the errors you see when the checkpoint fails?
>
> @Becket Qin <becket....@gmail.com> is it a known issue with the new Kafka
> sources trying to checkpoint negative offsets?
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <lars.skjer...@tv2.no>
> wrote:
>
> Thanks Till.
>
> Here is how we created the KafkaSource:
>
>     val sensorSource = KafkaSource.builder[SensorInput]()
>       .setBootstrapServers(myConfig.kafkaBrokers)
>       .setGroupId(myConfig.kafkaGroupId)
>       .setTopics(myConfig.kafkaTopicIn)
>       .setDeserializer(new SensorInputPBDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> The stream was built with
>
>     env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
>
> The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer
> that calls SensorInputPB.parseFrom(record.value()) and finally
> collector.collect(v).
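>
> A minimal sketch of what that deserializer looks like on Flink 1.12 (the
> KafkaRecordDeserializer interface), in case it helps. SensorInput,
> SensorInputPB, and the mapping between them are placeholders, not our exact
> code:
>
>     import org.apache.flink.api.common.typeinfo.TypeInformation
>     import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
>     import org.apache.flink.util.Collector
>     import org.apache.kafka.clients.consumer.ConsumerRecord
>
>     class SensorInputPBDeserializationSchema extends KafkaRecordDeserializer[SensorInput] {
>
>       override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]],
>                                collector: Collector[SensorInput]): Unit = {
>         // Parse the protobuf payload and forward it downstream.
>         val v = SensorInputPB.parseFrom(record.value())
>         collector.collect(v) // assumes SensorInput is the parsed protobuf type; otherwise convert here
>       }
>
>       override def getProducedType: TypeInformation[SensorInput] =
>         TypeInformation.of(classOf[SensorInput])
>     }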
>
> From here on we're doing a keyed windowed aggregation with
> .keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator).
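>
> Put together, the job wiring looks roughly like this (imports omitted as
> above; the key selector and the field it uses are illustrative, not our
> actual code):
>
>     val aggregated = env
>       .fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
>       .keyBy((e: SensorInput) => e.sensorId) // placeholder key field
>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>       .aggregate(new SensorEventAggregator)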
>
> L
>
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 29, 2021 09:16
> *To:* Lars Skjærven <lars.skjer...@tv2.no>; Becket Qin <
> becket....@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Hi Lars,
>
> The KafkaSourceBuilder constructs the new KafkaSource, which has not been
> fully hardened in 1.12.2. In fact, it should not be documented yet. I think
> you are running into an instability/bug of this new source. The new Kafka
> source will be hardened a lot more in the 1.13.0 release.
>
> Could you tell us exactly how you created the KafkaSource so that we can
> verify that this problem has been properly fixed in the 1.13.0 release? I
> am also pulling in Becket who is the original author of this connector. He
> might be able to tell you more.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <lars.skjer...@tv2.no>
> wrote:
>
> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, Scala 2.12.13, on the Ververica platform in a container with
> Java 8). Initially it generated warnings about the Kafka configuration, but
> the job was still able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
> ....
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> Switching back to the FlinkKafkaConsumer, the warnings about the Kafka
> config disappeared, and the job was able to checkpoint successfully.
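>
> For completeness, the fallback looks roughly like this (the
> SensorInputPBKafkaDeserializationSchema name is a placeholder for a
> KafkaDeserializationSchema variant of our protobuf deserializer, and the
> property handling is simplified):
>
>     import java.util.Properties
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>
>     val props = new Properties()
>     props.setProperty("bootstrap.servers", myConfig.kafkaBrokers)
>     props.setProperty("group.id", myConfig.kafkaGroupId)
>
>     val consumer = new FlinkKafkaConsumer[SensorInput](
>       myConfig.kafkaTopicIn,
>       new SensorInputPBKafkaDeserializationSchema, // placeholder deserializer
>       props)
>     consumer.setStartFromEarliest()
>
>     val sensorStream = env.addSource(consumer)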
>
> I'm wondering if the warnings and the errors are connected, and if there is
> a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?
>
> Thanks,
> L
>
>
