So it's been a while since I poked at the streaming code base, but I don't think we make any promises about a stable sort during repartition, and there are notes in there about how some of these components should be rewritten into core. So even if we did have a stable sort, I wouldn't depend on it unless the docs promised it (implementation details can and will change). It's possible I've just missed something in the docs, though.
One possible solution I thought of initially, though it requires complete output mode, would be to use a range partitioner rather than a hash partitioner, on the primary key you care about together with the second attribute you want to keep in order (although that way you could get a split on the primary key between partitions). Then you can apply a "global" sort which, if it matches the partitioning, should not have to do a second shuffle.

A kind of ugly approach that I think would work would be to first add partition indexes to the elements, then repartition, then do a groupBy plus a custom UDAF which ensures the order within the partition (rough, untested sketch below). This is a little ugly but doesn't depend too much on implementation details. You'd do the aggregate on a window the same size as your input window, with no waiting for late records.

That being said, while we don't support global sort operations in append or update output modes for fairly clear reasons, it seems like it might be reasonable to relax this and support sorting within partitions (e.g. non-global), but that will require a code change and we can take that discussion to the dev@ list.
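In case it's useful, here's what I mean by that second approach. Everything below is illustrative rather than tested code: Event and InOrder are made-up names, `parsed` stands in for your parsed stream (key, ts, value columns), and monotonically_increasing_id() is only a stand-in for a real pre-shuffle index (it preserves within-partition order, but its guarantees are otherwise weak):

    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.functions._
    import spark.implicits._  // assumes a SparkSession named `spark` in scope

    // Illustrative record shape matching the fields in your example.
    case class Event(idx: Long, key: String, ts: Long, value: String)

    // Buffers the rows seen for a key, then re-sorts them by the index that
    // was attached *before* the shuffle, restoring the original per-key order.
    object InOrder extends Aggregator[Event, Seq[Event], Seq[String]] {
      def zero: Seq[Event] = Seq.empty
      def reduce(buf: Seq[Event], e: Event): Seq[Event] = buf :+ e
      def merge(a: Seq[Event], b: Seq[Event]): Seq[Event] = a ++ b
      def finish(buf: Seq[Event]): Seq[String] = buf.sortBy(_.idx).map(_.value)
      def bufferEncoder: Encoder[Seq[Event]] = Encoders.kryo[Seq[Event]]
      def outputEncoder: Encoder[Seq[String]] = Encoders.kryo[Seq[String]]
    }

    // Attach the index before repartitioning so the aggregator can undo any
    // reordering the shuffle introduces.
    val ordered = parsed                                 // columns: key, ts, value
      .withColumn("idx", monotonically_increasing_id())
      .as[Event]
      .repartition(3, $"key")
      .groupByKey(_.key)
      .agg(InOrder.toColumn.name("values"))              // Dataset[(String, Seq[String])]

Since this is an aggregation it should run in update or complete output mode, and you'd still need to flatten the (key, values) pairs back out before writing to Kafka.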
On Mon, Dec 3, 2018 at 2:22 PM pmatpadi <pmatp...@gmail.com> wrote:

> I want to write a Spark Structured Streaming Kafka consumer which reads
> data from a one-partition Kafka topic, repartitions the incoming data by
> "key" to 3 Spark partitions while keeping the messages ordered per key,
> and writes them to another Kafka topic with 3 partitions.
>
> I used Dataframe.repartition(3, $"key"), which I believe uses
> HashPartitioner.
>
> When I executed the query with the fixed-batch-interval trigger type, I
> visually verified that the output messages were in the expected order.
> My assumption, however, is that order is not guaranteed on the resulting
> partition. I am looking for some affirmation or veto of my assumption in
> terms of code pointers in the Spark code repo or documentation.
>
> I also tried using Dataframe.sortWithinPartitions; however, this does not
> seem to be supported on a streaming data frame without aggregation.
>
> One option I tried was to convert the Dataframe to an RDD and apply
> repartitionAndSortWithinPartitions, which repartitions the RDD according
> to the given partitioner and, within each resulting partition, sorts
> records by their keys. In this case, however, I cannot use the resulting
> RDD in the query.writeStream operation to write the result to the output
> Kafka topic.
>
> 1. Is there a data frame repartitioning API that helps sort the
> repartitioned data in the streaming context?
> 2. Are there any other alternatives?
> 3. Does the default trigger type or fixed-interval trigger type for
> micro-batch execution provide any sort of message ordering guarantees?
> 4. Is there any ordering possible in the Continuous trigger type?
>
> Incoming data:
>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/TEVcw.png>
>
> Code:
>
> case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)
>
> val df = spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", kafkaBrokers.get)
>   .option("subscribe", Array(kafkaInputTopic.get).mkString(","))
>   .option("maxOffsetsPerTrigger", 30)
>   .load()
>
> val inputDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
> val resDf = inputDf.repartition(3, $"key")
>   .select(from_json($"value", schema).as("kv"))
>   .selectExpr("kv.key", "kv.ts", "kv.value")
>   .withColumn("spark_partition", spark_partition_id())
>   .select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
>   .sortWithinPartitions($"ts", $"value")
>   .select($"key".cast(StringType).as("key"),
>     to_json(struct($"*")).cast(StringType).as("value"))
>
> val query = resDf.writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", kafkaBrokers.get)
>   .option("topic", kafkaOutputTopic.get)
>   .option("checkpointLocation", checkpointLocation.get)
>   .start()
>
> Error:
>
> When I submit this application, it fails with
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/gXWvM.png>

--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
YouTube Live Streams: https://www.youtube.com/user/holdenkarau