Got it. So the workaround for now (1.13.2) is to fall back to
FlinkKafkaConsumer if I read you correctly.
Thanks
L

On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Lars,
> I guess you are looking
> for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
> This configuration parameter is going to be introduced in the upcoming
> Flink 1.14 release.
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
>
> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <lar...@gmail.com> wrote:
>
>> Using KafkaSource builder with a job parallelism larger than the number
>> of kafka partitions, the job is unable to checkpoint.
>>
>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>> the kafka topic with one partition. For this reason checkpointing seems to
>> be disabled.
>>
>> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't
>> see this behavior, and all 4 tasks have status RUNNING.
>>
>> Is there any way of using KafkaSource builder ang get the same behavior
>> as FlinkKafkaConsumer for the number of tasks being used ?
>>
>> Code with KafkaSource.builder:
>>
>>     val metadataSource = KafkaSource.builder[Metadata]()
>>       .setBootstrapServers("kafka-server")
>>       .setGroupId("my-group")
>>       .setTopics("my-topic")
>>       .setDeserializer(new MetadataDeserializationSchema)
>>       .setStartingOffsets(OffsetsInitializer.earliest())
>>       .build()
>>
>> Code with FlinkKafkaConsumer:
>>     val metadataSource = new FlinkKafkaConsumer[Metadata](
>>       "my-topic",
>>       new MetadataDeserializationSchema,
>>       "my-server)
>>       .setStartFromEarliest()
>>
>> Thanks in advance,
>> Lars
>>
>

Reply via email to