With the old Spark Streaming (example in Scala), this would have been easier through RDDs. You could read the data:
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsValue)
dstream.foreachRDD { pricesRDD =>
  if (!pricesRDD.isEmpty) {  // data exists in the RDD
    // write to DB
  }
}

Now with Structured Streaming in Python, you read data into a DataFrame with readStream and load:

schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())

ds = self.spark \
        .readStream \
        .format("kafka") \
        ....... \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

ds2 = ds \
        .select( \
              col("parsed_value.rowkey").alias("rowkey") \
            , col("parsed_value.ticker").alias("ticker") \
            , col("parsed_value.timeissued").alias("timeissued") \
            , col("parsed_value.price").alias("price")). \
        withColumn("currency", lit(config['MDVariables']['currency'])). \
        withColumn("op_type", lit(config['MDVariables']['op_type'])). \
        withColumn("op_time", current_timestamp())

# write to console
query = ds2. \
        writeStream. \
        outputMode("append"). \
        format("console"). \
        start()

ds2.printSchema()

But writing to BigQuery through the BigQuery API does not work:

s.writeTableToBQ(ds2, "overwrite", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])

query.awaitTermination()

This is the run result and the error:

root
 |-- rowkey: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- timeissued: timestamp (nullable = true)
 |-- price: float (nullable = true)
 |-- currency: string (nullable = false)
 |-- op_type: string (nullable = false)
 |-- op_time: timestamp (nullable = false)

*'write' can not be called on streaming Dataset/DataFrame;, quitting*

I gather I need to create an RDD from the DataFrame, or maybe there is another way to write streaming data to the DB directly from the DataFrame?

Thanks

*Disclaimer:* Use it at your own risk.
Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
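For reference, a minimal sketch of one way around this error: Structured Streaming refuses a plain .write on a streaming DataFrame, but foreachBatch (available since Spark 2.4) passes each micro-batch to a callback as a static DataFrame, on which ordinary batch writers work. The "bigquery" format and the table name below are assumptions (they presume the spark-bigquery connector is on the classpath); a helper like writeTableToBQ above could equally be called inside the callback.

```python
# Sketch under assumptions: PySpark >= 2.4, the spark-bigquery connector
# available, "myDataset.myTable" a placeholder target. foreachBatch hands
# every micro-batch to this function as a *static* DataFrame, so batch
# sinks (BigQuery connector, JDBC, etc.) can be used inside it.
def write_batch_to_bq(batch_df, batch_id):
    # batch_df is a normal (non-streaming) DataFrame here, so .write works
    batch_df.write \
        .format("bigquery") \
        .option("table", "myDataset.myTable") \
        .mode("append") \
        .save()

# Wiring it into the streaming query above would look like:
#
#   query = ds2.writeStream \
#       .foreachBatch(write_batch_to_bq) \
#       .outputMode("append") \
#       .start()
#   query.awaitTermination()
```

This avoids converting back to RDDs entirely; for per-row writes there is also DataStreamWriter.foreach, but foreachBatch lets the connector write each micro-batch in bulk.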