You could implement your own sorting algorithm using the low-level Processor API, i.e., you have a processor that keeps a sorted list of records and then periodically, perhaps on punctuate, emits the sorted messages downstream. You could do something like:
builder.stream("topic").transform(new TransformerSupplier() {
    @Override
    public Transformer get() {
        return new TheTransformer();
    }
}).groupByKey().reduce(..);

where TheTransformer might look something like:

import java.util.Map;
import java.util.TreeMap;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// declared as a nested class of your application
private static class TheTransformer<K, V, R> implements Transformer<K, V, R> {
    private ProcessorContext context;
    // Ordered by key, so K must be Comparable; a later record with the
    // same key replaces the earlier one until the next punctuate().
    private TreeMap<K, V> sorted = new TreeMap<>();

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        context.schedule(1000); // punctuate every 1 second of streams-time
    }

    @Override
    public R transform(final K key, final V value) {
        // buffer the record; nothing is forwarded until punctuate()
        sorted.put(key, value);
        return null;
    }

    @Override
    public R punctuate(final long timestamp) {
        // emit the buffered records downstream in sorted order, then reset
        for (final Map.Entry<K, V> kvEntry : sorted.entrySet()) {
            context.forward(kvEntry.getKey(), kvEntry.getValue());
        }
        sorted.clear();
        return null;
    }

    @Override
    public void close() {
    }
}

On Wed, 1 Mar 2017 at 13:04 Ofir Sharony <ofir.shar...@myheritage.com> wrote:

> Is there any way to sort grouped records before sending them to the
> reducer?
>
> *Ofir Sharony*
> BackEnd Tech Lead
>
> Mobile: +972-54-7560277 | ofir.shar...@myheritage.com
> | www.myheritage.com
> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>
> <http://www.myheritage.com/>
>
> <https://www.facebook.com/myheritage>
> <https://twitter.com/myheritage> <http://blog.myheritage.com/>
> <https://www.youtube.com/user/MyHeritageLtd>
>
>
> On Wed, Mar 1, 2017 at 3:03 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi,
> >
> > The TimestampExtractor won't affect the order the records arrive in. It
> > just provides a way for developers to use a timestamp other than the
> > default.
> >
> > Thanks,
> > Damian
> >
> > On Wed, 1 Mar 2017 at 12:34 Ofir Sharony <ofir.shar...@myheritage.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have the following code on a stream:
> > >
> > > .selectKey(...)
> > > .groupByKey(...)
> > > .reduce(...)
> > >
> > > The records arrived at the Reducer function in the same order they were
> > > consumed from Kafka.
> > > I have implemented a TimestampExtractor, extracting the wanted timestamp
> > > from each record, but unfortunately this didn't have any effect on the
> > > order the messages were received in the Reducer.
> > >
> > > Any thoughts on that?
> > > Thanks,
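The buffer-and-punctuate idea at the top of this reply can be tried without a Kafka cluster. Below is a minimal plain-Java sketch, assuming the goal is timestamp order (the order the TimestampExtractor was expected to produce). SortingBuffer, Rec and flush() are hypothetical names, not Kafka Streams API: add() plays the role of transform(), flush() the role of punctuate(), and the BiConsumer the role of context.forward(). Unlike the TreeMap<K, V> in TheTransformer, which orders by key and keeps only the latest value per key, this buffer keeps every record and orders by timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

public class SortingBufferDemo {

    // Hypothetical record holder: key, value and the timestamp a
    // TimestampExtractor would have returned for it.
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;

        Rec(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for the transformer's buffer (not a Kafka class).
    static final class SortingBuffer {
        // Oldest timestamp at the head; records with equal timestamps
        // are not guaranteed to keep their arrival order.
        private final PriorityQueue<Rec> queue =
                new PriorityQueue<>(Comparator.comparingLong((Rec r) -> r.timestamp));

        // Like transform(): buffer the record, emit nothing yet.
        void add(Rec r) {
            queue.add(r);
        }

        // Like punctuate(): emit everything oldest-first, leaving the buffer empty.
        void flush(BiConsumer<String, String> forward) {
            while (!queue.isEmpty()) {
                Rec r = queue.poll();
                forward.accept(r.key, r.value);
            }
        }
    }

    public static void main(String[] args) {
        SortingBuffer buffer = new SortingBuffer();
        buffer.add(new Rec("user-1", "c", 300L));
        buffer.add(new Rec("user-1", "a", 100L));
        buffer.add(new Rec("user-2", "b", 200L));

        List<String> emitted = new ArrayList<>();
        buffer.flush((k, v) -> emitted.add(k + "=" + v));
        System.out.println(emitted); // [user-1=a, user-2=b, user-1=c]
    }
}
```

Records are only reordered within one flush interval: a record that arrives after a flush can still be older than records already emitted, so the interval needs to be at least as long as the expected out-of-orderness.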