Thanks for the additional information Lars. Could you maybe also share the
full stack traces of the errors you see when the checkpoint fails?

@Becket Qin <becket....@gmail.com> is it a known issue with the new Kafka
sources trying to checkpoint negative offsets?

Cheers,
Till

On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <lars.skjer...@tv2.no> wrote:

> Thanks Till.
>
> Here is how we created the KafkaSource:
>
>     val sensorSource = KafkaSource.builder[SensorInput]()
>       .setBootstrapServers(myConfig.kafkaBrokers)
>       .setGroupId(myConfig.kafkaGroupId)
>       .setTopics(myConfig.kafkaTopicIn)
>       .setDeserializer(new SensorInputPBDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> The stream was built with
>
>     env.fromSource(sensorSource , WatermarkStrategy.
> forMonotonousTimestamps(), "sensor events")
>
> The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer
> that does SensorInputPB.parseFrom(record.value()) and finally
> collector.collect(v)
>
> From here on we're doing a keyed windowed aggregation with .keyBy(...).
> window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new
> SensorEventAggregator)
>
> L
>
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 29, 2021 09:16
> *To:* Lars Skjærven <lars.skjer...@tv2.no>; Becket Qin <
> becket....@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Hi Lars,
>
> The KafkaSourceBuilder constructs the new KafkaSource which has not been
> fully hardened in 1.12.2. In fact, it should not be documented yet. I think
> you are running into an instability/bug of. The new Kafka source should be
> hardened a lot more in the 1.13.0 release.
>
> Could you tell us exactly how you created the KafkaSource so that we can
> verify that this problem has been properly fixed in the 1.13.0 release? I
> am also pulling in Becket who is the original author of this connector. He
> might be able to tell you more.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <lars.skjer...@tv2.no>
> wrote:
>
> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, scala 2.12.13, on ververica platform in a container with java
> 8). Initially it generated warnings on kafka configuration, but the job was
> able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.   
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a 
> known config.    
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: 
> progress source (4/6)#9.
> ....
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
> complete snapshot 10 for operator Source: progress source (4/6)#9. Failure 
> reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> Switching back to using FlinkKafkaConsumer, the warnings on the kafka
> config disapeared, and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5
> ?
>
> Thanks,
> L
>
>

Reply via email to