Hi,

Trying to make PySpark with PyCharm work with Structured Streaming

spark-3.0.1-bin-hadoop3.2
kafka_2.12-1.1.0

Basic code

from __future__ import print_function
from src.config import config, hive_url
import sys
from sparkutils import sparkstuff as s

class MDStreaming:
    def __init__(self, spark_session,spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def startStreaming(self):
        self.sc.setLogLevel("ERROR")
        try:
            kafkaReaderWithHeaders = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "earliest") \
                .load()
        except Exception as e:
                print(f"""{e}, quitting""")
                sys.exit(1)

        kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)",
"CAST(value AS STRING)", "headers") \
            .writeStream \
            .format("console") \
            .option("truncate","false") \
            .start() \
            .awaitTermination()
        kafkaReaderWithHeaders.printSchema()

if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    mdstreaming.startStreaming()

I have used the following jars in $SYBASE_HOME/jars

  spark-sql-kafka-0-10_2.12-3.0.1.jar
 kafka-clients-2.7.0.jar
 spark-streaming-kafka-0-10_2.12-3.0.1.jar
 spark-token-provider-kafka-0-10_2.12-3.0.1.jar

and also in $SPARK_HOME/conf/spark-defaults.conf

spark.driver.extraClassPath        $SPARK_HOME/jars/*.jar
spark.executor.extraClassPath      $SPARK_HOME/jars/*.jar


The error is this:

2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0 in
stage 0.0 (TID 3)
*java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$*
at
org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:52)
at
org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
at
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 0706dcd1-01de-4d7f-a362-81257b45e38c, runId =
d61d9807-6f6c-4de1-a60f-8ae31c8a3c36]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[md]]:
{"md":{"8":1905351,"2":1907338,"5":1905175,"4":1904978,"7":1907880,"1":1903797,"3":1906072,"6":1904936,"0":1903896}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]
+- Project [cast(key#8 as string) AS key#24, cast(value#9 as string) AS
value#25, headers#15]
   +- StreamingDataSourceV2Relation [key#8, value#9, topic#10,
partition#11, offset#12L, timestamp#13, timestampType#14, headers#15],
org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1cf1e26d,
KafkaV2[Subscribe[md]]

Process finished with exit code 1

The thing is that the class is in the jar file below in $SPARK_HOME/jars


find $SPARK_HOME/jars/  -name "*.jar" | xargs grep
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer


Binary file jars/spark-sql-kafka-0-10_2.12-3.0.1.jar matches

Appreciate any feedback.


Thanks


Mich





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Reply via email to