So it's been a while since I poked at the streaming code base, but I don't
think we make any promises about a stable sort during repartition, and
there are notes in there about how some of these components should be
rewritten into core. So even if we did have a stable sort, I wouldn't
depend on it unless it was in the docs as a promise (implementation details
can and will change). It's possible I've just missed something in the docs,
though.


One possible solution I thought of initially, though it requires complete
output mode, would be to use a range partitioner rather than a hash
partitioner, partitioning on the primary key you care about together with
the second attribute you want to keep in order (although that way you could
get a single primary key split between partitions). Then you can apply a
"global" sort which, if it matches the partitioning, should not have to do
a second shuffle.
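
A very rough, untested sketch of that idea (repartitionByRange was added in
2.4; parsedDf is a stand-in for a frame that already has the key and ts
columns from your example below):

// Untested sketch: range-partition on (key, ts) instead of hashing, then
// sort. If the sort matches the partitioning it should not need a second
// shuffle, but note a single key can still straddle a range boundary.
val ordered = parsedDf
  .repartitionByRange(3, $"key", $"ts")
  .sort($"key", $"ts")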

A kind of ugly approach that I think would work would be to first add
partition indexes to the elements, then repartition, then do a groupBy +
custom UDAF which ensures the order within the partition (see the sketch
below). This is a little ugly but doesn't depend too much on implementation
details. You'd do the aggregate on a window the same size as your input
window, with no waiting for late records.
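
Very roughly, and untested (I'm using sort_array over collected structs in
place of a hand-written UDAF; parsedDf, the cast of ts to a timestamp, and
the 1 minute window are all placeholders for your setup):

import org.apache.spark.sql.functions._

// Untested sketch: tag rows with their input partition, repartition by
// key, then group per key per window and re-impose order inside each
// group. sort_array on an array of structs orders by the leading struct
// field; the 0-second watermark is the "no waiting for late records" part.
val tagged = parsedDf
  .withColumn("src_partition", spark_partition_id())
  .withColumn("event_time", $"ts".cast("timestamp"))

val regrouped = tagged
  .repartition(3, $"key")
  .withWatermark("event_time", "0 seconds")
  .groupBy(window($"event_time", "1 minute"), $"key")
  .agg(sort_array(collect_list(struct($"ts", $"value"))).as("ordered"))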

That being said, while we don't support global sort operations in append or
update output modes for fairly clear reasons, it seems like it might be
reasonable to relax this and support sorting within partitions (i.e.
non-global). That will require a code change though, and we can take that
discussion to the dev@ list.

On Mon, Dec 3, 2018 at 2:22 PM pmatpadi <pmatp...@gmail.com> wrote:

> I want to write a Spark Structured Streaming Kafka consumer which reads
> data from a one-partition Kafka topic, repartitions the incoming data by
> "key" into 3 Spark partitions while keeping the messages ordered per key,
> and writes them to another Kafka topic with 3 partitions.
>
> I used DataFrame.repartition(3, $"key"), which I believe uses
> HashPartitioner.
>
> When I executed the query with the fixed-interval trigger type, I
> visually verified that the output messages were in the expected order. My
> assumption is that order is not guaranteed on the resulting partitions. I
> am looking for some affirmation or veto of my assumption in terms of
> pointers to the Spark code repo or documentation.
>
> I also tried using DataFrame.sortWithinPartitions; however, this does not
> seem to be supported on a streaming DataFrame without aggregation.
>
> One option I tried was to convert the DataFrame to an RDD and apply
> repartitionAndSortWithinPartitions, which repartitions the RDD according
> to the given partitioner and, within each resulting partition, sorts
> records by their keys. In that case, however, I cannot use the resulting
> RDD in the query.writeStream operation to write the result to the output
> Kafka topic.
>
> 1. Is there a DataFrame repartitioning API that helps sort the
> repartitioned data in the streaming context?
> 2. Are there any other alternatives?
> 3. Does the default trigger type or fixed-interval trigger type for
> micro-batch execution provide any sort of message ordering guarantees?
> 4. Is there any ordering possible in the Continuous trigger type?
>
> Incoming data:
>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/TEVcw.png>
>
> Code:
>
> case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)
>
> val df = spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", kafkaBrokers.get)
>   .option("subscribe", Array(kafkaInputTopic.get).mkString(","))
>   .option("maxOffsetsPerTrigger",30)
>   .load()
>
> val inputDf = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
> val resDf = inputDf.repartition(3, $"key")
>   .select(from_json($"value", schema).as("kv"))
>   .selectExpr("kv.key", "kv.ts", "kv.value")
>   .withColumn("spark_partition", spark_partition_id())
>   .select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
>   .sortWithinPartitions($"ts", $"value")
>   .select($"key".cast(StringType).as("key"),
> to_json(struct($"*")).cast(StringType).as("value"))
>
> val query = resDf.writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", kafkaBrokers.get)
>   .option("topic", kafkaOutputTopic.get)
>   .option("checkpointLocation", checkpointLocation.get)
>   .start()
>
> Error:
>
> When I submit this application, it fails with:
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/gXWvM.png>

-- 
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.):
https://amzn.to/2MaRAG9
YouTube Live Streams: https://www.youtube.com/user/holdenkarau
