Hey, sorry, but I know very little about the KafkaConsumer. I hope that someone 
else might know more.

However, have you tried googling this issue? It doesn't sound like a 
Flink-specific problem, but rather a general Kafka issue. The solution might be 
as simple as bumping the limit of open files on the Unix system (the ulimit 
command, if I remember correctly?).

Piotrek

> On 14 Feb 2020, at 23:35, John Smith <java.dev....@gmail.com> wrote:
> 
> Hi Piotr, any thoughts on this?
> 
> On Wed., Feb. 12, 2020, 3:29 a.m. Kostas Kloudas <kklou...@apache.org> wrote:
> Hi John,
> 
> As you suggested, I would also lean towards increasing the number of
> allowed open handles, but for recommendations on best practices, I am
> cc'ing Piotr, who may be more familiar with the Kafka consumer.
> 
> Cheers,
> Kostas
> 
> On Tue, Feb 11, 2020 at 9:43 PM John Smith <java.dev....@gmail.com> wrote:
> >
> > Just wondering, is this on the client side in the Flink job? I rebooted the 
> > task and the job deployed correctly on another node.
> >
> > Is there a specific ulimit that we should set for Flink task nodes?
> >
> > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> > at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
> > at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
> > ... 11 more
> > Caused by: java.io.IOException: Too many open files
> > at sun.nio.ch.IOUtil.makePipe(Native Method)
> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> > at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
> > ... 14 more
