If we are shutting down any sources of unbounded jobs that run on Flink
versions without FLIP-147 (available in 1.14) [1], which Matthias
mentioned, then it's IMO a bug, because it effectively breaks
checkpointing. Fabian, can you please verify whether this is intended
behavior?

In the meantime, another workaround could be simply keeping the source
parallelism lower than or equal to the number of Kafka partitions.
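
A minimal sketch of that workaround, assuming the Scala DataStream API on
1.13.x. The object name, the cappedParallelism helper, the hard-coded
partition count and the use of String with SimpleStringSchema (standing in
for Metadata and MetadataDeserializationSchema) are all illustrative
assumptions, not something taken from Lars's job:

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object CappedSourceParallelism {

  // Hypothetical helper: never run more source subtasks than there are partitions.
  def cappedParallelism(jobParallelism: Int, partitionCount: Int): Int =
    math.max(1, math.min(jobParallelism, partitionCount))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4) // job-wide default, as in the report below

    // Assumption: the topic has a single partition; this is not looked up from Kafka.
    val partitionCount = 1

    val source = KafkaSource.builder[String]()
      .setBootstrapServers("kafka-server")
      .setGroupId("my-group")
      .setTopics("my-topic")
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build()

    env
      .fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-source")
      // Only the source operator is capped; downstream operators keep the job default.
      .setParallelism(cappedParallelism(env.getParallelism, partitionCount))
      .print()

    env.execute("capped-source-parallelism")
  }
}
```

With every source subtask owning at least one partition, none of them should
end up FINISHED, so checkpointing should keep working on 1.13.x.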

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Best,
D.

On Wed, Sep 15, 2021 at 9:00 PM Lars Skjærven <lar...@gmail.com> wrote:

> Got it. So the workaround for now (1.13.2) is to fall back to
> FlinkKafkaConsumer if I read you correctly.
> Thanks
> L
>
> On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Lars,
>> I guess you are looking
>> for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
>> This configuration parameter is going to be introduced in the upcoming
>> Flink 1.14 release.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
>>
>> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <lar...@gmail.com> wrote:
>>
>>> Using KafkaSource builder with a job parallelism larger than the number
>>> of Kafka partitions, the job is unable to checkpoint.
>>>
>>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>>> a Kafka topic with one partition. For this reason checkpointing seems to
>>> be disabled.
>>>
>>> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't
>>> see this behavior, and all 4 tasks have status RUNNING.
>>>
>>> Is there any way of using KafkaSource builder and getting the same behavior
>>> as FlinkKafkaConsumer for the number of tasks being used?
>>>
>>> Code with KafkaSource.builder:
>>>
>>>     val metadataSource = KafkaSource.builder[Metadata]()
>>>       .setBootstrapServers("kafka-server")
>>>       .setGroupId("my-group")
>>>       .setTopics("my-topic")
>>>       .setDeserializer(new MetadataDeserializationSchema)
>>>       .setStartingOffsets(OffsetsInitializer.earliest())
>>>       .build()
>>>
>>> Code with FlinkKafkaConsumer:
>>>     val metadataSource = new FlinkKafkaConsumer[Metadata](
>>>       "my-topic",
>>>       new MetadataDeserializationSchema,
>>>       "my-server")
>>>       .setStartFromEarliest()
>>>
>>> Thanks in advance,
>>> Lars
>>>
>>
