Hi Robert,

Running a single job does not mean that you are limited to a single JVM.
For example, a job with parallelism 4 requires 4 task slots by default. You
can provision 4 single-slot TaskManagers on different hosts and connect them
to the same JobManager. The JobManager can then take your job and distribute
its execution across the 4 slots. To learn more about the distributed runtime
environment, see:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html

Regarding your concerns about job failures: a failure in the JobManager or in
one of the TaskManagers can bring your job down, but Flink has built-in fault
tolerance on different levels. You may want to read up on the following
topics:

- Data Streaming Fault Tolerance:
  https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
- Restart Strategies:
  https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/restart_strategies.html
- JobManager High Availability:
  https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html

Let me know if you have further questions.

Best,
Gary

On Fri, Nov 17, 2017 at 11:11 PM, r. r. <rob...@abv.bg> wrote:

> Hmm, but I want single-slot task managers and multiple jobs, so that if one
> job fails it doesn't bring the whole setup (for example, 30+ parallel
> consumers) down.
> What setup would you advise? The job is quite heavy and might bring the VM
> down if run with such concurrency in one JVM.
>
> Thanks!
>
> >-------- Original message --------
> >From: Gary Yao g...@data-artisans.com
> >Subject: Re: all task managers reading from all kafka partitions
> >To: "r. r." <rob...@abv.bg>
> >Sent: 17.11.2017 22:58
> >
> > Forgot to hit "reply all" in my last email.
> >
> > On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <g...@data-artisans.com> wrote:
> >
> > > Hi Robert,
> > >
> > > To get your desired behavior, you should start a single job with
> > > parallelism set to 4.
> > >
> > > Flink does not rely on Kafka's consumer groups to distribute the
> > > partitions to the parallel subtasks. Instead, Flink does the
> > > assignment of partitions itself and also tracks and checkpoints the
> > > offsets internally. This is needed to achieve exactly-once semantics.
> > >
> > > The group.id that you are setting is used for different purposes,
> > > e.g., to track the consumer lag of a job.
> > >
> > > Best,
> > > Gary
> > >
> > > On Fri, Nov 17, 2017 at 7:54 PM, r. r. <rob...@abv.bg> wrote:
> > >
> > > > Hi
> > > >
> > > > it's Flink 1.3.2, Kafka 0.10.2.0
> > > >
> > > > I am starting 1 JM and 4 TM (with 1 task slot each). Then I deploy
> > > > 4 times (via ./flink run -p1 x.jar); job parallelism is set to 1.
> > > >
> > > > A new thing I just noticed: if I start, in parallel to the Flink
> > > > jobs, two kafka-console-consumer (with --consumer-property
> > > > group.id=TopicConsumers) and write a msg to Kafka, then one of the
> > > > console consumers receives the msg together with both Flink jobs.
> > > >
> > > > I thought maybe the Flink consumers didn't receive the group
> > > > property passed via "flink run .. --group.id TopicConsumers", but
> > > > no, they do belong to the group as well:
> > > >
> > > > taskmanager_3 | 2017-11-17 18:29:00,750 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
> > > > taskmanager_3 | auto.commit.interval.ms = 5000
> > > > taskmanager_3 | auto.offset.reset = latest
> > > > taskmanager_3 | bootstrap.servers = [kafka:9092]
> > > > taskmanager_3 | check.crcs = true
> > > > taskmanager_3 | client.id =
> > > > taskmanager_3 | connections.max.idle.ms = 540000
> > > > taskmanager_3 | enable.auto.commit = true
> > > > taskmanager_3 | exclude.internal.topics = true
> > > > taskmanager_3 | fetch.max.bytes = 52428800
> > > > taskmanager_3 | fetch.max.wait.ms = 500
> > > > taskmanager_3 | fetch.min.bytes = 1
> > > > taskmanager_3 | group.id = TopicConsumers
> > > > taskmanager_3 | heartbeat.interval.ms = 3000
> > > > taskmanager_3 | interceptor.classes = null
> > > > taskmanager_3 | key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > > > taskmanager_3 | max.partition.fetch.bytes = 1048576
> > > > taskmanager_3 | max.poll.interval.ms = 300000
> > > > taskmanager_3 | max.poll.records = 500
> > > > taskmanager_3 | metadata.max.age.ms = 300000
> > > > taskmanager_3 | metric.reporters = []
> > > > taskmanager_3 | metrics.num.samples = 2
> > > > taskmanager_3 | metrics.recording.level = INFO
> > > > taskmanager_3 | metrics.sample.window.ms = 30000
> > > > taskmanager_3 | partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> > > > taskmanager_3 | receive.buffer.bytes = 65536
> > > > taskmanager_3 | reconnect.backoff.ms = 50
> > > > taskmanager_3 | request.timeout.ms = 305000
> > > > taskmanager_3 | retry.backoff.ms = 100
> > > > taskmanager_3 | sasl.jaas.config = null
> > > > taskmanager_3 | sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > > taskmanager_3 | sasl.kerberos.min.time.before.relogin = 60000
> > > > taskmanager_3 | sasl.kerberos.service.name = null
> > > > taskmanager_3 | sasl.kerberos.ticket.renew.jitter = 0.05
> > > > taskmanager_3 | sasl.kerberos.ticket.renew.window.factor = 0.8
> > > > taskmanager_3 | sasl.mechanism = GSSAPI
> > > > taskmanager_3 | security.protocol = PLAINTEXT
> > > > taskmanager_3 | send.buffer.bytes = 131072
> > > > taskmanager_3 | session.timeout.ms = 10000
> > > > taskmanager_3 | ssl.cipher.suites = null
> > > > taskmanager_3 | ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > > taskmanager_3 | ssl.endpoint.identification.algorithm = null
> > > > taskmanager_3 | ssl.key.password = null
> > > > taskmanager_3 | ssl.keymanager.algorithm = SunX509
> > > > taskmanager_3 | ssl.keystore.location = null
> > > > taskmanager_3 | ssl.keystore.password = null
> > > > taskmanager_3 | ssl.keystore.type = JKS
> > > > taskmanager_3 | ssl.protocol = TLS
> > > > taskmanager_3 | ssl.provider = null
> > > > taskmanager_3 | ssl.secure.random.implementation = null
> > > > taskmanager_3 | ssl.trustmanager.algorithm = PKIX
> > > > taskmanager_3 | ssl.truststore.location = null
> > > > taskmanager_3 | ssl.truststore.password = null
> > > > taskmanager_3 | ssl.truststore.type = JKS
> > > > taskmanager_3 | value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > > > taskmanager_3 |
> > > > taskmanager_3 | 2017-11-17 18:29:00,765 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic' was supplied but isn't a known config.
> > > > taskmanager_3 | 2017-11-17 18:29:00,765 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> > > > taskmanager_3 | 2017-11-17 18:29:00,770 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
> > > > taskmanager_3 | 2017-11-17 18:29:00,791 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
> > > >
> > > > I'm running Kafka and the Flink jobs in docker containers, and the
> > > > console-consumers from localhost.
> > > >
> > > > >-------- Original message --------
> > > > >From: Gary Yao g...@data-artisans.com
> > > > >Subject: Re: all task managers reading from all kafka partitions
> > > > >To: "r. r." <rob...@abv.bg>
> > > > >Sent: 17.11.2017 20:02
> > > > >
> > > > > Hi Robert,
> > > > >
> > > > > Can you tell us which Flink version you are using?
> > > > > Also, are you starting a single job with parallelism 4 or are
> > > > > you starting several jobs?
> > > > >
> > > > > Thanks!
> > > > >
> > > > > Gary
> > > > >
> > > > > On Fri, Nov 17, 2017 at 4:41 PM, r. r. <rob...@abv.bg> wrote:
> > > > >
> > > > > > Hi
> > > > > >
> > > > > > I have this strange problem: 4 task managers, each with one
> > > > > > task slot, attaching to the same Kafka topic, which has 10
> > > > > > partitions.
> > > > > >
> > > > > > When I post a single message to the Kafka topic, it seems that
> > > > > > all 4 consumers fetch the message and start processing
> > > > > > (confirmed by TM logs).
> > > > > >
> > > > > > If I run kafka-consumer-groups.sh --describe --group
> > > > > > TopicConsumers, it says that only one message was posted to a
> > > > > > single partition. The next message would generally go to
> > > > > > another partition.
> > > > > >
> > > > > > In addition, while the Flink jobs are running on the message,
> > > > > > I start two kafka-console-consumer.sh and each would get only
> > > > > > one message, as expected.
> > > > > >
> > > > > > On start, each of the Flink TMs logs something that, to me,
> > > > > > reads as if it would read from all partitions:
> > > > > >
> > > > > > 2017-11-17 15:03:38,688 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 10 partitions from these topics: [TopicToConsume]
> > > > > > 2017-11-17 15:03:38,689 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
> > > > > > 2017-11-17 15:03:38,689 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
> > > > > > 2017-11-17 15:03:38,699 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
> > > > > > auto.commit.interval.ms = 5000
> > > > > > auto.offset.reset = latest
> > > > > >
> > > > > > Any hints?
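
Note on the behaviour discussed in this thread: the replies explain that Flink assigns Kafka partitions to the parallel subtasks of a job itself, independently of group.id. The sketch below is a plain-Python simulation of that idea, not Flink code. It assumes a simple modulo (round-robin) assignment of partition index to subtask index; the connector's real assignment logic may differ between versions. The function names assign_partitions and readers_per_partition are hypothetical and exist only for this illustration.

```python
def assign_partitions(num_partitions, parallelism, subtask_index):
    """Partitions one subtask would read under an ASSUMED modulo scheme.

    This is an illustration only; it is not the Flink connector's code.
    """
    return [p for p in range(num_partitions) if p % parallelism == subtask_index]


def readers_per_partition(job_parallelisms, num_partitions):
    """Count how many subtasks, across all jobs, read each partition.

    job_parallelisms holds one parallelism value per independently
    submitted job. Each job distributes the partitions among its own
    subtasks without knowing about the other jobs.
    """
    counts = [0] * num_partitions
    for parallelism in job_parallelisms:
        for subtask in range(parallelism):
            for p in assign_partitions(num_partitions, parallelism, subtask):
                counts[p] += 1
    return counts


if __name__ == "__main__":
    # Four separate jobs with parallelism 1 (the setup from the first mail).
    print("4 jobs, p=1:", readers_per_partition([1, 1, 1, 1], 10))
    # One job with parallelism 4 (the setup suggested in the reply).
    print("1 job,  p=4:", readers_per_partition([4], 10))
```

Under these assumptions, four jobs with parallelism 1 each end up with every partition read four times, which matches the "subtask 0 will start reading the following 10 partitions" log line seen on each TaskManager. A single job with parallelism 4 has every partition read by exactly one subtask.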