I think so too. But I was wondering whether this was on the consumer side or on the actual Kafka broker. The error was displayed on the Flink task node where the task was running, and the brokers looked fine at the time. I have about a dozen topics, all single-partition except one, which has 18 partitions. So I really doubt the broker machines ran out of file handles.
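One way to check whether the limit is being hit on the client side would be to compare a process's open descriptor count with its "Max open files" limit on the task node itself. Below is a minimal Python sketch, assuming a Linux host with /proc mounted. The function names are my own, and the PID of the TaskManager JVM has to be supplied by whoever runs it (reading another user's /proc/<pid>/fd typically needs the same user or root).

```python
import os
import sys


def open_fd_count(pid="self"):
    """Count open file descriptors by listing /proc/<pid>/fd (Linux only)."""
    return len(os.listdir(f"/proc/{pid}/fd"))


def soft_nofile_limit(pid="self"):
    """Return the soft 'Max open files' limit from /proc/<pid>/limits.

    Returns an int, or None if the limit is 'unlimited' or the row is missing.
    """
    with open(f"/proc/{pid}/limits") as limits:
        for line in limits:
            if line.startswith("Max open files"):
                # Fields: 'Max', 'open', 'files', <soft>, <hard>, <units>
                soft = line.split()[3]
                return int(soft) if soft.isdigit() else None
    return None


if __name__ == "__main__":
    # Hypothetical usage: pass the TaskManager PID as the first argument;
    # with no argument the script inspects itself.
    pid = sys.argv[1] if len(sys.argv) > 1 else "self"
    used = open_fd_count(pid)
    limit = soft_nofile_limit(pid)
    print(f"pid={pid} open_fds={used} soft_limit={limit}")
```

If the count on the task node sits close to the soft limit while the brokers stay well below theirs, that would point at the client side rather than the brokers.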
On Mon, 17 Feb 2020 at 05:21, Piotr Nowojski <pi...@ververica.com> wrote:

> Hey, sorry but I know very little about the KafkaConsumer. I hope that
> someone else might know more.
>
> However, did you try to google this issue? It doesn’t sound like Flink
> specific problem, but like a general Kafka issue. Also a solution might be
> just as simple as bumping the limit of opened files on the unix system
> (ulimit command if I remember correctly?)
>
> Piotrek
>
> On 14 Feb 2020, at 23:35, John Smith <java.dev....@gmail.com> wrote:
>
> Hi Piotr, any thoughts on this?
>
> On Wed., Feb. 12, 2020, 3:29 a.m. Kostas Kloudas, <kklou...@apache.org>
> wrote:
>
>> Hi John,
>>
>> As you suggested, I would also lean towards increasing the number of
>> allowed open handles, but for recommendation on best practices, I am
>> cc'ing Piotr who may be more familiar with the Kafka consumer.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Feb 11, 2020 at 9:43 PM John Smith <java.dev....@gmail.com>
>> wrote:
>> >
>> > Just wondering is this on the client side in the flink Job? I rebooted the task and the job deployed correctly on another node.
>> >
>> > Is there a specific ulimit that we should set for flink tasks nodes?
>> >
>> > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
>> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>> > at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
>> > at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> > at java.lang.Thread.run(Thread.java:748)
>> > Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
>> > ... 11 more
>> > Caused by: java.io.IOException: Too many open files
>> > at sun.nio.ch.IOUtil.makePipe(Native Method)
>> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
>> > at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
>> > at java.nio.channels.Selector.open(Selector.java:227)
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
>> > ... 14 more
>> >