Thanks for the stack traces, Lars. With them I could confirm that the problem should be fixed by FLINK-20114 [1]. The fixes will be included in the 1.12.4 and 1.13.0 releases. Sorry for the inconvenience.

[1] https://issues.apache.org/jira/browse/FLINK-20114

Cheers,
Till

On Thu, Apr 29, 2021 at 8:30 PM Lars Skjærven <lars.skjer...@tv2.no> wrote:

> Unfortunately, I only have the truncated stack trace available (from the
> Flink UI).
> L
>
> 2021-04-27 16:32:02
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
>     ... 10 more
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>     at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
>     at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:69)
>     at org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96)
>     at org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
>     ... 20 more
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 29, 2021 18:44
> *To:* Lars Skjærven <lars.skjer...@tv2.no>
> *Cc:* Becket Qin <becket....@gmail.com>; user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on checkpointing
>
> Thanks for the additional information, Lars. Could you maybe also share the
> full stack traces of the errors you see when the checkpoint fails?
>
> @Becket Qin <becket....@gmail.com> is it a known issue with the new Kafka
> sources trying to checkpoint negative offsets?
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <lars.skjer...@tv2.no> wrote:
>
> Thanks Till.
>
> Here is how we created the KafkaSource:
>
>     val sensorSource = KafkaSource.builder[SensorInput]()
>       .setBootstrapServers(myConfig.kafkaBrokers)
>       .setGroupId(myConfig.kafkaGroupId)
>       .setTopics(myConfig.kafkaTopicIn)
>       .setDeserializer(new SensorInputPBDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> The stream was built with
>
>     env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
>
> The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer
> that does SensorInputPB.parseFrom(record.value()) and finally
> collector.collect(v)
>
> From here on we're doing a keyed windowed aggregation with
>
>     .keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator)
>
> L
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 29, 2021 09:16
> *To:* Lars Skjærven <lars.skjer...@tv2.no>; Becket Qin <becket....@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on checkpointing
>
> Hi Lars,
>
> The KafkaSourceBuilder constructs the new KafkaSource, which has not been
> fully hardened in 1.12.2. In fact, it should not be documented yet. I think
> you are running into an instability/bug in it. The new Kafka source should be
> hardened a lot more in the 1.13.0 release.
>
> Could you tell us exactly how you created the KafkaSource so that we can
> verify that this problem has been properly fixed in the 1.13.0 release? I
> am also pulling in Becket, who is the original author of this connector. He
> might be able to tell you more.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <lars.skjer...@tv2.no> wrote:
>
> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, Scala 2.12.13, on Ververica Platform in a container with Java
> 8). Initially it generated warnings on the Kafka configuration, but the job
> was able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
> ....
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
> Switching back to using FlinkKafkaConsumer, the warnings on the Kafka
> config disappeared, and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?
>
> Thanks,
> L
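
For readers following the innermost "Caused by" in the trace: the exception is raised by the constructor of Kafka's OffsetAndMetadata, which rejects any offset below zero. The following is a minimal stand-alone Java sketch of that failure mode, not code from Flink or Kafka. The class NegativeOffsetSketch, the CommitOffset stand-in, the NOT_YET_READ placeholder value, and both snapshot methods are hypothetical names chosen for illustration. It assumes that a partition which has not yet produced a record may still carry a negative placeholder position at checkpoint time. The thread does not say whether FLINK-20114 resolves the problem this way; the guard shown is only one possible approach.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone illustration; none of these names are Flink or Kafka API. */
final class NegativeOffsetSketch {

    /** Hypothetical placeholder meaning "no record read from this partition yet". */
    static final long NOT_YET_READ = -2L;

    /** Stand-in that mirrors the check at the bottom of the trace: negative offsets are rejected. */
    static final class CommitOffset {
        final long offset;

        CommitOffset(long offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
        }
    }

    /** Wraps every partition position; throws if any position is still negative. */
    static Map<String, CommitOffset> snapshotUnguarded(Map<String, Long> positions) {
        Map<String, CommitOffset> out = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            out.put(e.getKey(), new CommitOffset(e.getValue()));
        }
        return out;
    }

    /** Skips partitions whose position is still a negative placeholder. */
    static Map<String, CommitOffset> snapshotGuarded(Map<String, Long> positions) {
        Map<String, CommitOffset> out = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            if (e.getValue() >= 0) {
                out.put(e.getKey(), new CommitOffset(e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new LinkedHashMap<>();
        positions.put("sensors-0", 42L);          // partition that has made progress
        positions.put("sensors-1", NOT_YET_READ); // partition with no records read yet

        // The guarded variant keeps only the partition with a real offset.
        System.out.println(snapshotGuarded(positions).keySet());

        // The unguarded variant fails in the same way as the trace in this thread.
        try {
            snapshotUnguarded(positions);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

In this sketch the unguarded variant fails as soon as one partition still holds the placeholder, which matches the shape of the reported error, where the checkpoint is declined for the whole source subtask.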