Hey, sorry, but I know very little about the KafkaConsumer. I hope that someone else might know more.

However, did you try to google this issue? It doesn’t sound like a Flink-specific problem, but rather like a general Kafka issue. Also, the solution might be as simple as raising the limit on open files on the Unix system (the ulimit command, if I remember correctly?).

Piotrek

> On 14 Feb 2020, at 23:35, John Smith <java.dev....@gmail.com> wrote:
>
> Hi Piotr, any thoughts on this?
>
> On Wed., Feb. 12, 2020, 3:29 a.m. Kostas Kloudas, <kklou...@apache.org> wrote:
> Hi John,
>
> As you suggested, I would also lean towards increasing the number of
> allowed open handles, but for recommendations on best practices, I am
> cc'ing Piotr, who may be more familiar with the Kafka consumer.
>
> Cheers,
> Kostas
>
> On Tue, Feb 11, 2020 at 9:43 PM John Smith <java.dev....@gmail.com> wrote:
> >
> > Just wondering, is this on the client side in the Flink job? I rebooted the
> > task and the job deployed correctly on another node.
> >
> > Is there a specific ulimit that we should set for Flink task nodes?
> >
> > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> >     at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
> >     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
> >     at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
> >     at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
> >     at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
> >     ... 11 more
> > Caused by: java.io.IOException: Too many open files
> >     at sun.nio.ch.IOUtil.makePipe(Native Method)
> >     at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> >     at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> >     at java.nio.channels.Selector.open(Selector.java:227)
> >     at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
> >     ... 14 more
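
Regarding the open-files limit discussed above: in a shell, ulimit -n prints the current per-process limit on open file descriptors. To see how close a JVM actually is to that limit, something like the following rough sketch may help. It assumes a HotSpot/OpenJDK JVM on a Unix system, where the JDK-specific com.sun.management.UnixOperatingSystemMXBean is available. The class name FdUsage and the method readFdCounts are made up for illustration and are not part of Flink or Kafka.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper, not part of Flink or Kafka.
class FdUsage {

    /**
     * Returns {open, max} file descriptor counts for the current JVM process,
     * or null when the JVM does not expose them (e.g. on non-Unix platforms).
     */
    static long[] readFdCounts() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        // Assumption: a HotSpot/OpenJDK JVM on Unix, where this JDK-specific bean exists.
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] fds = readFdCounts();
        if (fds == null) {
            System.out.println("File descriptor counts are not available on this JVM/OS");
        } else {
            System.out.println("open file descriptors: " + fds[0] + " of max " + fds[1]);
        }
    }
}
```

Note that this only reports on the JVM it runs in, so to say anything about a TaskManager it would have to be executed inside that process. If the open count keeps climbing towards the maximum, that would point at a descriptor leak rather than a limit that is simply too low.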