Hmm, but I want single-slot task managers and multiple jobs, so that if one job fails it doesn't bring the whole setup down (for example, 30+ parallel consumers). What setup would you advise? The job is quite heavy and might bring the VM down if run with that much concurrency in a single JVM.

Thanks!
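(For illustration, a sketch of the kind of deployment described above - one independent single-slot job per consumer, submitted with the same CLI used later in this thread. The loop, the detached -d flag, and the 30-job count are assumptions for the example; x.jar stands for the job jar:)

    # one independent job per consumer, each with parallelism 1,
    # so a failure only takes down that one job
    for i in $(seq 1 30); do
      ./flink run -d -p1 x.jar
    done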
-------- Original message --------
From: Gary Yao <g...@data-artisans.com>
Subject: Re: all task managers reading from all kafka partitions
To: "r. r." <rob...@abv.bg>
Sent: 17.11.2017 22:58

Forgot to hit "reply all" in my last email.

On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <g...@data-artisans.com> wrote:

Hi Robert,

To get your desired behavior, you should start a single job with parallelism set to 4.

Flink does not rely on Kafka's consumer groups to distribute the partitions to the parallel subtasks. Instead, Flink does the assignment of partitions itself and also tracks and checkpoints the offsets internally. This is needed to achieve exactly-once semantics. The group.id that you are setting is used for different purposes, e.g., to track the consumer lag of a job.

Best,
Gary
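(For illustration, a minimal sketch of the single-job setup Gary describes, using the FlinkKafkaConsumer09 connector, broker address, topic, and group id that appear in the logs below; the class name and the print() sink are made up for the example:)

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class SingleJobConsumer {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // One job, four parallel subtasks: Flink splits the topic's 10
            // partitions across the subtasks itself, independently of Kafka's
            // consumer-group mechanism.
            env.setParallelism(4);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");
            // Required by the consumer, but used e.g. for lag tracking,
            // not for partition assignment.
            props.setProperty("group.id", "TopicConsumers");

            env.addSource(new FlinkKafkaConsumer09<>(
                            "TopicToConsume", new SimpleStringSchema(), props))
               .print();

            env.execute("single job with parallelism 4");
        }
    }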
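(A sketch of the verification commands described in the message above, with the broker address and topic taken from the logs; exact flags may differ per Kafka release:)

    # console consumer joining the same group as the Flink jobs
    ./kafka-console-consumer.sh --bootstrap-server kafka:9092 \
        --topic TopicToConsume \
        --consumer-property group.id=TopicConsumers

    # inspect the group's members, partition assignment, and offsets
    ./kafka-consumer-groups.sh --new-consumer --bootstrap-server kafka:9092 \
        --describe --group TopicConsumers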
On Fri, Nov 17, 2017 at 7:54 PM, r. r. <rob...@abv.bg> wrote:

Hi,

It's Flink 1.3.2, Kafka 0.10.2.0. I am starting 1 JM and 4 TMs (with 1 task slot each), then I deploy 4 times (via ./flink run -p1 x.jar); job parallelism is set to 1.

A new thing I just noticed: if I start, in parallel to the Flink jobs, two kafka-console-consumer instances (with --consumer-property group.id=TopicConsumers) and write a msg to Kafka, then one of the console consumers receives the msg together with both Flink jobs. I thought maybe the Flink consumers didn't receive the group property passed via "flink run .. --group.id TopicConsumers", but no - they do belong to the group as well:

taskmanager_3 | 2017-11-17 18:29:00,750 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
taskmanager_3 |   auto.commit.interval.ms = 5000
taskmanager_3 |   auto.offset.reset = latest
taskmanager_3 |   bootstrap.servers = [kafka:9092]
taskmanager_3 |   check.crcs = true
taskmanager_3 |   client.id =
taskmanager_3 |   connections.max.idle.ms = 540000
taskmanager_3 |   enable.auto.commit = true
taskmanager_3 |   exclude.internal.topics = true
taskmanager_3 |   fetch.max.bytes = 52428800
taskmanager_3 |   fetch.max.wait.ms = 500
taskmanager_3 |   fetch.min.bytes = 1
taskmanager_3 |   group.id = TopicConsumers
taskmanager_3 |   heartbeat.interval.ms = 3000
taskmanager_3 |   interceptor.classes = null
taskmanager_3 |   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3 |   max.partition.fetch.bytes = 1048576
taskmanager_3 |   max.poll.interval.ms = 300000
taskmanager_3 |   max.poll.records = 500
taskmanager_3 |   metadata.max.age.ms = 300000
taskmanager_3 |   metric.reporters = []
taskmanager_3 |   metrics.num.samples = 2
taskmanager_3 |   metrics.recording.level = INFO
taskmanager_3 |   metrics.sample.window.ms = 30000
taskmanager_3 |   partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
taskmanager_3 |   receive.buffer.bytes = 65536
taskmanager_3 |   reconnect.backoff.ms = 50
taskmanager_3 |   request.timeout.ms = 305000
taskmanager_3 |   retry.backoff.ms = 100
taskmanager_3 |   sasl.jaas.config = null
taskmanager_3 |   sasl.kerberos.kinit.cmd = /usr/bin/kinit
taskmanager_3 |   sasl.kerberos.min.time.before.relogin = 60000
taskmanager_3 |   sasl.kerberos.service.name = null
taskmanager_3 |   sasl.kerberos.ticket.renew.jitter = 0.05
taskmanager_3 |   sasl.kerberos.ticket.renew.window.factor = 0.8
taskmanager_3 |   sasl.mechanism = GSSAPI
taskmanager_3 |   security.protocol = PLAINTEXT
taskmanager_3 |   send.buffer.bytes = 131072
taskmanager_3 |   session.timeout.ms = 10000
taskmanager_3 |   ssl.cipher.suites = null
taskmanager_3 |   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
taskmanager_3 |   ssl.endpoint.identification.algorithm = null
taskmanager_3 |   ssl.key.password = null
taskmanager_3 |   ssl.keymanager.algorithm = SunX509
taskmanager_3 |   ssl.keystore.location = null
taskmanager_3 |   ssl.keystore.password = null
taskmanager_3 |   ssl.keystore.type = JKS
taskmanager_3 |   ssl.protocol = TLS
taskmanager_3 |   ssl.provider = null
taskmanager_3 |   ssl.secure.random.implementation = null
taskmanager_3 |   ssl.trustmanager.algorithm = PKIX
taskmanager_3 |   ssl.truststore.location = null
taskmanager_3 |   ssl.truststore.password = null
taskmanager_3 |   ssl.truststore.type = JKS
taskmanager_3 |   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3 |
taskmanager_3 | 2017-11-17 18:29:00,765 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic' was supplied but isn't a known config.
taskmanager_3 | 2017-11-17 18:29:00,765 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
taskmanager_3 | 2017-11-17 18:29:00,770 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
taskmanager_3 | 2017-11-17 18:29:00,791 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.

I'm running Kafka and the Flink jobs in Docker containers; the console consumers run on localhost.

-------- Original message --------
From: Gary Yao <g...@data-artisans.com>
Subject: Re: all task managers reading from all kafka partitions
To: "r. r." <rob...@abv.bg>
Sent: 17.11.2017 20:02

Hi Robert,

Can you tell us which Flink version you are using? Also, are you starting a single job with parallelism 4, or are you starting several jobs?

Thanks!

Gary

On Fri, Nov 17, 2017 at 4:41 PM, r. r. <rob...@abv.bg> wrote:

Hi,

I have this strange problem: 4 task managers, each with one task slot, attaching to the same Kafka topic, which has 10 partitions.

When I post a single message to the Kafka topic, it seems that all 4 consumers fetch the message and start processing (confirmed by the TM logs).

If I run kafka-consumer-groups.sh --describe --group TopicConsumers, it says that only one message was posted, to a single partition. The next message would generally go to another partition.

In addition, while the Flink jobs are working on the message, I start two kafka-console-consumer.sh instances, and each gets only one message, as expected.

On start, each of the Flink TMs logs something that reads to me as if it would consume from all partitions:

2017-11-17 15:03:38,688 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 10 partitions from these topics: [TopicToConsume]
2017-11-17 15:03:38,689 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
2017-11-17 15:03:38,689 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
2017-11-17 15:03:38,699 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
  auto.commit.interval.ms = 5000
  auto.offset.reset = latest

Any hints?
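(For context on the log lines above: with parallelism 1, a job has a single consumer subtask, and Flink's internal assignment hands that subtask every partition, which is why all four separately submitted jobs read the same message. A simplified sketch of a modulo-style split - an illustration of the idea, not Flink's exact internal code:)

    import java.util.ArrayList;
    import java.util.List;

    // Simplified illustration of splitting Kafka partitions across parallel
    // consumer subtasks; a hypothetical demo, not Flink's actual assignment code.
    public class PartitionSplitDemo {

        static List<Integer> partitionsFor(int subtask, int parallelism, int numPartitions) {
            List<Integer> assigned = new ArrayList<>();
            for (int p = 0; p < numPartitions; p++) {
                if (p % parallelism == subtask) {
                    assigned.add(p);
                }
            }
            return assigned;
        }

        public static void main(String[] args) {
            // Four independent jobs, each with parallelism 1: every job's
            // subtask 0 is assigned all 10 partitions (matches the logs above).
            System.out.println(partitionsFor(0, 1, 10));

            // One job with parallelism 4: the 10 partitions are split up.
            for (int s = 0; s < 4; s++) {
                System.out.println("subtask " + s + " -> " + partitionsFor(s, 4, 10));
            }
        }
    }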