Hi,

I have a PySpark program that uses *Spark 3.0.1* to read a Kafka topic and
write it to Google BigQuery. This works fine on-premises and loops over
micro-batches of data.

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        # {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()) \
            .add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes
            # to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            #streamingDataFrame.printSchema()

            """
               "foreach" performs custom write logic on each row and "foreachBatch"
               performs custom write logic on each micro-batch through SendToBigQuery function
               foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as
               DataFrame or Dataset and second: unique id for each batch
               Using foreachBatch, we write each micro batch to storage defined in our
               custom logic. In this case, we store the output of our streaming
               application to Google BigQuery table.
               Note that we are appending data and column "rowkey" is defined as UUID
               so it can be used as the primary key
            """
            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     format('console'). \
                     start()
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)
        result.awaitTermination()

This produces the following output:


-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|rowkey                              |ticker|timeissued         |price |currency|op_type|op_time                |
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|35bc0378-a782-4183-999f-561a1dc162aa|MRW   |2021-02-27 17:15:49|300.75|GBP     |1      |2021-02-27 17:16:24.472|
|39c55b09-7f50-43fe-a0a1-f88e5bdd51e1|ORCL  |2021-02-27 17:15:49|23.75 |GBP     |1      |2021-02-27 17:16:24.472|
|22dfaf4f-2335-4658-aa74-3c0e4f05cc46|MKS   |2021-02-27 17:15:49|441.9 |GBP     |1      |2021-02-27 17:16:24.472|
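
The SendToBigQuery function mentioned in the docstring of fetch_data is not shown in this message. Purely as an illustration of the two-parameter shape that foreachBatch expects, a minimal sketch could look like the following. The function name send_to_bigquery, the bucket name and the table name are hypothetical placeholders, and the write options assume the spark-bigquery connector is available on the cluster.

```python
def send_to_bigquery(df, batch_id):
    """Hypothetical foreachBatch handler: gets the micro-batch DataFrame and its batch id."""
    # Skip empty micro-batches so that no empty write is attempted
    if len(df.take(1)) == 0:
        print(f"batch {batch_id}: empty, nothing to write")
        return False
    # Assumed connector options; the bucket and table names are placeholders
    df.write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .mode("append") \
        .save("my_dataset.my_table")
    print(f"batch {batch_id}: written")
    return True
```

Such a handler would be attached with writeStream.foreachBatch(send_to_bigquery) in place of format('console'). Spark ignores the return value; it is only there to make the sketch easy to check in isolation.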


However, on GCP the Dataproc compute servers use Spark 3.1.1. There, the
same code gets stuck at batch 0 and does not move on.



Streaming DataFrame :  True

21/02/27 18:01:09 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

I am getting one additional warning line, shown above
(spark.sql.adaptive.enabled is not supported in streaming
DataFrames/Datasets). Does that signify anything? Also, is there anything
else I can do to debug this? FYI, I can see that data is arriving on the
Kafka topic:

 $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper ctpcluster-m:2181,ctpcluster-w-0:2181,ctpcluster-w-1:2181 --topic md

{"rowkey":"56e9ef90-5113-4731-9f6e-1f91d5849799","ticker":"MSFT", "timeissued":"2021-02-27T18:20:42", "price":27.02}
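
As far as I understand, from_json yields null when a message value cannot be parsed, so it may be worth confirming outside Spark that a message taken from the topic matches the schema in fetch_data. A minimal sketch using only the standard library follows; the helper check_message and the table EXPECTED_FIELDS are hypothetical names, and the checks only approximate what the StructType expects.

```python
import json
from datetime import datetime

# Field names mirror the StructType in fetch_data; this checker itself is hypothetical
EXPECTED_FIELDS = {
    "rowkey": str,
    "ticker": str,
    "timeissued": str,        # ISO-8601 string, parsed separately below
    "price": (int, float),
}

def check_message(raw):
    """Return a list of problems found in one raw message value (empty if none)."""
    try:
        record = json.loads(raw)
    except json.JSONDecodeError as e:
        return [f"not valid JSON: {e.msg}"]
    if not isinstance(record, dict):
        return ["not a JSON object"]
    problems = []
    for name, expected in EXPECTED_FIELDS.items():
        if name not in record:
            problems.append(f"missing field: {name}")
        elif not isinstance(record[name], expected):
            problems.append(f"wrong type for {name}: {type(record[name]).__name__}")
    # The timestamp must also be parseable, not just a string
    if isinstance(record.get("timeissued"), str):
        try:
            datetime.fromisoformat(record["timeissued"])
        except ValueError:
            problems.append("timeissued is not an ISO-8601 timestamp")
    return problems
```

Calling check_message on a line copied from the console consumer returns an empty list when the line is a complete JSON object with the four expected fields.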


On GCP, the ZooKeeper nodes and Kafka brokers run in containers, but that
should not matter, should it?
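
One debugging aid that might help here is the lastProgress property of the streaming query, which PySpark returns as a dict (or None before the first progress report). The sketch below condenses it to the batch id, the input row count and the per-source offsets, to show whether the query on Dataproc is reading anything from Kafka at all. The helper name summarize_progress is hypothetical.

```python
def summarize_progress(progress):
    """Condense a StreamingQuery.lastProgress dict into a few lines for logging."""
    if not progress:
        return ["no progress reported yet"]
    lines = [
        f"batchId={progress.get('batchId')} "
        f"numInputRows={progress.get('numInputRows')}"
    ]
    # One line per source with the offsets covered by the last trigger
    for source in progress.get("sources", []):
        lines.append(
            f"source={source.get('description')} "
            f"start={source.get('startOffset')} end={source.get('endOffset')}"
        )
    return lines
```

In fetch_data this could be called periodically on result.lastProgress, printing each returned line, rather than blocking straight away in awaitTermination().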


Thanks



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw




