With the old Spark Streaming (example in Scala), this would have been easier
through RDDs. You could read the data like this:

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](streamingContext, kafkaParams, topicsValue)

dstream.foreachRDD { pricesRDD =>
  if (!pricesRDD.isEmpty) {  // data exists in RDD
    // write to DB here
  }
}


Now with Structured Streaming in Python, you read the data into a dataframe
with readStream and load:


schema = StructType() \
    .add("rowkey", StringType()) \
    .add("ticker", StringType()) \
    .add("timeissued", TimestampType()) \
    .add("price", FloatType())

ds = self.spark \
    .readStream \
    .format("kafka") \
    ....... \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

ds2 = ds \
    .select(col("parsed_value.rowkey").alias("rowkey"),
            col("parsed_value.ticker").alias("ticker"),
            col("parsed_value.timeissued").alias("timeissued"),
            col("parsed_value.price").alias("price")) \
    .withColumn("currency", lit(config['MDVariables']['currency'])) \
    .withColumn("op_type", lit(config['MDVariables']['op_type'])) \
    .withColumn("op_time", current_timestamp())

# write to console
query = ds2 \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

ds2.printSchema()


But writing to BigQuery through the BigQuery API does not work:


s.writeTableToBQ(ds2, "overwrite",
                 config['MDVariables']['targetDataset'],
                 config['MDVariables']['targetTable'])


query.awaitTermination()


This is the run result and the error:


root
 |-- rowkey: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- timeissued: timestamp (nullable = true)
 |-- price: float (nullable = true)
 |-- currency: string (nullable = false)
 |-- op_type: string (nullable = false)
 |-- op_time: timestamp (nullable = false)


*'write' can not be called on streaming Dataset/DataFrame;, quitting*

I gather I need to create an RDD from the dataframe, or maybe there is another
way to write streaming data to the DB directly from the dataframe?
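Would something like foreachBatch do the job here? A rough sketch of what I
have in mind (assuming Spark 2.4+ and that writeTableToBQ can take the
ordinary DataFrame handed to it per micro-batch; the "append" mode is my
guess for per-batch writes):

def write_batch_to_bq(batch_df, batch_id):
    # each micro-batch arrives as a static DataFrame, so the existing
    # batch writer could perhaps be reused here
    s.writeTableToBQ(batch_df, "append",
                     config['MDVariables']['targetDataset'],
                     config['MDVariables']['targetTable'])

query = ds2 \
    .writeStream \
    .foreachBatch(write_batch_to_bq) \
    .start()

query.awaitTermination()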

Thanks


