Hi Serge,
I posted an answer to the SO question; I hope it helps. One question: frequent
creation of consumers is expected with the DirectRunner, but there should
be only a limited number of them at any one time. Do you see many of them
present simultaneously, or are they correctly closed and released?
Jan
On 5/23/21 8:40 AM, Sozonoff Serge wrote:
Hi,
I would like to refer to the following Stack Overflow question I found:
https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer
I have the very same issue while developing my pipeline. Originally the
pipeline was bounded: it read and processed a CSV file whose name came
from a parameter. After reading up on various patterns for processing
new incoming files automatically, I added a KafkaIO read to the front of
my pipeline. It listens for messages containing the name of a file to be
processed and passes that name on to FileIO, etc. As a result, my
pipeline is now unbounded.
My pipeline fails on the DirectRunner once it reaches the maximum number
of open files. Looking at the logs, I see a very large number of threads
(consumers) that seem to be connecting to the Kafka broker. That makes no
sense, because my topic has a single partition. There are literally
hundreds of these. Notice the pool thread numbers and the consumer client
IDs:
[INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1]
kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75,
groupId=Reader-0_offset_consumer_1203867505_report-processor-] Seeking
to LATEST offset of partition my_topic-0
…..
[INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1]
kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor--127,
groupId=Reader-0_offset_consumer_1204976634_report-processor-] Seeking
to LATEST offset of partition my_topic-0
….
[INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1]
kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41,
groupId=Reader-0_offset_consumer_283343789_report-processor-] Seeking
to LATEST offset of partition my_topic-0
….
etc ….
So my issue resembles the one described in the Stack Overflow question.
I can confirm that switching to the Flink runner resolves the problem,
but surely there is an explanation. Is there a known bug with the
DirectRunner?
Kind thanks,
Serge