[ https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16162987#comment-16162987 ]
Elliot commented on KAFKA-5649:
-------------------------------

Hello. We're encountering the same issue after our cluster has been running and receiving events for a short period. I've enabled the flags suggested above via the KAFKA_OPTS environment variable, to no effect.

[2017-09-12 13:54:48,238] INFO Partition [users,38] on broker 3: Expanding ISR from 3 to 3,1 (kafka.cluster.Partition)
[2017-09-12 13:54:48,239] INFO Partition [users,17] on broker 3: Expanding ISR from 3 to 3,1 (kafka.cluster.Partition)
[2017-09-12 13:54:48,240] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:162)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:580)
    at org.apache.kafka.common.network.Selector.close(Selector.java:571)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:404)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at kafka.network.Processor.poll(SocketServer.scala:500)
    at kafka.network.Processor.run(SocketServer.scala:435)
    at java.lang.Thread.run(Thread.java:745)
[2017-09-12 13:54:48,258] INFO Partition [users,19] on broker 3: Expanding ISR from 3 to 3,1 (kafka.cluster.Partition)
[2017-09-12 13:54:48,276] INFO Partition [users,32] on broker 3: Expanding ISR from 3 to 3,1 (kafka.cluster.Partition)
[2017-09-12 13:54:48,278] INFO Partition [users,4] on broker 3: Expanding ISR from 3 to 3,1 (kafka.cluster.Partition)

We found errors identical to these by tracing back to the times when our main producer's send queue overflowed.
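For reference, a minimal sketch of one way to catch that overflow moment on a plain Kafka producer: bound max.block.ms so a full send buffer raises a TimeoutException, and log its timestamp. The broker address, topic name, and sizes below are placeholders, not values from this ticket.

{code:java}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.TimeoutException

val props = new Properties()
props.put("bootstrap.servers", "broker1:9093") // placeholder address
// SASL_SSL/truststore settings as in the configs quoted below would go
// here; omitted to keep the sketch minimal.
props.put("buffer.memory", "33554432") // 32 MiB accumulator (the default)
props.put("max.block.ms", "5000")      // fail fast instead of blocking 60 s
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
try {
  producer.send(new ProducerRecord("users", "key", "value"))
} catch {
  // send() throws once the accumulator is full and max.block.ms elapses;
  // the timestamp logged here is what can be matched against the broker's
  // "Failed to send SSL Close message" warnings.
  case e: TimeoutException =>
    println(s"${System.currentTimeMillis()} send queue overflowed: ${e.getMessage}")
} finally {
  producer.close()
}
{code}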
> .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .option("subscribe", config.topicConfigList.keys.mkString(",")) > .load() > {code} > Config of the sink: > {code:java} > .writeStream > .option("checkpointLocation", > s"${config.checkpointDir}/${topicConfig._1}/") > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("kafka.connections.max.idle.ms", 3600000) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .start() > {code} > And in some cases it throws the exception making the spark job stuck in that > step. Exception stack trace is the following: > {code:java} > 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54) > at org.apache.kafka.common.network.Selector.doClose(Selector.java:540) > at org.apache.kafka.common.network.Selector.close(Selector.java:531) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378) > at org.apache.kafka.common.network.Selector.poll(Selector.java:303) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) > at > org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106) > at > org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157) > at > 
>   .start()
> {code}
>
> In some cases it throws the exception, leaving the Spark job stuck at that step. The exception stack trace is the following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message
> java.io.IOException: Broken pipe
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>     at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>     at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>     at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>     at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>     at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>     at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>     at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>     at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
>     at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>     at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
>     at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
>     at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>     at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>     at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
>     at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)