Hi,

I would like to refer to the following Stack Overflow question:

https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer

I have exactly the same issue with my pipeline. Originally the pipeline was
bounded: it read and processed a CSV file whose name was passed in as a
parameter. After reading up on patterns for automatically processing newly
arriving files, I added a KafkaIO read at the front of the pipeline. It listens
for messages containing the name of a file to process and passes that name on
to FileIO, etc. As a result, the pipeline is now unbounded.
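A minimal sketch of the pipeline shape described above, in Beam Java. It is
illustrative only and makes several assumptions: a broker at localhost:9092,
String keys and values, message values carrying a file path or glob, and
TextIO.readFiles() standing in for the real CSV handling. The class name
FileFromKafkaPipeline and the helper buildPipeline are hypothetical.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical example class: Kafka (file names) -> FileIO -> lines of text.
public class FileFromKafkaPipeline {

  // Builds the transforms only; nothing connects to Kafka until the pipeline runs.
  static PCollection<String> buildPipeline(Pipeline p, String bootstrapServers, String topic) {
    // Unbounded source: each Kafka message value is assumed to be a file path or glob.
    PCollection<String> fileNames =
        p.apply("ReadFileNames",
                KafkaIO.<String, String>read()
                    .withBootstrapServers(bootstrapServers)
                    .withTopic(topic)
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            // Keep only the message values (the file names).
            .apply("DropKeys", Values.<String>create());

    return fileNames
        // Expand each file name or glob into file metadata.
        .apply("MatchFiles", FileIO.matchAll())
        // Turn the metadata into readable file handles.
        .apply("OpenFiles", FileIO.readMatches())
        // Read each file line by line (placeholder for real CSV parsing).
        .apply("ReadLines", TextIO.readFiles());
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);
    // Assumed broker address and topic name, for illustration only.
    buildPipeline(p, "localhost:9092", "my_topic");
    // The source is unbounded, so this blocks until the job is cancelled.
    p.run().waitUntilFinish();
  }
}
```

Because the Kafka read sits at the front, the whole pipeline becomes unbounded,
which is what changes how the DirectRunner executes it compared with the
original parameter-driven, bounded version.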

My pipeline fails on the DirectRunner once it hits the maximum number of open
files! The logs show a very large number of threads (consumers) that appear to
be connecting to the Kafka broker, which makes no sense to me, since the topic
has only a single partition!


There are literally hundreds of these. Note the pool thread numbers and the
consumer client IDs:

[INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75, 
groupId=Reader-0_offset_consumer_1203867505_report-processor-] Seeking to 
LATEST offset of partition my_topic-0
…..
[INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor--127,
groupId=Reader-0_offset_consumer_1204976634_report-processor-] Seeking to 
LATEST offset of partition my_topic-0
….
[INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41, 
groupId=Reader-0_offset_consumer_283343789_report-processor-] Seeking to LATEST 
offset of partition my_topic-0
….
etc ….


So my issue resembles the one described in the Stack Overflow question. I can
confirm that switching to the Flink runner resolves the problem, but surely
there is an explanation? Is there a known bug in the DirectRunner?

Kind thanks,
Serge

