Hi,

I'm working on a batch job (roughly 10 billion input records, 10 million groups) that is essentially a 'fold' over each group; that is, I have a function
AggregateT addToAggregate(AggregateT agg, RecordT record) { ... }

and want to fold this over each group in my DataSet.

My understanding is that I cannot use .groupBy(0).reduce(...), since a ReduceFunction only supports the case where AggregateT is the same type as RecordT.

A simple solution using .reduceGroup(...) works, but it spills all input data in the reduce step, which produces a lot of slow and expensive disk I/O. We therefore tried .combineGroup(...).reduceGroup(...), but saw a similar amount of spilling. Checking the source of the *Combine drivers, they seem to accumulate records in a buffer, sort the buffer by key, and combine adjacent records that belong to the same group. This does not work in my case because of the large number of groups: the records in the buffer almost certainly all belong to different groups, so the "combine" phase degenerates into a no-op that merely turns a single RecordT into an AggregateT, and the reduce phase is left with 10 billion AggregateTs to merge.

Is there a way of modelling this computation efficiently with the DataSet API?

Alternatively, can I turn this into a DataStream job? (The implementation there would simply be a MapFunction on a KeyedStream with the AggregateT residing in keyed state, although I don't know how I would emit this state only at the end of the data stream.)

I've put simplified sketches of these variants below, after my signature.

Thanks,
Urs

--
Urs Schönenberger
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing directors (Geschäftsführer): Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office (Sitz): Unterföhring * Amtsgericht München * HRB 135082
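
Sketch 1 - the plain reduceGroup variant (works, but spills). Simplified; it assumes RecordT is a Tuple with the group key in field 0 and that AggregateT has a no-arg constructor producing the empty aggregate:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

// given: DataSet<RecordT> records, with the group key in tuple field 0
DataSet<AggregateT> aggregates = records
    .groupBy(0)
    .reduceGroup(new GroupReduceFunction<RecordT, AggregateT>() {
        @Override
        public void reduce(Iterable<RecordT> group, Collector<AggregateT> out) {
            AggregateT agg = new AggregateT();       // empty aggregate
            for (RecordT record : group) {
                agg = addToAggregate(agg, record);   // the fold step
            }
            out.collect(agg);                        // one AggregateT per group
        }
    });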
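
Sketch 2 - the combineGroup + reduceGroup attempt. mergeAggregates(...) is a placeholder for a function that merges two partial aggregates, and AggregateT is assumed to be a POJO that carries its group key in a field named "key":

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

// given: DataSet<RecordT> records
DataSet<AggregateT> aggregates = records
    .groupBy(0)
    .combineGroup(new GroupCombineFunction<RecordT, AggregateT>() {
        @Override
        public void combine(Iterable<RecordT> slice, Collector<AggregateT> out) {
            // called once per group for whatever part of that group is in the
            // in-memory sort buffer; with 10M groups this is usually one record
            AggregateT agg = new AggregateT();
            for (RecordT record : slice) {
                agg = addToAggregate(agg, record);
            }
            out.collect(agg);    // one partial aggregate per buffered slice
        }
    })
    .groupBy("key")   // re-group the partial aggregates by the key stored in AggregateT
    .reduceGroup(new GroupReduceFunction<AggregateT, AggregateT>() {
        @Override
        public void reduce(Iterable<AggregateT> partials, Collector<AggregateT> out) {
            AggregateT merged = null;
            for (AggregateT partial : partials) {
                merged = (merged == null) ? partial : mergeAggregates(merged, partial);
            }
            out.collect(merged);
        }
    });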
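
Sketch 3 - roughly what I have in mind for the DataStream variant. record.getKey() is a placeholder for the key extractor; the open question is how to flush the keyed state once the stream ends:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// given: DataStream<RecordT> records
DataStream<AggregateT> aggregates = records
    .keyBy(record -> record.getKey())
    .flatMap(new RichFlatMapFunction<RecordT, AggregateT>() {

        private transient ValueState<AggregateT> aggState;

        @Override
        public void open(Configuration parameters) {
            aggState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("agg", AggregateT.class));
        }

        @Override
        public void flatMap(RecordT record, Collector<AggregateT> out) throws Exception {
            AggregateT agg = aggState.value();
            if (agg == null) {
                agg = new AggregateT();              // empty aggregate for a new key
            }
            aggState.update(addToAggregate(agg, record));
            // open question: nothing is emitted here; the final AggregateT per key
            // still has to be flushed somehow when the (bounded) stream ends
        }
    });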