Hi Benchao,

Thanks a lot!
Eleanore
On Thu, Feb 20, 2020 at 4:30 PM Benchao Li <libenc...@gmail.com> wrote:

> Hi Jin,
>
> See my inline replies below:
>
>> My understanding is, upon startup, the Flink Job Manager will contact Kafka to
>> get the offset for each partition for this consumer group and distribute
>> the tasks to the task managers, and it does not use Kafka to manage the
>> consumer group.
>
> Generally, yes. If you are not using checkpointing and are starting from
> group-offsets, Flink will read the offsets from Kafka at startup.
>
>> And when the 2nd job cluster starts up, it does the same thing, so the 1st
>> job cluster is not aware that new consumers from the same consumer group
>> have joined.
>
> Yes.
>
>> But if I add more task managers to the same job cluster, then the job
>> manager is aware that more consumers from this consumer group have joined,
>> and it will rebalance the partition consumption if needed.
>
> No. Flink does not rebalance the partitions when new task managers join the
> cluster. It only does so when the job restarts and the job parallelism
> changes.
>
> Hope it helps.
>
> Jin Yi <eleanore....@gmail.com> wrote on Fri, Feb 21, 2020 at 6:14 AM:
>
>> Hi there,
>>
>> We are running an Apache Beam application with Flink as the runner.
>>
>> We use the KafkaIO connector to read from topics:
>> https://beam.apache.org/releases/javadoc/2.19.0/
>>
>> and we have the following configuration, which enables auto-commit of
>> offsets; no checkpointing is enabled, and the job performs element-wise
>> processing.
>>
>> We run our application in Flink Job Cluster mode, and if I run the same
>> job twice, meaning I start 2 Flink job clusters, then I see messages being
>> processed twice.
>>
>> My understanding is, upon startup, the Flink Job Manager will contact Kafka
>> to get the offset for each partition for this consumer group and distribute
>> the tasks to the task managers, and it does not use Kafka to manage the
>> consumer group.
>>
>> And when the 2nd job cluster starts up, it does the same thing, so the
>> 1st job cluster is not aware that new consumers from the same consumer
>> group have joined.
>>
>> But if I add more task managers to the same job cluster, then the job
>> manager is aware that more consumers from this consumer group have joined,
>> and it will rebalance the partition consumption if needed.
>>
>> Is my understanding correct?
>>
>> Thanks a lot!
>> Eleanore
>>
>> Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder()
>>     .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
>>     .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
>>     .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
>>     .build();
>>
>> return KafkaIO.<String, JsonNode>read()
>>     .withBootstrapServers(kafkaSettings.getBootstrapServers())
>>     .withTopic(topic)
>>     .withKeyDeserializer(KeyDeserializer.class)
>>     .withValueDeserializerAndCoder(getDeserializer(encoding), new JsonNodeCoder<>())
>>     .withConsumerConfigUpdates(consumerConfig)
>>     .withoutMetadata();
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
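To make Benchao's answer concrete, here is a minimal, hypothetical Java model of the behavior he describes. Each job computes its own partition-to-reader assignment from only two inputs, the partition count and its parallelism, without going through Kafka's group coordinator. Two job clusters sharing a `group.id` therefore each read every partition, and adding task managers changes nothing unless the job restarts with a different parallelism. The round-robin formula and the names `PartitionAssignmentSketch`, `assign` and `readsPerPartition` are illustrative assumptions, not Flink or Beam source code; both projects use their own assignment logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * HYPOTHETICAL, simplified model (not actual Flink or Beam code) of a Kafka source
 * that assigns partitions itself instead of using Kafka consumer-group management.
 */
public class PartitionAssignmentSketch {

    /**
     * Assigns partitions 0..numPartitions-1 round-robin to `parallelism` readers.
     * The number of task managers is deliberately NOT an input: only parallelism matters.
     */
    public static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> byReader = new TreeMap<>();
        for (int reader = 0; reader < parallelism; reader++) {
            byReader.put(reader, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            byReader.get(partition % parallelism).add(partition);
        }
        return byReader;
    }

    /** Counts, across all given jobs, how many readers consume each partition. */
    public static Map<Integer, Integer> readsPerPartition(List<Map<Integer, List<Integer>>> jobs) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Map<Integer, List<Integer>> job : jobs) {
            for (List<Integer> partitions : job.values()) {
                for (int p : partitions) {
                    counts.merge(p, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        int numPartitions = 6; // hypothetical topic size

        // Two independent job clusters, same group.id, parallelism 3 each.
        // Neither knows about the other, so both compute the full assignment.
        Map<Integer, List<Integer>> job1 = assign(numPartitions, 3);
        Map<Integer, List<Integer>> job2 = assign(numPartitions, 3);
        System.out.println("job1 assignment: " + job1);
        System.out.println("reads per partition: " + readsPerPartition(List.of(job1, job2)));

        // Adding task managers leaves parallelism at 3, so the assignment is unchanged;
        // only a restart with a new parallelism redistributes partitions.
        System.out.println("restart with parallelism 2: " + assign(numPartitions, 2));
    }
}
```

Running `main` under these assumptions reports a count of 2 for every partition, which mirrors the duplicate processing seen with two job clusters. Because `assign` has no task-manager input, scaling the cluster alone cannot trigger a redistribution in this model.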