Hi there,

We are running an Apache Beam application with Flink as the runner. We use the KafkaIO connector (https://beam.apache.org/releases/javadoc/2.19.0/) to read from topics, with the configuration below: auto-commit of offsets is enabled, checkpointing is disabled, and the pipeline does element-wise processing.

We run the application in Flink Job Cluster mode. If I run the same job twice, i.e. start two Flink job clusters, I see each message being processed twice.

My understanding is: on startup, the Flink Job Manager contacts Kafka to get the offset of each partition for this consumer group, distributes the partitions to the task managers, and does not use Kafka to manage the consumer group. When the second job cluster starts, it does the same thing, so the first job cluster is not aware that new consumers from the same consumer group have joined. But if I add more task managers to the same job cluster, the job manager does know that more consumers have joined, and it will rebalance the partition consumption if needed.

Is my understanding correct?

Thanks a lot!
Eleanore

    Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder()
        .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
        .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
        .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
        .build();

    return KafkaIO.<String, JsonNode>read()
        .withBootstrapServers(kafkaSettings.getBootstrapServers())
        .withTopic(topic)
        .withKeyDeserializer(KeyDeserializer.class)
        .withValueDeserializerAndCoder(getDeserializer(encoding), new JsonNodeCoder<>())
        .withConsumerConfigUpdates(consumerConfig)
        .withoutMetadata();
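To make my mental model concrete, here is a toy sketch (not the actual Flink/Beam code, just my assumption of how the assignment behaves): each job cluster independently maps every partition to one of its own subtasks, so two separate clusters would each claim all partitions, which would explain the duplicate processing I am seeing.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: my assumed model of static partition assignment per job cluster.
// Each cluster assigns all partitions round-robin across its own subtasks,
// without coordinating through Kafka's consumer-group protocol.
public class StaticAssignmentSketch {
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // partition p goes to subtask (p % parallelism) within THIS cluster
            subtasks.get(p % parallelism).add(p);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Two clusters, each with parallelism 2, over a 4-partition topic:
        System.out.println("cluster A: " + assign(4, 2));
        System.out.println("cluster B: " + assign(4, 2));
        // Both clusters compute the same assignment, so every partition
        // is consumed twice overall.
    }
}
```

If that model is right, it would also explain why adding task managers to one cluster rebalances correctly: the assignment is recomputed within that single cluster.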
We use the KafkaIO connector to read from topics: https://beam.apache.org/releases/javadoc/2.19.0/ and we have the following configuration, which enables auto commit of offset, no checkpointing is enabled, and it is performing element wise processing. So we run our application in Flink Job Cluster mode, and if I run the same job twice, meaning start 2 flink job clusters, then I see message being processed twice. My understanding is, upon startup, Flink Job Manager will contact kafka to get the offset for each partition for this consume group, and distribute the task to task managers, and it does not use kafka to manage the consumer group. and when the 2nd job cluster starts up, it does the same thing, so the 1st job cluster is not aware of there are new consumers from the same consume group have joined. But if I add more task managers to the same job cluster, then job manager is aware of more consumers from this consume group has joined, and it will rebalance the partition consumption if needed. Is my understanding correct? Thanks a lot! Eleanore Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder() .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup) .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") .build(); return KafkaIO.<String, JsonNode>read() .withBootstrapServers(kafkaSettings.getBootstrapServers()) .withTopic(topic) .withKeyDeserializer(KeyDeserializer.class) .withValueDeserializerAndCoder(getDeserializer(encoding), new JsonNodeCoder<>()) .withConsumerConfigUpdates(consumerConfig) .withoutMetadata();