I'm trying to read JSON messages from Kafka and store them in HDFS with Spark Structured Streaming.
I followed the example here:
https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

When my code looks like this:

    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("subscribe", "topic1") \
      .load()

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    df.writeStream \
      .format("json") \
      .option("checkpointLocation", "some/hdfs/path") \
      .start("/data")

I get rows with binary values in HDFS:

    {"value":"BINARY DATA","topic":"test_hdfs2","partition":0,"offset":3463075,"timestamp":"2018-07-24T20:51:33.655Z","timestampType":0}

These rows are continually written as expected, but the value column is still in binary format.

I then found this post:
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
and I'm trying to implement its example:

    schema = StructType().add("a", IntegerType()).add("b", StringType())

    df.select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), schema))
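Putting the two examples together, this is roughly the end-to-end query I'm aiming for. It's just a sketch: the topic name, the a/b schema fields and the HDFS paths are placeholders from my test setup, and the aliasing/flattening is my own addition.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, IntegerType, StringType

    spark = SparkSession.builder.appName("kafka-to-hdfs").getOrCreate()

    # Placeholder schema for the JSON payload carried in the Kafka value
    schema = StructType().add("a", IntegerType()).add("b", StringType())

    raw = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
        .option("subscribe", "topic1") \
        .load()

    # Cast the binary value to a string, parse it with the schema,
    # and keep the parsed columns instead of the raw bytes
    parsed = raw.select(
            col("key").cast("string").alias("key"),
            from_json(col("value").cast("string"), schema).alias("data")) \
        .select("key", "data.*")

    # Write the parsed rows to HDFS as JSON files (placeholder paths)
    query = parsed.writeStream \
        .format("json") \
        .option("checkpointLocation", "some/hdfs/path") \
        .start("/data")

    query.awaitTermination()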
But with the from_json version I get odd behaviour: a small file with multiple empty JSON rows ({}) is written to HDFS, and very quickly the job fails with the following exception:

18/07/24 22:25:47 ERROR datasources.FileFormatWriter: Aborting job null.
java.lang.IllegalStateException: hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting batch 409 (compactInterval: 10)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
18/07/24 22:25:47 ERROR streaming.MicroBatchExecution: Query [id = 4f6c4ebc-f330-4697-b2db-7989b93dfba3, runId = 57575397-9fda-4370-9dcb-4550ae1576ec] terminated with error
org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalStateException: hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting batch 409 (compactInterval: 10)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
    ... 17 more

Any idea how to implement this in the right way?
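In case it helps, this is roughly how I'm checking what actually lands in HDFS while the stream runs (again just a sketch, with /data as the placeholder sink path from above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("inspect-sink").getOrCreate()

    # Read back whatever the file sink has committed so far (placeholder path)
    spark.read.json("/data").show(20, truncate=False)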