Hi Lars,
I guess you are looking
for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
This configuration parameter is going to be introduced in the upcoming
Flink 1.14 release.
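For reference, once you are on 1.14 you would turn it on in flink-conf.yaml (the key is the one from the docs page linked below; it is off by default):

```yaml
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
```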

Best,
Matthias

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled

On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <lar...@gmail.com> wrote:

> When using the KafkaSource builder with a job parallelism larger than the
> number of Kafka partitions, the job is unable to checkpoint.
>
> With a job parallelism of 4, 3 of the 4 source tasks are marked as FINISHED
> for a Kafka topic with one partition. For this reason, checkpointing seems
> to be disabled.
>
> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't
> see this behavior, and all 4 tasks have status RUNNING.
>
> Is there any way of using the KafkaSource builder and getting the same
> behavior as FlinkKafkaConsumer for the number of tasks being used?
>
> Code with KafkaSource.builder:
>
>     val metadataSource = KafkaSource.builder[Metadata]()
>       .setBootstrapServers("kafka-server")
>       .setGroupId("my-group")
>       .setTopics("my-topic")
>       .setDeserializer(new MetadataDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> Code with FlinkKafkaConsumer:
>     // FlinkKafkaConsumer expects a java.util.Properties, not a server string
>     val props = new java.util.Properties()
>     props.setProperty("bootstrap.servers", "my-server")
>     props.setProperty("group.id", "my-group")
>     val metadataSource = new FlinkKafkaConsumer[Metadata](
>       "my-topic",
>       new MetadataDeserializationSchema,
>       props)
>       .setStartFromEarliest()
>
> Thanks in advance,
> Lars
>
