Hi Benchao,

Thanks a lot!
Eleanore

On Thu, Feb 20, 2020 at 4:30 PM Benchao Li <libenc...@gmail.com> wrote:

> Hi Jin,
>
> See below inline replies:
>
> My understanding is, upon startup, Flink Job Manager will contact kafka to
>> get the offset for each partition for this consume group, and distribute
>> the task to task managers, and it does not use kafka to manage the consumer
>> group.
>
>
> Generally, yes. If you are not using checkpoint, and starting from
> group-offsets, Flink will read offset from Kafka at startup.
>
> and when the 2nd job cluster starts up, it does the same thing, so the 1st
>> job cluster is not aware of there are new consumers from the same consume
>> group have joined.
>
>
> Yes.
>
> But if I add more task managers to the same job cluster, then job manager
>> is aware of more consumers from this consume group has joined, and it will
>> rebalance the partition consumption if needed.
>
>
> No. Flink does not rebalance the partitions when new task managers joined
> cluster. It only did so when job restarts and job parallelism changes.
>
> Hope it helps.
>
> Jin Yi <eleanore....@gmail.com> 于2020年2月21日周五 上午6:14写道:
>
>> Hi there,
>>
>> We are running apache beam application with flink being the runner.
>>
>> We use the KafkaIO connector to read from topics:
>> https://beam.apache.org/releases/javadoc/2.19.0/
>>
>> and we have the following configuration, which enables auto commit of
>> offset, no checkpointing is enabled, and it is performing element wise
>> processing.
>>
>> So we run our application in Flink Job Cluster mode, and if I run the
>> same job twice, meaning start 2 flink job clusters, then I see message
>> being processed twice.
>>
>> My understanding is, upon startup, Flink Job Manager will contact kafka
>> to get the offset for each partition for this consume group, and distribute
>> the task to task managers, and it does not use kafka to manage the consumer
>> group.
>>
>> and when the 2nd job cluster starts up, it does the same thing, so the
>> 1st job cluster is not aware of there are new consumers from the same
>> consume group have joined.
>>
>> But if I add more task managers to the same job cluster, then job manager
>> is aware of more consumers from this consume group has joined, and it will
>> rebalance the partition consumption if needed.
>>
>> Is my understanding correct?
>>
>> Thanks a lot!
>> Eleanore
>>
>> Map<String, Object> consumerConfig = ImmutableMap.<String,
>> Object>builder()
>> .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
>> .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
>> .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
>> .build();
>>
>> return KafkaIO.<String, JsonNode>read()
>> .withBootstrapServers(kafkaSettings.getBootstrapServers())
>> .withTopic(topic)
>> .withKeyDeserializer(KeyDeserializer.class)
>> .withValueDeserializerAndCoder(getDeserializer(encoding), new
>> JsonNodeCoder<>())
>> .withConsumerConfigUpdates(consumerConfig)
>> .withoutMetadata();
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>

Reply via email to