Hi Lars, I guess you are looking for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1]. This configuration parameter is going to be introduced in the upcoming Flink 1.14 release.
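In case it helps, this is how enabling it would look in flink-conf.yaml once 1.14 is out (a sketch based on the key name in [1]; note the option is expected to default to false in 1.14, so it has to be switched on explicitly):

```yaml
# flink-conf.yaml (Flink 1.14+): allow checkpointing to continue
# after some source tasks have reached the FINISHED state
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
```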
Best,
Matthias

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled

On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <lar...@gmail.com> wrote:
> Using the KafkaSource builder with a job parallelism larger than the number
> of Kafka partitions, the job is unable to checkpoint.
>
> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for the
> Kafka topic with one partition. For this reason checkpointing seems to be
> disabled.
>
> When using FlinkKafkaConsumer (instead of the KafkaSource builder) we don't
> see this behavior, and all 4 tasks have status RUNNING.
>
> Is there any way of using the KafkaSource builder and getting the same
> behavior as FlinkKafkaConsumer for the number of tasks being used?
>
> Code with KafkaSource.builder:
>
> val metadataSource = KafkaSource.builder[Metadata]()
>   .setBootstrapServers("kafka-server")
>   .setGroupId("my-group")
>   .setTopics("my-topic")
>   .setDeserializer(new MetadataDeserializationSchema)
>   .setStartingOffsets(OffsetsInitializer.earliest())
>   .build()
>
> Code with FlinkKafkaConsumer:
>
> val metadataSource = new FlinkKafkaConsumer[Metadata](
>   "my-topic",
>   new MetadataDeserializationSchema,
>   "my-server")
>   .setStartFromEarliest()
>
> Thanks in advance,
> Lars