I want to write a Structured Streaming Kafka consumer that reads data from a single-partition Kafka topic, repartitions the incoming data by "key" into 3 Spark partitions while keeping the messages ordered per key, and writes them to another Kafka topic with 3 partitions.
I used DataFrame.repartition(3, $"key"), which I believe uses HashPartitioner. When I executed the query with the fixed-interval micro-batch trigger, I visually verified that the output messages were in the expected order. My assumption, however, is that ordering is not actually guaranteed within the resulting partitions. I am looking for affirmation or a veto of this assumption, ideally with pointers into the Spark code repository or documentation.

I also tried DataFrame.sortWithinPartitions, but this does not seem to be supported on a streaming DataFrame without aggregation.

One other option I tried was to convert the DataFrame to an RDD and apply repartitionAndSortWithinPartitions, which repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by their keys. In this case, however, I cannot use the resulting RDD in the writeStream operation to write the result to the output Kafka topic. (A rough sketch of that attempt is at the end of this mail.)

My questions:

1. Is there a DataFrame repartitioning API that sorts the repartitioned data in the streaming context?
2. Are there any other alternatives?
3. Does the default trigger type or the fixed-interval trigger type for micro-batch execution provide any message ordering guarantees?
4. Is any ordering possible with the continuous trigger type?

Incoming data:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/TEVcw.png>

Code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import spark.implicits._

case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)

// Read from the single-partition input topic
val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("subscribe", kafkaInputTopic.get)
  .option("maxOffsetsPerTrigger", 30)
  .load()

val inputDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Repartition by key, parse the JSON payload (schema is defined elsewhere),
// tag each row with its Spark partition id, and sort within each partition
val resDf = inputDf.repartition(3, $"key")
  .select(from_json($"value", schema).as("kv"))
  .selectExpr("kv.key", "kv.ts", "kv.value")
  .withColumn("spark_partition", spark_partition_id())
  .select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
  .sortWithinPartitions($"ts", $"value")   // suspect: sorting a streaming Dataset
  .select($"key".cast(StringType).as("key"),
          to_json(struct($"*")).cast(StringType).as("value"))

val query = resDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("topic", kafkaOutputTopic.get)
  .option("checkpointLocation", checkpointLocation.get)
  .start()

Error:

When I submit this application, it fails with
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/gXWvM.png>
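For completeness, here is roughly what the RDD-based attempt looked like. This is only a sketch, not my exact code: KeyPartitioner is an illustrative custom partitioner I wrote for the experiment, and parsedDf stands for the stream after the from_json step (key, ts, value columns).

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hash only the message key out of the (key, ts) composite, so all
// records for a given key land in the same partition; the implicit
// tuple ordering then sorts each partition by (key, ts).
class KeyPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n
  override def getPartition(composite: Any): Int = {
    val key = composite.asInstanceOf[(String, Long)]._1
    (key.hashCode % n + n) % n   // non-negative modulo
  }
}

// (key, ts) becomes the RDD key so that sorting within a partition
// orders each key's records by timestamp; value carries the payload
val pairs: RDD[((String, Long), String)] = parsedDf.rdd
  .map(r => ((r.getAs[String]("key"), r.getAs[Long]("ts")),
             r.getAs[String]("value")))

// Repartitions by key and sorts each partition by (key, ts) in one shuffle
val sorted = pairs.repartitionAndSortWithinPartitions(new KeyPartitioner(3))

// Dead end: `sorted` is a plain RDD, so there is no writeStream on it
// and I cannot hand it back to the streaming Kafka sink.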