[ https://issues.apache.org/jira/browse/KAFKA-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
navin resolved KAFKA-10682.
---------------------------
Resolution: Fixed

kafka\config: add listeners = PLAINTEXT://10.53.56.140:9092
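For reference, a minimal sketch of what that resolution amounts to in the broker's config\server.properties, assuming a single-broker PLAINTEXT setup. The advertised.listeners line is an assumption added for clarity, not part of the recorded resolution; when left unset it defaults to the value of listeners, and it is the address the broker hands back to clients in metadata responses:

    # config\server.properties on the broker host (sketch, not the full file).
    # Bind the client listener to the routable IP so the broker stops
    # advertising the Windows machine's host name.
    listeners=PLAINTEXT://10.53.56.140:9092

    # Assumption, shown for clarity: this defaults to the value of listeners
    # when unset; it is what remote clients receive and must be able to reach.
    advertised.listeners=PLAINTEXT://10.53.56.140:9092

The broker has to be restarted for listener changes to take effect.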
> Windows Kafka cluster not reachable via Azure Databricks
> ---------------------------------------------------------
>
>                 Key: KAFKA-10682
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10682
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.6.0
>            Reporter: navin
>            Priority: Minor
>
> We have a Windows Kafka cluster.
> * We enabled inbound and outbound traffic for ports 9092/9093.
> * The topic returns results on the local Windows cmd:
> ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning --bootstrap-server 10.53.56.140:9092
> * We are trying to consume the topic from Azure Databricks:
> ** Simple ping and telnet work fine and connect to the underlying server:
> *** %sh telnet 10.53.56.140 9092
> *** %sh ping 10.53.56.140
> ** df = spark \
>        .readStream \
>        .format("kafka") \
>        .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
>        .option("subscribe", "SIP.SIP.SHIPMENT") \
>        .option("minPartitions", "10") \
>        .option("startingOffsets", "earliest") \
>        .load()
>    # df.isStreaming  # Returns True for DataFrames that have streaming sources
>    df.printSchema()
> ** display(df)
>
> On using the display command, after a short amount of time we got the error below:
>
> Lost connection to cluster. The notebook may have been detached or the cluster may have been terminated due to an error in the driver such as an OutOfMemoryError.
>
> What we see in the logs is the following error:
>
> 20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)
> 20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)
> java.net.UnknownHostException: Navin.us.corp.tim.com
>     at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
>     at java.net.InetAddress.getAllByName(InetAddress.java:1193)
>     at java.net.InetAddress.getAllByName(InetAddress.java:1127)
>     at kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
>     at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
>     at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
>     at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
>     at kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
>     at kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
>     at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
>     at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
>     at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
>     at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>     at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
>     at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
>     at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
>     at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>     at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
>     at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
>     at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540)
>     at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>     at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601)
>     at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538)
>     at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569)
>     at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:538)
>     at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:300)
>     at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
>     at scala.Option.getOrElse(Option.scala:189)
>     at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
>     at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:398)
>     at scala.Option.getOrElse(Option.scala:189)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:398)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:391)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:388)
>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:619)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:384)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:216)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:199)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:259)
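The stack trace pins down the root cause: ping and telnet succeed because the driver can reach the broker by IP, but kafka.bootstrap.servers is only used for the initial metadata request. Every subsequent connection goes to the host name the broker advertises, here the Windows machine name Navin.us.corp.tim.com, which the Databricks environment cannot resolve (hence the java.net.UnknownHostException). A quick way to confirm this from a notebook cell is a plain DNS lookup of the advertised name; this is a minimal sketch, not part of the original report:

    import socket

    # Host name the broker advertises, taken from the UnknownHostException above.
    advertised_host = "Navin.us.corp.tim.com"

    try:
        # Succeeds only if the Databricks driver can resolve the name.
        print(socket.gethostbyname(advertised_host))
    except socket.gaierror as err:
        # Mirrors the consumer's failure mode: bootstrap by IP works, but the
        # advertised name is unresolvable, so post-metadata connections fail.
        print(f"cannot resolve {advertised_host}: {err}")

If this lookup fails while telnet to 10.53.56.140:9092 succeeds, the fix is the one recorded in the resolution: make the broker advertise the routable IP instead of its host name.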
--
This message was sent by Atlassian Jira
(v8.3.4#803005)