I am afraid it is not possible to leverage the sorting for business logic. The sorting is applied to the binary representation of the key, because what is needed is not sorting per se, but rather grouping records with the same key. You can find more information in the FLIP for this feature, e.g. here [1].
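To illustrate why byte-wise ordering only guarantees grouping, here is a small standalone Java sketch. It is not Flink's sorter. For illustration only, it assumes that an int key is serialized as 4 big-endian bytes and that keys are compared byte by byte as unsigned values. Equal keys always end up next to each other, but the resulting order need not match the natural order of the key type.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;

public class BinaryKeyOrder {

    // Serialize an int as 4 big-endian bytes (ByteBuffer's default byte order).
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Lexicographic comparison of two byte arrays, each byte read as unsigned.
    static int compareUnsignedBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Orders Integer keys by their serialized form rather than by numeric value.
    static final Comparator<Integer> BY_BYTES =
            (x, y) -> compareUnsignedBytes(serialize(x), serialize(y));

    public static void main(String[] args) {
        Integer[] keys = {2, -1, 1, -1, 2};
        Arrays.sort(keys, BY_BYTES);
        // Equal keys become adjacent, which is all grouping needs, but -1
        // (0xFFFFFFFF) lands after 2 (0x00000002), so this is not numeric order.
        System.out.println(Arrays.toString(keys)); // prints [1, 2, 2, -1, -1]
    }
}
```

Negative numbers are the easiest case to see: the sign bit makes their first serialized byte large, so they sort after every non-negative key. Business logic that assumed numeric order here would silently produce wrong results.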
Best,
Dawid

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys

On 21/05/2021 09:58, Marco Villalobos wrote:
> Hello. I am using Flink 1.12.1 in EMR.
>
> I am processing historical time-series data with the DataStream
> API in Batch execution mode.
>
> I must average time-series data into fifteen-minute intervals
> and forward fill missing values.
>
> For example, this input:
>
> name, timestamp, value
> a,2019-06-23T00:07:30Z,10
> b,2019-06-23T00:05:30Z,7
> a,2019-06-23T00:09:30Z,10
> a,2019-06-23T00:37:30Z,10
>
> would yield this output:
>
> name, timestamp, value, is_forward_fill
> a,2019-06-23T00:15:00Z,20,false
> b,2019-06-23T00:15:00Z,7,false
> a,2019-06-23T00:30:00Z,20,true
> b,2019-06-23T00:30:00Z,7,true
> a,2019-06-23T00:45:00Z,5,false
> b,2019-06-23T00:30:00Z,7,true
>
> My stream code looks something like this:
>
> STREAM PSEUDO CODE
>
> stream<TimeSeries>.keyBy(Tuple with step,name)
>     .window(TumblingEventTimeWindows.of(Time.minutes(15)))
>     .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
>     .name("aggregate")
>     .keyBy(Tuple with name)
>     .process(new FillKeyedProcessFunction())
>     .name("fill");
>
> The documentation
> (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html)
> suggests that the stream might be sorted by key.
>
> If that's true, my fill function could be greatly simplified if I were
> able to leverage that somehow.
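As for the fill step itself, one approach that does not depend on the order in which records reach the operator is to collect each name's aggregated buckets and sort them explicitly before filling. The plain-Java sketch below assumes, as in the expected output above, that buckets are labelled by their end time and that the 15-minute buckets are aligned to the epoch with no offset. `ForwardFill`, `Row`, `bucketEnd` and `fill` are hypothetical names, not Flink API, and the averaging is assumed to have already happened in the window step.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ForwardFill {

    // One output row: name, bucket end, value, and whether the value was filled forward.
    static final class Row {
        final String name;
        final Instant bucketEnd;
        final double value;
        final boolean forwardFill;

        Row(String name, Instant bucketEnd, double value, boolean forwardFill) {
            this.name = name;
            this.bucketEnd = bucketEnd;
            this.value = value;
            this.forwardFill = forwardFill;
        }

        @Override
        public String toString() {
            return name + "," + bucketEnd + "," + value + "," + forwardFill;
        }
    }

    // End of the epoch-aligned bucket of the given size that contains ts
    // (assumption: zero offset, bucket labelled by its exclusive end).
    static Instant bucketEnd(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(startMs + sizeMs);
    }

    // Emits one row per bucket from the name's first observed bucket up to lastBucketEnd.
    // Assumes every key in 'observed' is a bucket end aligned to 'step'. The map may have
    // been filled in any arrival order: the TreeMap re-sorts it by time.
    static List<Row> fill(String name, Map<Instant, Double> observed,
                          Instant lastBucketEnd, Duration step) {
        List<Row> rows = new ArrayList<>();
        TreeMap<Instant, Double> byTime = new TreeMap<>(observed);
        if (byTime.isEmpty()) {
            return rows;
        }
        double last = byTime.firstEntry().getValue();
        for (Instant t = byTime.firstKey(); !t.isAfter(lastBucketEnd); t = t.plus(step)) {
            Double value = byTime.get(t);
            if (value != null) {
                last = value;
                rows.add(new Row(name, t, value, false));
            } else {
                rows.add(new Row(name, t, last, true)); // carry the previous value forward
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Duration step = Duration.ofMinutes(15);
        Map<Instant, Double> b = new HashMap<>();
        b.put(bucketEnd(Instant.parse("2019-06-23T00:05:30Z"), step), 7.0);
        for (Row row : fill("b", b, Instant.parse("2019-06-23T00:45:00Z"), step)) {
            System.out.println(row);
        }
    }
}
```

In a Flink job, the same idea would mean buffering the buckets in keyed state (for example a `MapState`) and filling once a name's input is complete; how to detect that point is not covered by this sketch.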
>
> I tried implementing a custom POJO key for the fill function like this:
>
> public class FillKey implements Comparable<FillKey>, Serializable {
>     String name;
>     Instant timestamp;
>     equals    // only checks name
>     hashCode  // only hashes name
>     compareTo // compares name, timestamp
> }
>
> Notice that my key only checks equality on the name, and hashes only
> the name, but when it performs comparisons it orders by name, timestamp.
>
> My stream now looks like this:
>
> stream<TimeSeries>.keyBy(Tuple with step,name)
>     .window(TumblingEventTimeWindows.of(Time.minutes(15)))
>     .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
>     .name("aggregate")
>     .keyBy(new KeySelector<TimeSeries, FillKey>() {
>         @Override
>         public FillKey getKey(TimeSeries value) throws Exception {
>             return new FillKey(value.name, value.timestamp);
>         }
>     })
>     .process(new FillKeyedProcessFunction())
>     .name("fill");
>
> and it seemed to arrive sorted, but I am getting the wrong output
> because my keyed state no longer seems to work. I expected the stream
> to arrive in the order of name, step ascending. However, keyed state
> behaved as though each element that I thought would share the name was
> different.
>
> Is there an issue with POJO keys that breaks the keyed state in batch
> execution mode?
> Is it possible to take advantage of the sort order within the business
> logic as I am trying to do?
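On the `FillKey` question: the class as described has two notions of identity. `equals`/`hashCode` treat two keys with the same name as one key, while `compareTo` treats them as different whenever the timestamps differ, which the `Comparable` Javadoc advises against (natural orderings are strongly recommended to be consistent with `equals`). The standalone sketch below is a hypothetical re-creation of that key, not the original class, and shows a `HashSet` and a `TreeSet` disagreeing about it. It does not reproduce Flink's keyed-state behaviour. Per the reply above, Flink groups by the key's serialized bytes, which presumably include the timestamp field as well.

```java
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class FillKeyConsistency {

    // Hypothetical re-creation of the FillKey described in the question:
    // equals/hashCode use only name, compareTo uses name and then timestamp.
    static final class FillKey implements Comparable<FillKey> {
        final String name;
        final Instant timestamp;

        FillKey(String name, Instant timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof FillKey && ((FillKey) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }

        @Override
        public int compareTo(FillKey other) {
            int byName = name.compareTo(other.name);
            return byName != 0 ? byName : timestamp.compareTo(other.timestamp);
        }
    }

    public static void main(String[] args) {
        FillKey first = new FillKey("a", Instant.parse("2019-06-23T00:15:00Z"));
        FillKey second = new FillKey("a", Instant.parse("2019-06-23T00:30:00Z"));

        Set<FillKey> hashed = new HashSet<>();
        hashed.add(first);
        hashed.add(second);

        Set<FillKey> ordered = new TreeSet<>();
        ordered.add(first);
        ordered.add(second);

        // HashSet sees one key (equals/hashCode ignore the timestamp);
        // TreeSet sees two (compareTo never returns 0 for them).
        System.out.println(hashed.size() + " vs " + ordered.size()); // prints 1 vs 2
    }
}
```

A safer arrangement, offered as a suggestion rather than a verified fix, is to keep the fill step keyed by name only and carry the timestamp in the record value, so that equality, hashing and the serialized key all agree, and to handle ordering explicitly inside the function.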