Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-23 Thread Mich Talebzadeh
With the old Spark Streaming (example in Scala), this would have been
easier through RDDs. You could read the data:

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](streamingContext, kafkaParams, topicsValue)

dstream.foreachRDD { pricesRDD =>
  if (!pricesRDD.isEmpty)  // data exists in RDD
  {
    // write to DB
  }
}


Now with Structured Streaming in Python, you read the data into a dataframe
with readStream and load:


schema = StructType().add("rowkey", StringType()).add("ticker", StringType()) \
                     .add("timeissued", TimestampType()).add("price", FloatType())

ds = self.spark \
    .readStream \
    .format("kafka") \
     ...
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

ds2 = ds \
    .select(col("parsed_value.rowkey").alias("rowkey"),
            col("parsed_value.ticker").alias("ticker"),
            col("parsed_value.timeissued").alias("timeissued"),
            col("parsed_value.price").alias("price")) \
    .withColumn("currency", lit(config['MDVariables']['currency'])) \
    .withColumn("op_type", lit(config['MDVariables']['op_type'])) \
    .withColumn("op_time", current_timestamp())

# write to console
query = ds2. \
    writeStream. \
    outputMode("append"). \
    format("console"). \
    start()
ds2.printSchema()


But writing to BigQuery through the BigQuery API does not work:


s.writeTableToBQ(ds2, "overwrite",
                 config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])

query.awaitTermination()


This is the run result and the error:


root
 |-- rowkey: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- timeissued: timestamp (nullable = true)
 |-- price: float (nullable = true)
 |-- currency: string (nullable = false)
 |-- op_type: string (nullable = false)
 |-- op_time: timestamp (nullable = false)


*'write' can not be called on streaming Dataset/DataFrame;, quitting*

I gather I need to create an RDD from the dataframe, or maybe there is another
way to write streaming data to the DB directly from the dataframe?

Thanks




Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-23 Thread Jungtaek Lim
If your code doesn't require "end-to-end exactly-once", then you could
leverage foreachBatch, which enables you to use a batch sink.
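
For example, a minimal sketch (reusing the writeTableToBQ helper and config
from the original post, and assuming the helper accepts an ordinary batch
DataFrame; "append" replaces the original "overwrite" so each micro-batch
does not wipe the target table):

# foreachBatch hands each micro-batch to this function as a plain
# (non-streaming) DataFrame, so batch-only writers can be used here.
def write_to_bq(batch_df, batch_id):
    if len(batch_df.take(1)) > 0:   # skip empty micro-batches
        # Assumes writeTableToBQ accepts a batch DataFrame (hypothetical)
        s.writeTableToBQ(batch_df, "append",
                         config['MDVariables']['targetDataset'],
                         config['MDVariables']['targetTable'])

query = ds2.writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_bq) \
    .start()

query.awaitTermination()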

If your code requires "end-to-end exactly-once", then that's a different
story. I'm not familiar with BigQuery and have no idea how the sink is
implemented, but quick googling tells me that a transaction with multiple
DML statements isn't supported, so end-to-end exactly-once cannot be
implemented in any way.

If you ensure the write in the query is idempotent, then it doesn't matter at all.
