You should include commons-pool2-2.9.0.jar and remove
spark-streaming-kafka-0-10_2.12-3.0.1.jar (unnecessary jar).

On Mon, Feb 22, 2021 at 12:42 PM Mich Talebzadeh <[email protected]>
wrote:

> Hi,
>
> Trying to make PySpark with PyCharm work with Structured Streaming
>
> spark-3.0.1-bin-hadoop3.2
> kafka_2.12-1.1.0
>
> Basic code
>
> from __future__ import print_function
> from src.config import config, hive_url
> import sys
> from sparkutils import sparkstuff as s
>
> class MDStreaming:
>     def __init__(self, spark_session,spark_context):
>         self.spark = spark_session
>         self.sc = spark_context
>         self.config = config
>
>     def startStreaming(self):
>         self.sc.setLogLevel("ERROR")
>         try:
>             kafkaReaderWithHeaders = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
>                 .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['topic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "earliest") \
>                 .load()
>         except Exception as e:
>                 print(f"""{e}, quitting""")
>                 sys.exit(1)
>
>         kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)",
> "CAST(value AS STRING)", "headers") \
>             .writeStream \
>             .format("console") \
>             .option("truncate","false") \
>             .start() \
>             .awaitTermination()
>         kafkaReaderWithHeaders.printSchema()
>
> if __name__ == "__main__":
>     appName = config['common']['appName']
>     spark_session = s.spark_session(appName)
>     spark_context = s.sparkcontext()
>     mdstreaming = MDStreaming(spark_session, spark_context)
>     mdstreaming.startStreaming()
>
> I have used the following jars in $SYBASE_HOME/jars
>
>   spark-sql-kafka-0-10_2.12-3.0.1.jar
>  kafka-clients-2.7.0.jar
>  spark-streaming-kafka-0-10_2.12-3.0.1.jar
>  spark-token-provider-kafka-0-10_2.12-3.0.1.jar
>
> and also in $SPARK_HOME/conf/spark-defaults.conf
>
> spark.driver.extraClassPath        $SPARK_HOME/jars/*.jar
> spark.executor.extraClassPath      $SPARK_HOME/jars/*.jar
>
>
> The error is this:
>
> 2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0 in
> stage 0.0 (TID 3)
> *java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$*
> at
> org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:52)
> at
> org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
> at
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:127)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> pyspark.sql.utils.StreamingQueryException: Writing job aborted.
> === Streaming Query ===
> Identifier: [id = 0706dcd1-01de-4d7f-a362-81257b45e38c, runId =
> d61d9807-6f6c-4de1-a60f-8ae31c8a3c36]
> Current Committed Offsets: {}
> Current Available Offsets: {KafkaV2[Subscribe[md]]:
> {"md":{"8":1905351,"2":1907338,"5":1905175,"4":1904978,"7":1907880,"1":1903797,"3":1906072,"6":1904936,"0":1903896}}}
>
> Current State: ACTIVE
> Thread State: RUNNABLE
>
> Logical Plan:
> WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]
> +- Project [cast(key#8 as string) AS key#24, cast(value#9 as string) AS
> value#25, headers#15]
>    +- StreamingDataSourceV2Relation [key#8, value#9, topic#10,
> partition#11, offset#12L, timestamp#13, timestampType#14, headers#15],
> org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1cf1e26d,
> KafkaV2[Subscribe[md]]
>
> Process finished with exit code 1
>
> The thing is that the class is in the jar file below in $SPARK_HOME/jars
>
>
> find $SPARK_HOME/jars/  -name "*.jar" | xargs grep
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
>
>
> Binary file jars/spark-sql-kafka-0-10_2.12-3.0.1.jar matches
>
> Appreciate any feedback.
>
>
> Thanks
>
>
> Mich
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>

Reply via email to