navin created KAFKA-10682:
-----------------------------

             Summary: Windows Kafka cluster not reachable via Azure Databricks
                 Key: KAFKA-10682
                 URL: https://issues.apache.org/jira/browse/KAFKA-10682
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 2.6.0
            Reporter: navin
We have a Windows Kafka cluster:
 * We enabled inbound and outbound traffic for ports 9092/9093.
 * The topic returns results locally on the Windows command line:
 ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning --bootstrap-server 10.53.56.140:9092
 * We are trying to consume the topic from Azure Databricks:
 ** df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
      .option("subscribe", "SIP.SIP.SHIPMENT") \
      .option("minPartitions", "10") \
      .option("startingOffsets", "earliest") \
      .load()
    #df.isStreaming()  # Returns True for DataFrames that have streaming sources
    df.printSchema()
 ** display(df)

On using the display command, after a brief amount of time we got the error below:

Lost connection to cluster. The notebook may have been detached or the cluster may have been terminated due to an error in the driver such as an OutOfMemoryError.

What we see in the logs is the error below:

20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)
java.net.UnknownHostException: Navin.us.corp.tim.com
	at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
	at java.net.InetAddress.getAllByName(InetAddress.java:1193)
	at java.net.InetAddress.getAllByName(InetAddress.java:1127)
	at kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
	at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
	at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
	at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
	at kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
	at kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
	at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
	at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
	at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
	at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
	at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
	at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
	at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:538)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:300)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:398)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:398)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:391)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:388)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:619)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:384)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:216)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:199)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:259)
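
The console consumer works when bootstrapping by IP, but the driver fails while resolving the broker's hostname (Navin.us.corp.tim.com). Our working theory is that the broker advertises its Windows hostname in the metadata it returns after the bootstrap connection, and the Databricks workspace has no DNS entry for that name. A quick way to confirm from a notebook cell, using only the Python standard library (the hostname below is copied from the WARN line above):

import socket

# Hostname the broker advertises, taken from the NetworkClient WARN above.
broker_host = "Navin.us.corp.tim.com"

try:
    addr = socket.gethostbyname(broker_host)
    print(f"{broker_host} resolves to {addr}")
except socket.gaierror as err:
    # Mirrors the java.net.UnknownHostException in the driver log: the
    # Databricks network cannot resolve the name the broker advertises.
    print(f"cannot resolve {broker_host}: {err}")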
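
If that lookup fails, the usual remedy is to make the broker advertise an address clients can actually reach. A minimal sketch of the relevant server.properties entries, assuming a single PLAINTEXT listener on port 9092 (the listener name and exact values are assumptions, not our verified config):

# server.properties (sketch)
listeners=PLAINTEXT://0.0.0.0:9092
# Advertise the routable IP used for bootstrap instead of the Windows hostname:
advertised.listeners=PLAINTEXT://10.53.56.140:9092

With this in place, the metadata returned to the Databricks driver would point at 10.53.56.140:9092, the same address the local console consumer already uses successfully.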