Hi,

I'm struggling a little with some unintuitive behavior in the Scala DataFrame API.
(Spark 2.0.2)

I wrote something like

import org.apache.spark.sql.functions.{last, sum}

df.orderBy("a", "b")
  .groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
       last("row_id").as("last_row_id"))

and expected a result with one row per group_id, a column called "total"
that's the sum of col_to_sum within each group, and a column called
"last_row_id" that's the row_id of the last row in each group when the rows
within a group are ordered by columns a and b.
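
Concretely, for a toy df along these lines (made-up data, column names
matching the snippet above, just to illustrate what I mean):

import spark.implicits._  // assuming a SparkSession named "spark"

val df = Seq(
  ("g1", 1, 1, 10L, "r1"),
  ("g1", 2, 1, 20L, "r2"),
  ("g2", 1, 1,  5L, "r3")
).toDF("group_id", "a", "b", "col_to_sum", "row_id")

I'd expect to get back ("g1", 30, "r2") and ("g2", 5, "r3").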

However, the result is actually non-deterministic and changes based on the
initial sorting and partitioning of df.

I also tried

df.orderBy("group_id", "a", "b")
  .groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
       last("row_id").as("last_row_id")))

thinking the problem might be that the groupBy does another shuffle that
loses the ordering, but that also doesn't seem to work.

Looking through the code
<https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala>,
both the Last and First aggregate functions have this comment:

Even if [[Last]] is used on an already sorted column, if we do partial
aggregation and final aggregation (when mergeExpression is used) its result
will not be deterministic (unless the input table is sorted and has a single
partition, and we use a single reducer to do the aggregation).

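If I read that comment literally, the only way to get a deterministic answer
out of last() here would be something along these lines, which defeats the
point of Spark for any reasonably sized table (and presumably
spark.sql.shuffle.partitions would also need to be 1 to get the "single
reducer"):

// everything sorted in a single partition before the aggregation
df.coalesce(1)
  .sortWithinPartitions("a", "b")
  .groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
       last("row_id").as("last_row_id"))
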

Some questions:

   1. What's the best way to take some values from the last row in an
   ordered group while performing other aggregates over the entire group?
   (I've sketched a possible window-based workaround below.)

   2. Given these comments on last and first, when would these functions be
   useful? It would be rare to bring an entire Spark table to a single
   partition.
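
For question 1, the workaround I've been considering is to pick the last row
per group with a window function and join the plain aggregates back on,
roughly like this (I'm not sure it's the idiomatic way, hence the question):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, sum}

// Rank rows within each group by (a, b) descending, so the "last" row gets rn = 1.
val w = Window.partitionBy("group_id").orderBy(col("a").desc, col("b").desc)

val lastRows = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .select(col("group_id"), col("row_id").as("last_row_id"))

// Ordinary aggregates over the whole group, then join the last-row values back in.
val totals = df.groupBy("group_id").agg(sum("col_to_sum").as("total"))

val result = totals.join(lastRows, "group_id")

It seems to avoid the determinism issue, but it costs an extra join compared
with a single groupBy, which is why I'm wondering whether there's a better way.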

Thanks!
