Hi, I have one read stream consuming data from a Kafka topic, and based on an attribute value in each incoming message I have to write the data to one of two different locations in S3 (location1 if the value is *value1*, otherwise location2). At a high level, below is what I have for doing that.
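For context, here is a simplified sketch of the bean shapes involved (each class lives in its own file; the field names other than eventDate are placeholders, not my actual schema; getters, setters, and most fields are omitted):

    import java.io.Serializable;

    public class ClickStream implements Serializable {
        private String eventType;                  // the attribute checked for value1/value2 (name assumed)
        private BookingRequest bookingRequest;     // populated only when the attribute is value1
        private PropertyPageView propertyPageView; // populated only when the attribute is value2
        // getters/setters omitted
    }

    public class BookingRequest implements Serializable {
        private String eventDate; // used by partitionBy("eventDate") below
        // other fields, getters/setters omitted
    }

    public class PropertyPageView implements Serializable {
        private String eventDate; // used by partitionBy("eventDate") below
        // other fields, getters/setters omitted
    }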
First, the read stream from Kafka:

    Dataset<Row> kafkaStreamSet = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBootstrap)
        .option("subscribe", kafkaTopic)
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        .option("maxOffsetsPerTrigger", offsetsPerTrigger)
        .load();

    // raw message to ClickStream
    Dataset<ClickStream> ds1 = kafkaStreamSet.mapPartitions(
        processClickStreamMessages, Encoders.bean(ClickStream.class));

ClickStream.java has two child objects within it, and only one of them is populated at a time, depending on whether the message attribute value is *value1* or *value2*: 1) BookingRequest.java if *value1*, 2) PropertyPageView.java if *value2*. I then separate them out of the ClickStream as below, to write to the two different locations in S3:

    // fetch BookingRequests in the ClickStream
    Dataset<BookingRequest> ds2 =
        ds1.map(filterBookingRequests, Encoders.bean(BookingRequest.class));

    // fetch PropertyPageViews in the ClickStream
    Dataset<PropertyPageView> ds3 =
        ds1.map(filterPropertyPageViews, Encoders.bean(PropertyPageView.class));

Finally, ds2 and ds3 are written to the two different locations:

    StreamingQuery bookingRequestsParquetStreamWriter = ds2.writeStream()
        .outputMode("append")
        .format("parquet")
        .trigger(ProcessingTime.create(bookingRequestProcessingTime, TimeUnit.MILLISECONDS))
        .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/bookingRequests")
        .partitionBy("eventDate")
        .start("s3://" + s3Bucket + "/" + bookingRequestPath);

    StreamingQuery pageViewsParquetStreamWriter = ds3.writeStream()
        .outputMode("append")
        .format("parquet")
        .trigger(ProcessingTime.create(pageViewProcessingTime, TimeUnit.MILLISECONDS))
        .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/PageViews")
        .partitionBy("eventDate")
        .start("s3://" + s3Bucket + "/" + pageViewPath);

    bookingRequestsParquetStreamWriter.awaitTermination();
    pageViewsParquetStreamWriter.awaitTermination();

This seems to work fine, and I see data written to the different paths when the app is deployed. But whenever the job is restarted, on failure or on a manual stop and start, it keeps failing with the exception below (where userSessionEventJoin.global is my topic name):

    Caused by: java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"userSessionEventJoin.global":{"92":154362528,"101
        at org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74)
        at org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:59)
        at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:134)
        at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:123)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237)
        at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:138)

If I delete all the checkpointing information, the job starts again and creates new checkpoints in the given two locations, but that means I have to start processing from the latest offsets again and lose all the previous offsets. The Spark version is 2.1. Please suggest any resolutions, thanks.
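P.S. In case it's relevant, the two filter functions referenced above are plain MapFunctions along these lines (simplified; the getter names are placeholders for however the child objects are accessed):

    import org.apache.spark.api.java.function.MapFunction;

    // pulls the BookingRequest child out of each ClickStream
    MapFunction<ClickStream, BookingRequest> filterBookingRequests =
            clickStream -> clickStream.getBookingRequest();

    // pulls the PropertyPageView child out of each ClickStream
    MapFunction<ClickStream, PropertyPageView> filterPropertyPageViews =
            clickStream -> clickStream.getPropertyPageView();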