If we are shutting down sources of unbounded jobs on Flink versions without FLIP-147 (available in 1.14) [1], which Matthias has mentioned, then it's IMO a bug, because it effectively breaks checkpointing. Fabian, can you please verify whether this is intended behavior?
In the meantime, another workaround could be simply keeping the source
parallelism lower than or equal to the number of Kafka partitions (a quick
sketch is appended after the quoted thread below).

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Best,
D.

On Wed, Sep 15, 2021 at 9:00 PM Lars Skjærven <lar...@gmail.com> wrote:

> Got it. So the workaround for now (1.13.2) is to fall back to
> FlinkKafkaConsumer, if I read you correctly.
> Thanks,
> L
>
> On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Lars,
>> I guess you are looking for
>> execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
>> This configuration parameter is going to be introduced in the upcoming
>> Flink 1.14 release.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
>>
>> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <lar...@gmail.com> wrote:
>>
>>> When using the KafkaSource builder with a job parallelism larger than
>>> the number of Kafka partitions, the job is unable to checkpoint.
>>>
>>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>>> the Kafka topic with one partition. For this reason checkpointing
>>> seems to be disabled.
>>>
>>> When using FlinkKafkaConsumer (instead of the KafkaSource builder) we
>>> don't see this behavior, and all 4 tasks have status RUNNING.
>>>
>>> Is there any way of using the KafkaSource builder and getting the same
>>> behavior as FlinkKafkaConsumer for the number of tasks being used?
>>>
>>> Code with KafkaSource.builder:
>>>
>>> val metadataSource = KafkaSource.builder[Metadata]()
>>>   .setBootstrapServers("kafka-server")
>>>   .setGroupId("my-group")
>>>   .setTopics("my-topic")
>>>   .setDeserializer(new MetadataDeserializationSchema)
>>>   .setStartingOffsets(OffsetsInitializer.earliest())
>>>   .build()
>>>
>>> Code with FlinkKafkaConsumer:
>>>
>>> val props = new java.util.Properties()
>>> props.setProperty("bootstrap.servers", "kafka-server")
>>> props.setProperty("group.id", "my-group")
>>> val metadataSource = new FlinkKafkaConsumer[Metadata](
>>>   "my-topic",
>>>   new MetadataDeserializationSchema,
>>>   props)
>>>   .setStartFromEarliest()
>>>
>>> Thanks in advance,
>>> Lars
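Appendix: a minimal, untested sketch of the parallelism workaround,
assuming the Scala DataStream API and reusing the Metadata /
MetadataDeserializationSchema types from Lars's code above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val metadataSource = KafkaSource.builder[Metadata]()
  .setBootstrapServers("kafka-server")
  .setGroupId("my-group")
  .setTopics("my-topic")
  .setDeserializer(new MetadataDeserializationSchema)
  .setStartingOffsets(OffsetsInitializer.earliest())
  .build()

// "my-topic" has a single partition, so cap the source at parallelism 1.
// With more source subtasks than partitions, the surplus subtasks finish
// immediately, and on Flink < 1.14 a job with FINISHED tasks cannot take
// checkpoints.
env
  .fromSource(metadataSource, WatermarkStrategy.noWatermarks[Metadata](), "metadata-source")
  .setParallelism(1)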
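And once 1.14 is out, the option Matthias linked can be turned on in
flink-conf.yaml (as far as I know it will be disabled by default in 1.14):

execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

With that set, the job should keep checkpointing even after some source
subtasks reach FINISHED.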