Hello. I am using Flink 1.12.1 in EMR.

I am processing historical time-series data with the DataStream
API in Batch execution mode.

I need to average the time-series data over fifteen-minute intervals
and forward-fill missing values.

For example, this input:

name, timestamp, value
a,2019-06-23T00:07:30Z,10
b,2019-06-23T00:05:30Z,7
a,2019-06-23T00:09:30Z,10
a,2019-06-23T00:37:30Z,10

would yield this output:

name, timestamp, value, is_forward_fill
a,2019-06-23T00:15:00Z,20,false
b,2019-06-23T00:15:00Z,7,false
a,2019-06-23T00:30:00Z,20,true
b,2019-06-23T00:30:00Z,7,true
a,2019-06-23T00:45:00Z,5,false
b,2019-06-23T00:45:00Z,7,true
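For reference, here is a minimal plain-Java sketch (no Flink APIs) of the averaging step, assuming an arithmetic mean per name and fifteen-minute window, with each window labelled by its end time as in the output above. The class and method names are hypothetical; the window-start arithmetic mirrors a tumbling window with zero offset, where a window covers [start, end).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, plain Java only: buckets readings into 15-minute
// tumbling windows (labelled by window end) and averages per (name, window).
class FifteenMinuteAverage {
    static final long WINDOW_MS = Duration.ofMinutes(15).toMillis();

    // Zero-offset tumbling window: start = ts - (ts mod size), end = start + size.
    static Instant windowEnd(Instant ts) {
        long ms = ts.toEpochMilli();
        long start = ms - Math.floorMod(ms, WINDOW_MS);
        return Instant.ofEpochMilli(start + WINDOW_MS);
    }

    // Rows are {name, ISO-8601 timestamp, value}; result maps "name,windowEnd" to the mean.
    static Map<String, Double> average(List<String[]> rows) {
        Map<String, double[]> sums = new TreeMap<>(); // key -> {sum, count}
        for (String[] row : rows) {
            String key = row[0] + "," + windowEnd(Instant.parse(row[1]));
            double[] acc = sums.computeIfAbsent(key, k -> new double[2]);
            acc[0] += Double.parseDouble(row[2]);
            acc[1] += 1;
        }
        Map<String, Double> means = new TreeMap<>();
        sums.forEach((k, acc) -> means.put(k, acc[0] / acc[1]));
        return means;
    }
}
```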

My stream code looks roughly like this:

STREAM PSEUDO CODE

stream<TimeSeries>.keyBy(Tuple with step,name)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
    .name("aggregate")
    .keyBy(Tuple with name)
    .process(new FillKeyedProcessFunction())
    .name("fill");

The documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html)
suggests that in BATCH mode the input of a keyed operation is grouped by key
using sorting, so the stream may arrive sorted by key.
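To make that documented behaviour concrete, here is a plain-Java model (not Flink code) of grouping by sorting: sort the records by key, then process each run of equal keys in turn. The names are hypothetical, and the sketch makes no claim about how Flink compares keys internally; here group boundaries are detected with equals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical model of "group by key using sorting"; not Flink's implementation.
class SortGroup {
    // Sorts a copy of the records by key, then splits the sorted list into runs of equal keys.
    static <T, K extends Comparable<K>> List<List<T>> groupBySorting(List<T> records, Function<T, K> keyOf) {
        List<T> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparing(keyOf)); // List.sort is stable
        List<List<T>> groups = new ArrayList<>();
        K currentKey = null;
        for (T record : sorted) {
            K key = keyOf.apply(record);
            // Start a new group whenever the key differs (by equals) from the previous record's key.
            if (groups.isEmpty() || !Objects.equals(key, currentKey)) {
                groups.add(new ArrayList<>());
                currentKey = key;
            }
            groups.get(groups.size() - 1).add(record);
        }
        return groups;
    }
}
```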

If that's true, my fill function could be greatly simplified, provided I
can take advantage of that ordering somehow.
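As an illustration of the simplification I have in mind, here is a plain-Java sketch (not a Flink ProcessFunction) of forward filling one name's window results, assuming they arrive sorted by window end. With sorted input only the previous row has to be remembered. Row, fill and the until parameter (the last window end to fill up to) are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: forward fill for ONE name, assuming rows are sorted by window end.
class ForwardFill {
    static final Duration STEP = Duration.ofMinutes(15);

    // Hypothetical row type: window end, value, and whether the value was filled in.
    static final class Row {
        final Instant windowEnd; final double value; final boolean isForwardFill;
        Row(Instant windowEnd, double value, boolean isForwardFill) {
            this.windowEnd = windowEnd; this.value = value; this.isForwardFill = isForwardFill;
        }
    }

    // Returns the input rows plus a filled row for every missing 15-minute step up to `until`.
    static List<Row> fill(List<Row> sortedForOneName, Instant until) {
        List<Row> out = new ArrayList<>();
        Row last = null;
        for (Row row : sortedForOneName) {
            // Fill the gap between the previous window and this one with the previous value.
            if (last != null) {
                for (Instant t = last.windowEnd.plus(STEP); t.isBefore(row.windowEnd); t = t.plus(STEP)) {
                    out.add(new Row(t, last.value, true));
                }
            }
            out.add(row);
            last = row;
        }
        // Carry the last value forward to the end of the requested range.
        if (last != null) {
            for (Instant t = last.windowEnd.plus(STEP); !t.isAfter(until); t = t.plus(STEP)) {
                out.add(new Row(t, last.value, true));
            }
        }
        return out;
    }
}
```

For b in the example, a single row at 00:15 with until set to 00:45 produces filled rows at 00:30 and 00:45.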

I tried implementing a custom POJO key for the fill function like this:

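import java.io.Serializable;
import java.time.Instant;
import java.util.Objects;
public class FillKey implements Comparable<FillKey>, Serializable {
    String name;
    Instant timestamp;
    public FillKey(String name, Instant timestamp) { this.name = name; this.timestamp = timestamp; }
    @Override public boolean equals(Object o) { return o instanceof FillKey && Objects.equals(name, ((FillKey) o).name); } // only checks name
    @Override public int hashCode() { return Objects.hashCode(name); } // only hashes name
    @Override public int compareTo(FillKey o) { // compares name, then timestamp
        int byName = name.compareTo(o.name);
        return byName != 0 ? byName : timestamp.compareTo(o.timestamp);
    }
}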

Note that my key's equals and hashCode consider only the name, but
compareTo orders by name and then by timestamp.
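A plain-Java illustration (no Flink) of that mismatch: a HashSet, which relies on equals/hashCode, sees one key per name, while a TreeSet, which relies on compareTo, sees one key per (name, timestamp). The Comparable javadoc calls such an ordering "inconsistent with equals". This does not show which notion Flink uses when grouping keys in BATCH mode; FillKeyDemo and its methods are hypothetical.

```java
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;

class FillKeyDemo {
    // Same shape as FillKey above: equals/hashCode use the name only,
    // compareTo orders by name and then timestamp.
    static final class FillKey implements Comparable<FillKey> {
        final String name;
        final Instant timestamp;
        FillKey(String name, Instant timestamp) { this.name = name; this.timestamp = timestamp; }
        @Override public boolean equals(Object o) {
            return o instanceof FillKey && Objects.equals(name, ((FillKey) o).name);
        }
        @Override public int hashCode() { return Objects.hashCode(name); }
        @Override public int compareTo(FillKey o) {
            int byName = name.compareTo(o.name);
            return byName != 0 ? byName : timestamp.compareTo(o.timestamp);
        }
    }

    // Distinct keys according to equals/hashCode.
    static int distinctByEquals(List<FillKey> keys) { return new HashSet<>(keys).size(); }

    // Distinct keys according to compareTo (a TreeSet treats compareTo == 0 as the same key).
    static int distinctByCompareTo(List<FillKey> keys) { return new TreeSet<>(keys).size(); }
}
```

With the four input rows from the example, the HashSet holds two keys and the TreeSet holds four.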

My stream now looks like this:

stream<TimeSeries>.keyBy(Tuple with step,name)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
    .name("aggregate")
    .keyBy(new KeySelector<TimeSeries, FillKey>() {
        @Override
        public FillKey getKey(TimeSeries value) throws Exception {
            return new FillKey(value.name, value.timestamp);
        }
    }).process(new FillKeyedProcessFunction())
    .name("fill");

With this change the elements did seem to arrive sorted, but I am getting
the wrong output because my keyed state no longer seems to work. I expected
the stream to arrive ordered by name and then by step, ascending. However,
keyed state behaved as though elements I expected to share a key (the same
name) each had a different key.

Is there an issue with POJO keys that breaks keyed state in BATCH
execution mode?
Is it possible to take advantage of the sort order within the business
logic, as I am trying to do?
