When using the KafkaSource builder with a job parallelism larger than the number of
Kafka partitions, the job is unable to checkpoint.

With a job parallelism of 4 and a Kafka topic with one partition, 3 of the 4
source tasks are marked as FINISHED. Because of these FINISHED tasks,
checkpointing seems to be disabled.
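
For reference, parallelism and checkpointing are configured on the execution
environment roughly like this (the checkpoint interval below is an illustrative
value, not the exact one from our job):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Job parallelism of 4, while the topic only has one partition
    env.setParallelism(4)
    // Checkpointing is enabled, but no checkpoints are taken once
    // 3 of the 4 source subtasks reach FINISHED
    env.enableCheckpointing(60000) // interval in ms, illustrative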

When using FlinkKafkaConsumer (instead of the KafkaSource builder) we don't see
this behavior, and all 4 tasks have status RUNNING.

Is there any way of using the KafkaSource builder and getting the same behavior
as FlinkKafkaConsumer for the number of tasks being used?

Code with KafkaSource.builder:

    import org.apache.flink.connector.kafka.source.KafkaSource
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

    val metadataSource = KafkaSource.builder[Metadata]()
      .setBootstrapServers("kafka-server")
      .setGroupId("my-group")
      .setTopics("my-topic") // topic with a single partition
      .setDeserializer(new MetadataDeserializationSchema)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build()
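
The source is then attached to the job via fromSource (the watermark strategy
and operator name here are just placeholders):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy
    import org.apache.flink.streaming.api.scala._ // implicit TypeInformation for Metadata

    val metadataStream = env.fromSource(
      metadataSource,
      WatermarkStrategy.noWatermarks[Metadata](),
      "metadata-source") // operator name is illustrative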

Code with FlinkKafkaConsumer:

    import java.util.Properties

    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "kafka-server") // same broker as above
    kafkaProps.setProperty("group.id", "my-group")
    val metadataSource = new FlinkKafkaConsumer[Metadata](
      "my-topic",
      new MetadataDeserializationSchema,
      kafkaProps)
      .setStartFromEarliest()
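
and it is added with the legacy addSource API, where all 4 subtasks stay
RUNNING even though only one of them reads the single partition:

    val metadataStream = env.addSource(metadataSource)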

Thanks in advance,
Lars
