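You should add commons-pool2-2.9.0.jar to your jars directory and remove spark-streaming-kafka-0-10_2.12-3.0.1.jar. The spark-sql-kafka-0-10 connector uses Apache Commons Pool 2 for its consumer pool, so KafkaDataConsumer cannot be initialized without it. The spark-streaming-kafka jar is for the older DStream API and is not needed for Structured Streaming.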
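If it helps, here is a rough sketch of a check for the jar changes suggested above. It only compares file names against prefixes taken from this thread. The function name check_kafka_jars and the two prefix lists are my own assumptions for illustration, not anything Spark provides, and the list of required jars is inferred from this thread rather than from an official dependency list.

```python
# Jar name prefixes taken from this thread (assumption: files follow the
# usual <artifact>-<version>.jar naming).
REQUIRED_PREFIXES = (
    "spark-sql-kafka-0-10_",
    "spark-token-provider-kafka-0-10_",
    "kafka-clients-",
    "commons-pool2-",
)
# Only used by the DStream API, so not needed for Structured Streaming.
UNNEEDED_PREFIXES = ("spark-streaming-kafka-0-10_",)


def check_kafka_jars(jar_names):
    """Return (missing_prefixes, unneeded_jars) for a list of jar file names."""
    jars = [name for name in jar_names if name.endswith(".jar")]
    # A required prefix is missing when no jar name starts with it.
    missing = [
        prefix for prefix in REQUIRED_PREFIXES
        if not any(jar.startswith(prefix) for jar in jars)
    ]
    # str.startswith accepts a tuple of prefixes.
    unneeded = [jar for jar in jars if jar.startswith(UNNEEDED_PREFIXES)]
    return missing, unneeded


# The four jars listed in the original message.
jars_in_thread = [
    "spark-sql-kafka-0-10_2.12-3.0.1.jar",
    "kafka-clients-2.7.0.jar",
    "spark-streaming-kafka-0-10_2.12-3.0.1.jar",
    "spark-token-provider-kafka-0-10_2.12-3.0.1.jar",
]
missing, unneeded = check_kafka_jars(jars_in_thread)
print("missing:", missing)
print("unneeded:", unneeded)
```

To run it against a real installation, pass os.listdir() of your jars directory instead of the hard-coded list. It does not inspect jar contents or versions, so it will not catch a version mismatch.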

On Mon, Feb 22, 2021 at 12:42 PM Mich Talebzadeh <[email protected]> wrote:

> Hi,
>
> Trying to make PySpark with PyCharm work with Structured Streaming
>
> spark-3.0.1-bin-hadoop3.2
> kafka_2.12-1.1.0
>
> Basic code
>
> from __future__ import print_function
> from src.config import config, hive_url
> import sys
> from sparkutils import sparkstuff as s
>
> class MDStreaming:
>     def __init__(self, spark_session,spark_context):
>         self.spark = spark_session
>         self.sc = spark_context
>         self.config = config
>
>     def startStreaming(self):
>         self.sc.setLogLevel("ERROR")
>         try:
>             kafkaReaderWithHeaders = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['topic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "earliest") \
>                 .load()
>         except Exception as e:
>             print(f"""{e}, quitting""")
>             sys.exit(1)
>
>         kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers") \
>             .writeStream \
>             .format("console") \
>             .option("truncate","false") \
>             .start() \
>             .awaitTermination()
>         kafkaReaderWithHeaders.printSchema()
>
> if __name__ == "__main__":
>     appName = config['common']['appName']
>     spark_session = s.spark_session(appName)
>     spark_context = s.sparkcontext()
>     mdstreaming = MDStreaming(spark_session, spark_context)
>     mdstreaming.startStreaming()
>
> I have used the following jars in $SYBASE_HOME/jars
>
> spark-sql-kafka-0-10_2.12-3.0.1.jar
> kafka-clients-2.7.0.jar
> spark-streaming-kafka-0-10_2.12-3.0.1.jar
> spark-token-provider-kafka-0-10_2.12-3.0.1.jar
>
> and also in $SPARK_HOME/conf/spark-defaults.conf
>
> spark.driver.extraClassPath    $SPARK_HOME/jars/*.jar
> spark.executor.extraClassPath  $SPARK_HOME/jars/*.jar
>
> The error is this:
>
> 2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0 in stage 0.0 (TID 3)
> *java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$*
>     at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:52)
>     at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
>     at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:127)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> pyspark.sql.utils.StreamingQueryException: Writing job aborted.
> === Streaming Query ===
> Identifier: [id = 0706dcd1-01de-4d7f-a362-81257b45e38c, runId = d61d9807-6f6c-4de1-a60f-8ae31c8a3c36]
> Current Committed Offsets: {}
> Current Available Offsets: {KafkaV2[Subscribe[md]]: {"md":{"8":1905351,"2":1907338,"5":1905175,"4":1904978,"7":1907880,"1":1903797,"3":1906072,"6":1904936,"0":1903896}}}
>
> Current State: ACTIVE
> Thread State: RUNNABLE
>
> Logical Plan:
> WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]
> +- Project [cast(key#8 as string) AS key#24, cast(value#9 as string) AS value#25, headers#15]
>    +- StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1cf1e26d, KafkaV2[Subscribe[md]]
>
> Process finished with exit code 1
>
> The thing is that the class is in the jar file below in $SPARK_HOME/jars
>
> find $SPARK_HOME/jars/ -name "*.jar" | xargs grep org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
>
> Binary file jars/spark-sql-kafka-0-10_2.12-3.0.1.jar matches
>
> Appreciate any feedback.
>
> Thanks
>
> Mich
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
