Gary, thanks a lot! I completely forgot that parallelism extends over all slots visible to the JobManager! So adding e.g. -p4 to 'flink run' should suit my use case just fine, I believe. I'll look deeper into failure recovery with this scheme.
Have a great weekend!
-Robert

-------- Original message --------
From: Gary Yao <g...@data-artisans.com>
Subject: Re: all task managers reading from all kafka partitions
To: "r. r." <rob...@abv.bg>
Sent: 18.11.2017 11:28

Hi Robert,

Running a single job does not mean that you are limited to a single JVM.

For example, a job with parallelism 4 by default requires 4 task slots to run. You can provision 4 single-slot TaskManagers on different hosts to connect to the same JobManager. The JobManager can then take your job and distribute its execution across the 4 slots. To learn more about the distributed runtime environment:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html

Regarding your concerns about job failures, a failure in the JobManager or one of the TaskManagers can bring your job down, but Flink has built-in fault tolerance on different levels. You may want to read up on the following topics:

- Data Streaming Fault Tolerance: https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
- Restart Strategies: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/restart_strategies.html
- JobManager High Availability: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html

Let me know if you have further questions.

Best,
Gary

On Fri, Nov 17, 2017 at 11:11 PM, r. r. <rob...@abv.bg> wrote:

Hmm, but I want single-slot task managers and multiple jobs, so that if one job fails it doesn't bring the whole setup (for example 30+ parallel consumers) down. What setup would you advise? The job is quite heavy and might bring the VM down if run with such concurrency in one JVM.

Thanks!
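Gary's point about parallelism and slots can be sketched as follows. This is a simplified model, not Flink's actual scheduler, and the TaskManager names are illustrative: a job with parallelism p needs p free task slots, and those slots may be spread across several single-slot TaskManagers.

```python
# Simplified sketch of slot scheduling (not Flink's real scheduler):
# a job with parallelism p occupies p free task slots, which may come
# from several single-slot TaskManagers attached to the same JobManager.
def schedule(parallelism, free_slots):
    """free_slots: TaskManager name -> number of free slots.
    Returns a subtask-index -> TaskManager placement."""
    slots = [tm for tm, n in free_slots.items() for _ in range(n)]
    if parallelism > len(slots):
        raise RuntimeError(
            "not enough free task slots: need %d, have %d"
            % (parallelism, len(slots)))
    return {i: slots[i] for i in range(parallelism)}

# Four single-slot TaskManagers can host one job with parallelism 4.
placement = schedule(4, {"tm1": 1, "tm2": 1, "tm3": 1, "tm4": 1})
# -> {0: 'tm1', 1: 'tm2', 2: 'tm3', 3: 'tm4'}
```

With this layout, each of the 4 parallel subtasks lands in its own JVM, even though it is a single job.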
-------- Original message --------
From: Gary Yao <g...@data-artisans.com>
Subject: Re: all task managers reading from all kafka partitions
To: "r. r." <rob...@abv.bg>
Sent: 17.11.2017 22:58

Forgot to hit "reply all" in my last email.

On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <g...@data-artisans.com> wrote:

Hi Robert,

To get your desired behavior, you should start a single job with parallelism set to 4.

Flink does not rely on Kafka's consumer groups to distribute the partitions to the parallel subtasks. Instead, Flink does the assignment of partitions itself and also tracks and checkpoints the offsets internally. This is needed to achieve exactly-once semantics.

The group.id that you are setting is used for different purposes, e.g., to track the consumer lag of a job.

Best,
Gary

On Fri, Nov 17, 2017 at 7:54 PM, r. r. <rob...@abv.bg> wrote:

Hi,

It's Flink 1.3.2, Kafka 0.10.2.0. I am starting 1 JM and 4 TMs (with 1 task slot each), then deploy 4 times (via ./flink run -p1 x.jar); job parallelism is set to 1.

A new thing I just noticed: if, in parallel to the Flink jobs, I start two kafka-console-consumers (with --consumer-property group.id=TopicConsumers) and write a msg to Kafka, then one of the console consumers receives the msg together with both Flink jobs. I thought maybe the Flink consumers didn't receive the group property passed via "flink run .. -- group.id TopicConsumers", but no, they do belong to the group as well:

taskmanager_3 | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
taskmanager_3 |     auto.commit.interval.ms = 5000
taskmanager_3 |     auto.offset.reset = latest
taskmanager_3 |     bootstrap.servers = [kafka:9092]
taskmanager_3 |     check.crcs = true
taskmanager_3 |     client.id =
taskmanager_3 |     connections.max.idle.ms = 540000
taskmanager_3 |     enable.auto.commit = true
taskmanager_3 |     exclude.internal.topics = true
taskmanager_3 |     fetch.max.bytes = 52428800
taskmanager_3 |     fetch.max.wait.ms = 500
taskmanager_3 |     fetch.min.bytes = 1
taskmanager_3 |     group.id = TopicConsumers
taskmanager_3 |     heartbeat.interval.ms = 3000
taskmanager_3 |     interceptor.classes = null
taskmanager_3 |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3 |     max.partition.fetch.bytes = 1048576
taskmanager_3 |     max.poll.interval.ms = 300000
taskmanager_3 |     max.poll.records = 500
taskmanager_3 |     metadata.max.age.ms = 300000
taskmanager_3 |     metric.reporters = []
taskmanager_3 |     metrics.num.samples = 2
taskmanager_3 |     metrics.recording.level = INFO
taskmanager_3 |     metrics.sample.window.ms = 30000
taskmanager_3 |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
taskmanager_3 |     receive.buffer.bytes = 65536
taskmanager_3 |     reconnect.backoff.ms = 50
taskmanager_3 |     request.timeout.ms = 305000
taskmanager_3 |     retry.backoff.ms = 100
taskmanager_3 |     sasl.jaas.config = null
taskmanager_3 |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
taskmanager_3 |     sasl.kerberos.min.time.before.relogin = 60000
taskmanager_3 |     sasl.kerberos.service.name = null
taskmanager_3 |     sasl.kerberos.ticket.renew.jitter = 0.05
taskmanager_3 |     sasl.kerberos.ticket.renew.window.factor = 0.8
taskmanager_3 |     sasl.mechanism = GSSAPI
taskmanager_3 |     security.protocol = PLAINTEXT
taskmanager_3 |     send.buffer.bytes = 131072
taskmanager_3 |     session.timeout.ms = 10000
taskmanager_3 |     ssl.cipher.suites = null
taskmanager_3 |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
taskmanager_3 |     ssl.endpoint.identification.algorithm = null
taskmanager_3 |     ssl.key.password = null
taskmanager_3 |     ssl.keymanager.algorithm = SunX509
taskmanager_3 |     ssl.keystore.location = null
taskmanager_3 |     ssl.keystore.password = null
taskmanager_3 |     ssl.keystore.type = JKS
taskmanager_3 |     ssl.protocol = TLS
taskmanager_3 |     ssl.provider = null
taskmanager_3 |     ssl.secure.random.implementation = null
taskmanager_3 |     ssl.trustmanager.algorithm = PKIX
taskmanager_3 |     ssl.truststore.location = null
taskmanager_3 |     ssl.truststore.password = null
taskmanager_3 |     ssl.truststore.type = JKS
taskmanager_3 |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
taskmanager_3 |
taskmanager_3 | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic' was supplied but isn't a known config.
taskmanager_3 | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
taskmanager_3 | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
taskmanager_3 | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.

I'm running Kafka and the Flink jobs in docker containers, the console consumers from localhost.

-------- Original message --------
From: Gary Yao <g...@data-artisans.com>
Subject: Re: all task managers reading from all kafka partitions
To: "r. r." <rob...@abv.bg>
Sent: 17.11.2017 20:02

Hi Robert,

Can you tell us which Flink version you are using? Also, are you starting a single job with parallelism 4 or are you starting several jobs?

Thanks!

Gary

On Fri, Nov 17, 2017 at 4:41 PM, r. r. <rob...@abv.bg> wrote:

Hi,

I have this strange problem: 4 task managers, each with one task slot, attaching to the same Kafka topic, which has 10 partitions.
When I post a single message to the Kafka topic, it seems that all 4 consumers fetch the message and start processing (confirmed by TM logs).

If I run kafka-consumer-groups.sh --describe --group TopicConsumers, it says that only one message was posted to a single partition. The next message would generally go to another partition.

In addition, while the Flink jobs are working on the message, I start two kafka-console-consumer.sh instances, and each gets only one message, as expected.

On start, each of the Flink TMs logs something that to me reads as if it would read from all partitions:

2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 10 partitions from these topics: [TopicToConsume]
2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest

Any hints?
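The "Consumer subtask 0 will start reading the following 10 partitions" log follows directly from Flink assigning partitions to its own parallel subtasks rather than relying on Kafka's consumer groups. A simplified model of that assignment (the real logic in Flink's Kafka connector differs in detail; this modulo scheme is only an approximation):

```python
# Simplified model of how a Flink Kafka source spreads topic partitions
# over its parallel subtasks (not the connector's exact algorithm).
def assigned_partitions(num_partitions, parallelism, subtask_index):
    """Partitions read by one subtask of a single job."""
    return [p for p in range(num_partitions)
            if p % parallelism == subtask_index]

# One job run with -p1: its single subtask reads all 10 partitions.
# Four independent -p1 jobs therefore EACH read every partition,
# which is the duplicated-consumption seen in the thread.
assigned_partitions(10, 1, 0)   # -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# One job run with -p4: the partitions are split among the 4 subtasks.
assigned_partitions(10, 4, 0)   # -> [0, 4, 8]
assigned_partitions(10, 4, 1)   # -> [1, 5, 9]
```

This is why a single job with -p4, not four separate -p1 jobs, gives each message exactly one Flink consumer.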