It is not 100 consumers; a checkpoint is created every 100 records.
So, if your message throughput is high enough, consumers might be
created very often. Most importantly, DirectRunner is not
intended for performance-sensitive applications. You should use a
different runner for that.
Best,
Jan
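To put rough numbers on the throughput point above: if a checkpoint is taken every 100 records and each checkpoint leads to a new consumer (a simplifying assumption drawn from the description in this message, not a measured DirectRunner behaviour), the creation rate grows linearly with throughput. A minimal Python sketch; the function name and the sample rates are hypothetical:

```python
def consumer_creations_per_second(records_per_second, records_per_checkpoint=100):
    """Estimate how many times per second a consumer would be recreated.

    Assumes one checkpoint per `records_per_checkpoint` records and one new
    consumer per checkpoint (a simplification for illustration only).
    """
    if records_per_checkpoint <= 0:
        raise ValueError("records_per_checkpoint must be positive")
    return records_per_second / records_per_checkpoint


# Hypothetical throughputs, in records per second.
for rate in (100, 1000, 5000):
    print(rate, "records/s ->", consumer_creations_per_second(rate), "consumers/s")
```

Under these assumptions even a modest stream of 1000 records per second would mean roughly ten consumer creations per second.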
On 5/24/21 10:03 AM, Sozonoff Serge wrote:
Hi Jan,
So if I read your SO answer correctly, and looking at the GitHub link
you provided, we are talking about ~100 consumers? Since I am
developing locally with a minimal dockerized Kafka broker, it is
possible that this is enough to hit the max open files limit.
Depending on your definition of “limited”, I would say there are more
than a limited number present at the same time. If you look at the
log extract below, every one of those “Kafka version: 2.5.0” lines
corresponds to a Kafka consumer instantiation, and that is within a very
short period of time!
Thanks,
Serge
[INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.803 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.815 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.864 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.871 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.955 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.969 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.052 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.113 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.128 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.231 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.236 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.278 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.281 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.316 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.321 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.435 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.444 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.486 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.494 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.564 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.575 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.662 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.668 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.725 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.730 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.776 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.782 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.863 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.876 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.935 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.940 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.976 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.979 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.026 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.038 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.107 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.130 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.165 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.169 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.201 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.205 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.261 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.276 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.339 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.343 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.375 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.378 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.409 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.417 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.498 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.509 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.559 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.562 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.589 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.591 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.624 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.628 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.693 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.704 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.775 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.778 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.806 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.808 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.862 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.870 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.940 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.950 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.988 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.990 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.018 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.020 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.046 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.048 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.077 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.083 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.156 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.167 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.226 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.232 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
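One way to quantify an extract like the one above is to count the “Kafka version:” lines per second. The following is a minimal Python sketch, assuming the wrapped layout shown here (timestamp on one line, message on the next); the function name is hypothetical and the sample reuses three entries from the extract:

```python
import re
from collections import Counter

# Matches the timestamp on lines such as
# "[INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker]"
TIMESTAMP = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d{3}")


def consumers_per_second(log_lines):
    """Count "Kafka version:" lines, grouped by the second they were logged."""
    counts = Counter()
    current_second = None
    for line in log_lines:
        match = TIMESTAMP.search(line)
        if match:
            # Remember the timestamp; the message may be wrapped onto the next line.
            current_second = match.group(1)
        if "Kafka version:" in line and current_second is not None:
            counts[current_second] += 1
    return dict(counts)


sample = """\
[INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker]
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
""".splitlines()

print(consumers_per_second(sample))
```

Note that this counts instantiations only; it does not show how many consumers are open at the same time, which would require matching them against the corresponding close events.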
On 24 May 2021 at 09:35:50, Jan Lukavský (je...@seznam.cz) wrote:
Hi Serge,
I posted an answer to the SO question, hope that helps. One question:
frequent creation of consumers is to be expected with DirectRunner,
but there should be only a limited number of them at any one time. Do
you see many of them present simultaneously? Or are they correctly
closed and released?
Jan
On 5/23/21 8:40 AM, Sozonoff Serge wrote:
Hi,
I would like to refer to the following Stack Overflow question I found.
https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer
I have the very same issue when developing my pipeline. Originally
the pipeline was bounded and would read and process a CSV file whose
name came from a parameter. After reading up on various patterns for
processing a new incoming file automatically, I added a KafkaIO read
to the front of my pipeline to listen for messages which contain the
name of a file to be processed, and I then pass this on to FileIO
etc. As such, my pipeline is now unbounded.
My pipeline fails using DirectRunner once it has reached the
maximum number of open files! Looking at the logging, I see a very
large number of threads (consumers) which seem to be connecting to
the Kafka broker, which makes no sense: I have a topic with a single
partition!
There are literally hundreds of these. Notice the pool thread numbers
and the consumer client IDs:
[INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1]
kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75,
groupId=Reader-0_offset_consumer_1203867505_report-processor-]
Seeking to LATEST offset of partition my_topic-0
…..
[INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1]
kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor--127,
groupId=Reader-0_offset_consumer_1204976634_report-processor-]
Seeking to LATEST offset of partition my_topic-0
….
[INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1]
kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41,
groupId=Reader-0_offset_consumer_283343789_report-processor-]
Seeking to LATEST offset of partition my_topic-0
….
etc ….
So my issue resembles the one described in the Stack Overflow
question, and I can confirm that switching to the Flink runner
resolves the problem, but surely there is an explanation? Is there a
known bug with DirectRunner?
Kind thanks,
Serge