Hi
It's Flink 1.3.2 and Kafka 0.10.2.0. I am starting 1 JM and 4 TMs (with 1 task slot each), then deploying the job 4 times (via ./flink run -p1 x.jar); job parallelism is set to 1.

A new thing I just noticed: if, in parallel to the Flink jobs, I start two kafka-console-consumers (with --consumer-property group.id=TopicConsumers) and write a msg to Kafka, then one of the console consumers receives the msg, and so do both Flink jobs. I thought maybe the Flink consumers didn't receive the group property passed via "flink run .. --group.id TopicConsumers", but no, they do belong to the group as well:

taskmanager_3 | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
taskmanager_3 |     auto.commit.interval.ms = 5000
taskmanager_3 |     auto.offset.reset = latest
taskmanager_3 |     bootstrap.servers = [kafka:9092]
taskmanager_3 |     check.crcs = true
taskmanager_3 |     client.id =
taskmanager_3 |     connections.max.idle.ms = 540000
taskmanager_3 |     enable.auto.commit = true
taskmanager_3 |     exclude.internal.topics = true
taskmanager_3 |     fetch.max.bytes = 52428800
taskmanager_3 |     fetch.max.wait.ms = 500
taskmanager_3 |     fetch.min.bytes = 1
taskmanager_3 |     group.id = TopicConsumers
taskmanager_3 |     heartbeat.interval.ms = 3000
taskmanager_3 |     interceptor.classes = null
taskmanager_3 |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3 |     max.partition.fetch.bytes = 1048576
taskmanager_3 |     max.poll.interval.ms = 300000
taskmanager_3 |     max.poll.records = 500
taskmanager_3 |     metadata.max.age.ms = 300000
taskmanager_3 |     metric.reporters = []
taskmanager_3 |     metrics.num.samples = 2
taskmanager_3 |     metrics.recording.level = INFO
taskmanager_3 |     metrics.sample.window.ms = 30000
taskmanager_3 |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
taskmanager_3 |     receive.buffer.bytes = 65536
taskmanager_3 |     reconnect.backoff.ms = 50
taskmanager_3 |     request.timeout.ms = 305000
taskmanager_3 |     retry.backoff.ms = 100
taskmanager_3 |     sasl.jaas.config = null
taskmanager_3 |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
taskmanager_3 |     sasl.kerberos.min.time.before.relogin = 60000
taskmanager_3 |     sasl.kerberos.service.name = null
taskmanager_3 |     sasl.kerberos.ticket.renew.jitter = 0.05
taskmanager_3 |     sasl.kerberos.ticket.renew.window.factor = 0.8
taskmanager_3 |     sasl.mechanism = GSSAPI
taskmanager_3 |     security.protocol = PLAINTEXT
taskmanager_3 |     send.buffer.bytes = 131072
taskmanager_3 |     session.timeout.ms = 10000
taskmanager_3 |     ssl.cipher.suites = null
taskmanager_3 |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
taskmanager_3 |     ssl.endpoint.identification.algorithm = null
taskmanager_3 |     ssl.key.password = null
taskmanager_3 |     ssl.keymanager.algorithm = SunX509
taskmanager_3 |     ssl.keystore.location = null
taskmanager_3 |     ssl.keystore.password = null
taskmanager_3 |     ssl.keystore.type = JKS
taskmanager_3 |     ssl.protocol = TLS
taskmanager_3 |     ssl.provider = null
taskmanager_3 |     ssl.secure.random.implementation = null
taskmanager_3 |     ssl.trustmanager.algorithm = PKIX
taskmanager_3 |     ssl.truststore.location = null
taskmanager_3 |     ssl.truststore.password = null
taskmanager_3 |     ssl.truststore.type = JKS
taskmanager_3 |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3 |
taskmanager_3 | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic' was supplied but isn't a known config.
taskmanager_3 | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
taskmanager_3 | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
taskmanager_3 | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
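Could it be that Flink assigns Kafka partitions to its subtasks itself (via assign() rather than Kafka's group-based subscribe()), so that group.id only matters for offset committing and not for partition distribution? If so, each separately deployed job with parallelism 1 would independently read all 10 partitions, which would explain what I'm seeing. A toy sketch of what I mean (my own approximation of a modulo-style distribution, not Flink's actual assignment code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (my approximation, not Flink's actual code): if partitions are
// distributed modulo-style over the subtasks of ONE job, then a job with
// parallelism 1 gets every partition, and four separate jobs each get all 10.
public class AssignmentSketch {
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtaskIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // One job, parallelism 1: subtask 0 reads all 10 partitions.
        System.out.println(partitionsFor(0, 1, 10).size()); // 10
        // One job, parallelism 4: subtask 0 reads only partitions 0, 4, 8.
        System.out.println(partitionsFor(0, 4, 10)); // [0, 4, 8]
    }
}
```

Under this assumption, only subtasks of the same job share the partitions between themselves; consumers from different jobs do not, even with the same group.id.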
I'm running Kafka and the Flink jobs in docker containers; the console-consumers run on localhost.

>-------- Original message --------
>From: Gary Yao g...@data-artisans.com
>Subject: Re: all task managers reading from all kafka partitions
>To: "r. r." <rob...@abv.bg>
>Sent: 17.11.2017 20:02
>
> Hi Robert,
>
> Can you tell us which Flink version you are using?
> Also, are you starting a single job with parallelism 4, or are you starting several jobs?
>
> Thanks!
>
> Gary
>
> On Fri, Nov 17, 2017 at 4:41 PM, r. r. <rob...@abv.bg> wrote:
>
> > Hi
> >
> > I have this strange problem: 4 task managers, each with one task slot, attaching to the same Kafka topic, which has 10 partitions.
> >
> > When I post a single message to the Kafka topic, it seems that all 4 consumers fetch the message and start processing (confirmed by the TM logs).
> >
> > If I run kafka-consumer-groups.sh --describe --group TopicConsumers, it says that only one message was posted to a single partition. The next message would generally go to another partition.
> >
> > In addition, while the Flink jobs are running on the message, I start two kafka-console-consumer.sh instances and each gets only one message, as expected.
> > On start, each of the Flink TMs posts something that to me reads as if it will read from all partitions:
> >
> > 2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 10 partitions from these topics: [TopicToConsume]
> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
> > 2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka:
> > [KafkaTopicPartition{topic='TopicToConsume', partition=8},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=9},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=6},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=7},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=4},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=5},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=2},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=3},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=0},
> >  KafkaTopicPartition{topic='TopicToConsume', partition=1}]
> > 2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
> >
> >     auto.commit.interval.ms = 5000
> >     auto.offset.reset = latest
> >
> > Any hints?
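P.S. In case the argument plumbing matters: this is roughly how the group.id from "flink run .. --group.id TopicConsumers" ends up in the consumer Properties in my job. The real job uses Flink's ParameterTool to parse the args; this is a simplified stand-alone sketch (the class name and parsing loop are made up here) just to show the wiring:

```java
import java.util.Properties;

// Simplified stand-alone sketch of how the job turns "--key value" CLI
// arguments into Kafka consumer Properties. The real job uses Flink's
// ParameterTool; this hypothetical equivalent just makes the wiring visible.
public class ConsumerProps {
    static Properties fromArgs(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // default from my setup
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                // "--group.id TopicConsumers" becomes group.id=TopicConsumers
                props.setProperty(args[i].substring(2), args[i + 1]);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = fromArgs(new String[]{"--group.id", "TopicConsumers"});
        System.out.println(p.getProperty("group.id")); // TopicConsumers
    }
}
```

The resulting Properties object is what the job hands to the FlinkKafkaConsumer constructor, which is why group.id shows up in the ConsumerConfig dump above.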