Thanks for the additional information, Lars. Could you also share the full stack traces of the errors you see when the checkpoint fails?
@Becket Qin <becket....@gmail.com> is it a known issue with the new Kafka sources trying to checkpoint negative offsets?

Cheers,
Till

On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <lars.skjer...@tv2.no> wrote:

> Thanks Till.
>
> Here is how we created the KafkaSource:
>
>     val sensorSource = KafkaSource.builder[SensorInput]()
>       .setBootstrapServers(myConfig.kafkaBrokers)
>       .setGroupId(myConfig.kafkaGroupId)
>       .setTopics(myConfig.kafkaTopicIn)
>       .setDeserializer(new SensorInputPBDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> The stream was built with
>
>     env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
>
> The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer
> that does SensorInputPB.parseFrom(record.value()) and finally
> collector.collect(v)
>
> From here on we're doing a keyed windowed aggregation with
>
>     .keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator)
>
> L
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 29, 2021 09:16
> *To:* Lars Skjærven <lars.skjer...@tv2.no>; Becket Qin <becket....@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on checkpointing
>
> Hi Lars,
>
> The KafkaSourceBuilder constructs the new KafkaSource which has not been
> fully hardened in 1.12.2. In fact, it should not be documented yet. I think
> you are running into an instability/bug. The new Kafka source should be
> hardened a lot more in the 1.13.0 release.
>
> Could you tell us exactly how you created the KafkaSource so that we can
> verify that this problem has been properly fixed in the 1.13.0 release? I
> am also pulling in Becket who is the original author of this connector. He
> might be able to tell you more.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <lars.skjer...@tv2.no> wrote:
>
> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, Scala 2.12.13, on Ververica Platform in a container with Java
> 8). Initially it generated warnings on Kafka configuration, but the job was
> able to consume and produce messages.
>
>     The configuration 'client.id.prefix' was supplied but isn't a known config.
>     The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
>
> Finally the job crashed with a checkpointing error:
>
>     java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
>     ....
>     Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
>     ...
>     Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
> Switching back to using FlinkKafkaConsumer, the warnings on the Kafka
> config disappeared, and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?
>
> Thanks,
> L