Hi,

I am trying to aggregate the values for each key in timestamp order, and I
believe that spilling to disk is changing the order in which the data is fed
into the combiners.

I have some time-series data of the form ("key", "date", "other data"):

    Partition 1
    ("A", 2, ...)
    ("B", 4, ...)
    ("A", 1, ...)
    ("A", 3, ...)
    ("B", 6, ...)

which I then partition by key and sort by date within each partition (rough
code sketch below):

    Partition 1
    ("A", 1, ...)
    ("A", 2, ...)
    ("A", 3, ...)
    ("A", 4, ...)

    Partition 2
    ("B", 4, ...)
    ("B", 6, ...)

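Concretely, the partition-then-sort step looks roughly like the sketch below.
This is simplified and only illustrative: the record layout, the names, and
the in-memory per-partition sort are my shorthand, not my exact code.

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.2

    val sc = new SparkContext(
      new SparkConf().setAppName("partition-then-sort").setMaster("local[2]"))

    // (key, (date, otherData)) records, unordered as they arrive.
    val records = sc.parallelize(Seq(
      ("A", (2, "...")), ("B", (4, "...")), ("A", (1, "...")),
      ("A", (4, "...")), ("A", (3, "...")), ("B", (6, "..."))))

    val partitioner = new HashPartitioner(2)

    // Partition by the series key, then sort each partition by (key, date).
    // preservesPartitioning = true keeps the partitioner on the result, so a
    // later combineByKey with the same partitioner needs no further shuffle.
    val sorted = records
      .partitionBy(partitioner)
      .mapPartitions(
        iter => iter.toArray.sortBy { case (key, (date, _)) => (key, date) }.iterator,
        preservesPartitioning = true)
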
If I run a combineByKey with the same partitioner, the items for each key
will be fed into the ExternalAppendOnlyMap in the correct order. However, if
the map spills, the time slices are written to disk as multiple partial
combiners. When it's time to merge the spilled combiners for each key, the
combiners can be merged in the wrong order.

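Continuing the sketch above, the combine I mean looks roughly like this; the
Vector-based combiner is only an illustration of an aggregation that is only
correct if its values arrive in date order:

    // createCombiner / mergeValue just append, so the resulting series is in
    // date order only if values for each key are fed in date order;
    // mergeCombiners likewise assumes the left combiner's dates precede the
    // right one's.
    val series = sorted.combineByKey(
      (v: (Int, String)) => Vector(v),                                 // createCombiner
      (acc: Vector[(Int, String)], v: (Int, String)) => acc :+ v,      // mergeValue
      (a: Vector[(Int, String)], b: Vector[(Int, String)]) => a ++ b,  // mergeCombiners
      partitioner) // same partitioner as `sorted` => no re-shuffle, order preserved
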
For example, if during a groupByKey, [("A", 1, ...), ("A", 2, ...)] and
[("A", 3, ...), ("A", 4, ...)] are spilled separately, the combiners can be
merged in the wrong order, e.g. [("A", 3, ...), ("A", 4, ...), ("A", 1, ...),
("A", 2, ...)], which breaks the invariant that all the values for A are
passed to the combiners in order.

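In other words, with an append-style mergeCombiners like the one above, the
merge order matters. A tiny standalone illustration:

    val mergeCombiners = (a: Vector[(Int, String)], b: Vector[(Int, String)]) => a ++ b

    val spill1 = Vector((1, "..."), (2, "..."))  // first spill's partial combiner for "A"
    val spill2 = Vector((3, "..."), (4, "..."))  // second spill's partial combiner for "A"

    mergeCombiners(spill1, spill2)  // Vector((1,...), (2,...), (3,...), (4,...)) -- what I want
    mergeCombiners(spill2, spill1)  // Vector((3,...), (4,...), (1,...), (2,...)) -- order lost
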
I'm not an expert, but I suspect this is because we use a heap ordered by key
hash when iterating, which doesn't retain the order in which the combiners
were spilled. Perhaps we could order the mergeHeap by (hash_key, spill_index),
where spill_index is incremented each time we spill? Then we would pop and
merge the combiners of each key in spill order, resulting in
[("A", 1, ...), ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].

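Roughly what I have in mind, as a sketch of the idea only (not the actual
ExternalAppendOnlyMap internals, which I haven't fully traced -- the
StreamBuffer and spillIndex here are hypothetical):

    import scala.collection.mutable

    // Hypothetical: each buffered spill stream remembers which spill produced it.
    case class StreamBuffer(minKeyHash: Int, spillIndex: Int /* , buffered pairs ... */)

    // Break key-hash ties by spill index, so a key's partial combiners are
    // popped (and merged) in the order they were spilled. PriorityQueue pops
    // the largest element, hence the reverse to get min-first behaviour.
    implicit val bySpillOrder: Ordering[StreamBuffer] =
      Ordering.by[StreamBuffer, (Int, Int)](b => (b.minKeyHash, b.spillIndex)).reverse

    val mergeHeap = mutable.PriorityQueue.empty[StreamBuffer]
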
Thanks in advance for the help! If there is a way to do this already in
Spark 1.2, can someone point it out to me?

Best,

Justin
