Are you writing the output of multiple streaming queries to the same location? If so, that would explain this error: multiple streaming queries writing to the same directory is not supported.
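If that is the case, the usual fix is to give every query its own output directory and its own checkpointLocation. Below is a minimal sketch of that layout (broker addresses, topics and all paths are placeholders, not taken from your job); a corrected version of the snippet you posted is appended after the quoted message.

# Sketch only: one sink directory and one checkpoint directory per query.
# All paths, topics and brokers here are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-hdfs").getOrCreate()

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
       .option("subscribe", "topic1")
       .load())

# Query A: its own output directory and checkpoint directory.
query_a = (raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
           .writeStream
           .format("json")
           .option("checkpointLocation", "hdfs:///checkpoints/query_a")  # unique per query
           .start("hdfs:///data/query_a"))                               # unique per query

# Query B: a different sink and checkpoint; never share these with query A.
query_b = (raw.selectExpr("CAST(value AS STRING) AS value")
           .writeStream
           .format("json")
           .option("checkpointLocation", "hdfs:///checkpoints/query_b")
           .start("hdfs:///data/query_b"))

spark.streams.awaitAnyTermination()

For what it's worth, a missing _spark_metadata/<N>.compact file is typically what shows up when the sink's metadata directory has been written by more than one query, or has drifted out of sync with the checkpoint; pointing the new query at a fresh output path and checkpoint directory usually clears it.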
On Tue, Jul 24, 2018 at 3:38 PM, dddaaa <[email protected]> wrote:

> I'm trying to read JSON messages from Kafka and store them in HDFS with
> Spark Structured Streaming.
>
> I followed the example here:
> https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
>
> and when my code looks like this:
>
> df = spark \
>   .read \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>   .option("subscribe", "topic1") \
>   .load()
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
> df.writeStream.format("json").option("checkpointLocation", "some/hdfs/path").start("/data")
>
> then I get rows with binary values in HDFS:
>
> {"value":"BINARY DATA","topic":"test_hdfs2","partition":0,"offset":3463075,"timestamp":"2018-07-24T20:51:33.655Z","timestampType":0}
>
> These rows are continually written as expected, but in the binary format.
>
> I found this post:
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> and I'm trying to implement this example:
>
> schema = StructType().add("a", IntegerType()).add("b", StringType())
> df.select( \
>     col("key").cast("string"),
>     from_json(col("value").cast("string"), schema))
>
> But here I get an odd behaviour. I have a small file written to HDFS with
> multiple empty JSON rows - {}
>
> and very quickly the job fails with the following exception:
>
> 18/07/24 22:25:47 ERROR datasources.FileFormatWriter: Aborting job null.
> java.lang.IllegalStateException: hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting batch 409 (compactInterval: 10)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
>   at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
>   at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> 18/07/24 22:25:47 ERROR streaming.MicroBatchExecution: Query [id = 4f6c4ebc-f330-4697-b2db-7989b93dfba3, runId = 57575397-9fda-4370-9dcb-4550ae1576ec] terminated with error
> org.apache.spark.SparkException: Job aborted.
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
>   at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: java.lang.IllegalStateException: hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting batch 409 (compactInterval: 10)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
>   at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
>   at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
>   ... 17 more
>
> Any idea how to implement this in the right way?
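For completeness, here is a minimal corrected sketch of the snippet from the quoted message. It is an illustration only: the brokers, topic and paths are placeholders, and the ("a", "b") schema is just the example from the Databricks post, so substitute the real structure of your JSON. The main changes are using readStream instead of read, keeping the result of the transformation (DataFrames are immutable, so the original selectExpr call was discarded), and flattening the parsed struct before writing. If the output still contains empty {} rows, that usually means the schema passed to from_json does not match the actual messages, so every field parses to null.

# Sketch of the quoted snippet with the likely issues fixed; schema, brokers
# and paths are placeholder assumptions, not values from the original thread.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, IntegerType, StringType

spark = SparkSession.builder.appName("kafka-json-to-hdfs").getOrCreate()

# Replace with the real structure of the JSON messages on the topic.
schema = StructType().add("a", IntegerType()).add("b", StringType())

df = (spark.readStream                      # readStream, not read, for a streaming query
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load())

# Keep the result of the transformation and flatten the parsed struct so the
# individual JSON fields end up in the output files.
parsed = (df.select(from_json(col("value").cast("string"), schema).alias("data"))
          .select("data.*"))

query = (parsed.writeStream
         .format("json")
         .option("checkpointLocation", "hdfs:///checkpoints/kafka_json")  # fresh, unshared path
         .start("hdfs:///data/kafka_json"))                               # fresh, unshared path

query.awaitTermination()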
