By default, Spark assumes that all fields are nullable when creating a DataFrame. Your readStream DataFrame does not reference your JSON schema, so the Kafka value column is never parsed into fields. Here is an example:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import FloatType, StringType, StructType, TimestampType

# create your json schema
schema = StructType() \
    .add("rowkey", StringType()) \
    .add("ticker", StringType()) \
    .add("timeissued", TimestampType()) \
    .add("price", FloatType())

# construct a streaming dataframe streamingDataFrame that subscribes to
# topic config['MDVariables']['topic'] -> md (market data)
streamingDataFrame = self.spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
    .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
    .option("group.id", config['common']['appName']) \
    .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
    .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
    .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
    .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
    .option("subscribe", config['MDVariables']['topic']) \
    .option("failOnDataLoss", "false") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

Now you can do things such as validation or adding default values to your streamingDataFrame columns in the select statement, before writeStream:

# flatten the parsed struct into top-level columns, then write each micro-batch
result = streamingDataFrame.select(
             col("parsed_value.rowkey").alias("rowkey")
           , col("parsed_value.ticker").alias("ticker")
           , col("parsed_value.timeissued").alias("timeissued")
           , col("parsed_value.price").alias("price")). \
         writeStream. \
         outputMode('append'). \
         option("truncate", "false"). \
         foreachBatch(sendToSink). \
         trigger(processingTime='30 seconds'). \
         option('checkpointLocation', checkpoint_path). \
         queryName(config['MDVariables']['topic']). \
         start()

HTH

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk.
Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Fri, 19 Nov 2021 at 14:28, Xiao, Alton <alton.x...@sap.com.invalid> wrote:

> Hello,
>
> I am struggling with a task that should be super simple:
>
> I define a StructType to load JSON data from Kafka with Spark
> Structured Streaming, and some fields may have no value. How can I set a
> default value for this record?
>
> For example:
>
> StructType(
>   Array(StructField("a", StringType, nullable = true),
>         StructField("b", StringType, nullable = true),
>         StructField("c", StringType, nullable = true))
> )
>
> spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "input-topic")
>   .option("failOnDataLoss", "false")
>   .load()
>
> df.writeStream
>   .format(format)
>   .option("checkpointLocation", checkpoint)
>   .option("path", path)
>   .outputMode(OutputMode.Append)
>   .trigger(ProcessingTime("10 seconds"))
>   .start()
>
> If the input data has no b, how can I set a default value (xxx)? Is a UDF
> the only way?