Hi Andrew,

Thanks for your response! For our use case, we aren't actually grouping, but rather updating running aggregates; I just picked grouping because it made the example easier to write out. However, when we merge combiners, each combiner holds data that were adjacent to each other in the original partition, so the combiners have to be merged in that same order.
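To make that concrete, here is a tiny sketch of the kind of order-dependent combiner I have in mind (the names and numbers are made up for illustration, not from our real job). The mergeCombiners function is only correct if the left combiner covers the earlier slice of the partition and the right combiner the later slice:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object OrderedCombinerSketch {
  // Running aggregate over a key's timestamps that assumes values arrive in time order:
  // it tracks the first/last timestamp seen and the sum of gaps between consecutive events.
  case class RunningAgg(firstTs: Long, lastTs: Long, gapSum: Long)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ordered-combiner").setMaster("local[2]"))

    // (key, timestamp) pairs; in the real job these are partitioned by key and sorted
    // by timestamp within each partition upstream.
    val events = sc.parallelize(
      Seq(("A", 1L), ("A", 2L), ("A", 3L), ("A", 4L), ("B", 4L), ("B", 6L)), 2)

    val aggs = events.combineByKey(
      (ts: Long) => RunningAgg(ts, ts, 0L),
      // mergeValue assumes ts is the next event after agg.lastTs
      (agg: RunningAgg, ts: Long) => RunningAgg(agg.firstTs, ts, agg.gapSum + (ts - agg.lastTs)),
      // mergeCombiners assumes `left` covers the earlier slice and `right` the later one
      (left: RunningAgg, right: RunningAgg) =>
        RunningAgg(left.firstTs, right.lastTs,
          left.gapSum + right.gapSum + (right.firstTs - left.lastTs)))

    aggs.collect().foreach(println)
    sc.stop()
  }
}

For key A merged in slice order, the gap sum comes out as 3; if the two spilled halves [1, 2] and [3, 4] are merged backwards, it comes out as -1, with no error raised. That's the invariant I'd like combineByKey to preserve.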
I feel that requiring groupByKey/cogroup to insert values into the correct place is quite expensive, and it may not even be possible for combiners that are trying to collapse the data down while assuming order. Would it be really expensive, or perilous to the API, if we just had combineByKey merge combiners in the same order as the data slices they represent? I have a very simple prototype that adds this additional semantic (it's only a 16-line diff):
https://github.com/justinuang/spark/commit/b92ee6a6dbf70207eca68296289cb62c3cea76b8

The additional comparison looks trivial in terms of runtime cost, and it doesn't break backward compatibility. (I've also pasted a small toy sketch of the tie-breaking idea at the very bottom of this mail, below the quoted thread.)

Thanks,
Justin

On Tue, Jan 20, 2015 at 8:03 PM, Andrew Or <and...@databricks.com> wrote:

> Hi Justin,
>
> I believe the intended semantics of groupByKey or cogroup are that the
> ordering *within a key* is not preserved if you spill. In fact, the test
> cases for the ExternalAppendOnlyMap only assert that the Set representation
> of the results is as expected (see this line
> <https://github.com/apache/spark/blob/9d9294aebf7208a76f43d8fc5a0489a83d7215f4/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala#L265>).
> This is because these Spark primitives literally just group the values by a
> key but do not provide any ordering guarantees.
>
> However, if ordering within a key is a requirement for your application,
> then you may need to write your own PairRDDFunction that calls
> combineByKey. You can model your method after groupByKey, but change the
> combiner function slightly to take ordering into account. This may add some
> overhead to your application since you need to insert every value in the
> appropriate place, but since you're spilling anyway the overhead will
> likely be shadowed by disk I/O.
>
> Let me know if that works.
> -Andrew
>
>
> 2015-01-20 9:18 GMT-08:00 Justin Uang <justin.u...@gmail.com>:
>
>> Hi,
>>
>> I am trying to aggregate a key based on some timestamp, and I believe
>> that spilling to disk is changing the order of the data fed into the
>> combiner.
>>
>> I have some timeseries data that is of the form: ("key", "date", "other data")
>>
>> Partition 1
>> ("A", 2, ...)
>> ("B", 4, ...)
>> ("A", 1, ...)
>> ("A", 4, ...)
>> ("A", 3, ...)
>> ("B", 6, ...)
>>
>> which I then partition by key, then sort within each partition:
>>
>> Partition 1
>> ("A", 1, ...)
>> ("A", 2, ...)
>> ("A", 3, ...)
>> ("A", 4, ...)
>>
>> Partition 2
>> ("B", 4, ...)
>> ("B", 6, ...)
>>
>> If I run a combineByKey with the same partitioner, then the items for
>> each key will be fed into the ExternalAppendOnlyMap in the correct order.
>> However, if I spill, then the time slices are spilled to disk as multiple
>> partial combiners. When it's time to merge the spilled combiners for each
>> key, the combiners are combined in the wrong order.
>>
>> For example, if during a groupByKey, [("A", 1, ...), ("A", 2, ...)] and
>> [("A", 3, ...), ("A", 4, ...)] are spilled separately, it's possible that
>> the combiners can be combined in the wrong order, like [("A", 3, ...),
>> ("A", 4, ...), ("A", 1, ...), ("A", 2, ...)], which invalidates the
>> invariant that all the values for A are passed in order to the combiners.
>>
>> I'm not an expert, but I suspect that this is because we use a heap
>> ordered by key when iterating, which doesn't retain the order of the spilled
>> combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
>> where spill_index is incremented each time we spill?
>> This would mean that we would pop and merge the combiners of each key in
>> order, resulting in [("A", 1, ...), ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].
>>
>> Thanks in advance for the help! If there is a way to do this already in
>> Spark 1.2, can someone point it out to me?
>>
>> Best,
>>
>> Justin
>>
>
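P.S. To make the tie-breaking idea concrete, here is a small, self-contained toy sketch of ordering the merge heap by (hash, spill index) instead of by hash alone. This is not the actual prototype diff, and StreamBuffer below is just a stand-in, not Spark's internal class:

import scala.collection.mutable

object MergeOrderSketch {
  // Stand-in for the next partial combiner read back from one spill file; spillIndex
  // records the order in which that spill happened, i.e. which slice of the original
  // partition it covers.
  case class StreamBuffer(hash: Int, spillIndex: Int, combiner: List[Int])

  def main(args: Array[String]): Unit = {
    // Key "A" was spilled twice: slice 0 held values [1, 2], slice 1 held [3, 4].
    val spills = Seq(
      StreamBuffer("A".hashCode, spillIndex = 1, combiner = List(3, 4)),
      StreamBuffer("A".hashCode, spillIndex = 0, combiner = List(1, 2)))

    // Ordering only by hash leaves the relative order of equal-hash buffers arbitrary;
    // breaking ties by spillIndex pops them back in slice order. (PriorityQueue is a
    // max-heap, so both fields are negated to pop the smallest first.)
    val mergeHeap = mutable.PriorityQueue(spills: _*)(
      Ordering.by((b: StreamBuffer) => (-b.hash, -b.spillIndex)))

    println(mergeHeap.dequeueAll.flatMap(_.combiner))  // 1, 2, 3, 4: slices merge in original order
  }
}

The real change would live in ExternalAppendOnlyMap's merge iterator, of course; this is only meant to show why the extra comparison is cheap.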