Hi,
We are migrating some jobs from DStreams to Structured Streaming.

Currently, to handle aggregations, we call map and reduceByKey on each RDD,
like:
rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))

The final output of each RDD is merged into the sink, which supports
aggregation on its side (like a coprocessor in HBase).
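For context, a minimal sketch of that pipeline, with a hypothetical
(key, value) event type and placeholder merge/sink logic:

import org.apache.spark.streaming.dstream.DStream

object PerBatchAggregation {
  // Hypothetical event type standing in for our real one: (key, value).
  type Event = (String, Long)

  // Placeholder combine logic.
  def merge(a: Event, b: Event): Event = (a._1, a._2 + b._2)

  def run(stream: DStream[Event]): Unit =
    stream.foreachRDD { rdd =>
      val perBatch = rdd
        .map(event => (event._1, event))    // key each event by event._1
        .reduceByKey((a, b) => merge(a, b)) // map-side partial combine, then shuffle
      // Each micro-batch result is written out; the sink merges it with
      // previously stored aggregates (like an HBase coprocessor).
      perBatch.foreachPartition(_.foreach { case (_, agg) => () /* write agg to the sink */ })
    }
}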

In the new Dataset API, I cannot find a suitable API to aggregate over a
single micro-batch.
Most of the aggregation APIs use the state store and provide global
aggregations (and with append mode they do not emit the changes to existing
buckets).
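To be concrete, this is the kind of global, state-store-backed aggregation
I mean; the column names and the console sink are only for illustration,
and events is assumed to be a streaming DataFrame:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def globalAggregation(events: DataFrame): Unit =
  events
    .groupBy(col("key"))
    .agg(sum(col("value")).as("total")) // state kept in the state store across batches
    .writeStream
    .outputMode("update") // "append" does not emit updates to existing buckets
    .format("console")
    .start()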
Problems we suspect:
 1) The state store is tightly coupled to the job definition, while in our
case we may want to edit the job while keeping the previously calculated
aggregates as they are.

The desired result can be achieved with the Dataset APIs below:
dataset.groupByKey(a => a._1).mapGroups((key, valueItr) => merge(valueItr))
but when we inspect the physical plan, it does not perform any partial merge
before the sort.
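For comparison, a small spark-shell sketch on a batch Dataset, with a
hypothetical (key, value) record type: replacing mapGroups with
reduceGroups, which is backed by a typed Aggregator, does appear to add a
partial aggregation step before the exchange in the physical plan.

// spark.implicits._ is pre-imported in spark-shell
type Event = (String, Long)
def merge(a: Event, b: Event): Event = (a._1, a._2 + b._2)

val ds = Seq(("a", 1L), ("a", 2L), ("b", 3L)).toDS()

// reduceGroups goes through a typed Aggregator, so the plan should show a
// partial aggregate before the Exchange, unlike mapGroups, which only runs
// after the groups have been fully shuffled and sorted.
val perKey = ds.groupByKey(_._1).reduceGroups((a, b) => merge(a, b))
perKey.explain()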

 Is anyone aware of an API or other workarounds to get the desired result?
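One API that might fit (which we have not fully evaluated) is
DataStreamWriter.foreachBatch, available from Spark 2.4: it hands each
micro-batch over as a plain Dataset, so the per-batch aggregation can be
done with batch APIs and the cross-batch merge left to the sink, with no
streaming state store involved. A minimal sketch, assuming events is a
streaming Dataset of hypothetical (key, value) records and leaving the sink
upsert as a placeholder:

import org.apache.spark.sql.Dataset

def start(events: Dataset[(String, Long)]): Unit = {
  // An explicit function value avoids the Scala 2.12 overload ambiguity
  // of foreachBatch in Spark 2.4.
  val writeBatch: (Dataset[(String, Long)], Long) => Unit = (batch, batchId) => {
    import batch.sparkSession.implicits._
    val perBatch = batch
      .groupByKey(_._1)
      .reduceGroups((a, b) => (a._1, a._2 + b._2)) // merges within this micro-batch only
    perBatch.foreach { case (_, agg) => () } // upsert agg into the sink here
  }
  events.writeStream.foreachBatch(writeBatch).start()
}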
