Got it. So the workaround for now (1.13.2) is to fall back to FlinkKafkaConsumer, if I read you correctly.

Thanks,
L

On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Lars,
> I guess you are looking for
> execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
> This configuration parameter is going to be introduced in the upcoming
> Flink 1.14 release.
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
>
> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <lar...@gmail.com> wrote:
>
>> Using the KafkaSource builder with a job parallelism larger than the
>> number of Kafka partitions, the job is unable to checkpoint.
>>
>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>> the Kafka topic with one partition. For this reason checkpointing seems
>> to be disabled.
>>
>> When using FlinkKafkaConsumer (instead of the KafkaSource builder) we
>> don't see this behavior, and all 4 tasks have status RUNNING.
>>
>> Is there any way of using the KafkaSource builder and getting the same
>> behavior as FlinkKafkaConsumer for the number of tasks being used?
>>
>> Code with KafkaSource.builder:
>>
>> val metadataSource = KafkaSource.builder[Metadata]()
>>   .setBootstrapServers("kafka-server")
>>   .setGroupId("my-group")
>>   .setTopics("my-topic")
>>   .setDeserializer(new MetadataDeserializationSchema)
>>   .setStartingOffsets(OffsetsInitializer.earliest())
>>   .build()
>>
>> Code with FlinkKafkaConsumer:
>>
>> val metadataSource = new FlinkKafkaConsumer[Metadata](
>>   "my-topic",
>>   new MetadataDeserializationSchema,
>>   "my-server")
>>   .setStartFromEarliest()
>>
>> Thanks in advance,
>> Lars
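
For reference, here is a minimal, untested sketch of how the option Matthias mentions might be switched on once the job runs on Flink 1.14 or later. It does not apply to 1.13.2. The configuration key is the one quoted in the thread. The object name, the parallelism of 4 (taken from the example in the thread), and the 60-second checkpoint interval are illustrative assumptions.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical object name, used only for this sketch.
object CheckpointAfterFinishSketch {
  def main(args: Array[String]): Unit = {
    val config = new Configuration()
    // Key as quoted in the thread; only recognized by Flink 1.14+.
    config.setBoolean(
      "execution.checkpointing.checkpoints-after-tasks-finish.enabled", true)

    // Create the Java environment from the configuration and wrap it
    // in the Scala API environment.
    val env = new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(config))

    env.setParallelism(4)           // same parallelism as in the thread's example
    env.enableCheckpointing(60000L) // assumed interval in milliseconds

    // ... add the KafkaSource and the rest of the pipeline here, then call env.execute()
  }
}
```

The same key could presumably also be set cluster-wide in flink-conf.yaml instead of in code.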