I'm trying to read json messages from kafka and store them in hdfs with spark
structured streaming.

I followed the example here:
https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

and when my code looks like this:

    df = spark \
      .read \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("subscribe", "topic1") \
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.writeStream.format("json").option("checkpointLocation",
"some/hdfs/path").start(/data")
Then I get rows with binary values in hdfs.

{"value":"BINARY
DATA","topic":"test_hdfs2","partition":0,"offset":3463075,"timestamp":"2018-07-24T20:51:33.655Z","timestampType":0}

These rows are continually written as expected, but in the binary format.

I found this post:

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

and I'm trying to implement this example:

schema = StructType().add("a", IntegerType()).add("b", StringType())
df.select( \
  col("key").cast("string"),
  from_json(col("value").cast("string"), schema))
But here I get an odd behvaiur. I have a small file written to hdfs with
multiple empty json rows - {}

and very quickly the jobs fails with the following excption:

18/07/24 22:25:47 ERROR datasources.FileFormatWriter: Aborting job null.
java.lang.IllegalStateException:
hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting
batch 409 (compactInterval: 10) at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
at scala.Option.getOrElse(Option.scala:121) at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
at
org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
at
org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
18/07/24 22:25:47 ERROR streaming.MicroBatchExecution: Query [id =
4f6c4ebc-f330-4697-b2db-7989b93dfba3, runId =
57575397-9fda-4370-9dcb-4550ae1576ec] terminated with error
org.apache.spark.SparkException: Job aborted. at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at
org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalStateException:
hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting
batch 409 (compactInterval: 10) at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
at scala.Option.getOrElse(Option.scala:121) at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
at
org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
... 17 more

Any idea how to implement this in the right way?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to