Hi all,
I'm exploring Flink for our new project.
Currently I'm playing with session windows with a dynamic gap. In short, I
would like to be able to change the value of the gap on demand, for example
on a config update.
This is the code I have so far:
messageStream
    .keyBy(tradeKeySelector)
    .window(ProcessingTimeSessionWindows.withDynamicGap(
        new SessionWindowTimeGapExtractor<EnrichedMessage>() {
            @Override
            public long extract(EnrichedMessage element) {
                // Try to dynamically change the gap here
                // (value is in milliseconds).
                return 5000;
            }
        }))
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");
I would assume something like the "broadcast pattern" applies here, although
that pattern is tied to operators, and what we are interested in is the
SessionWindowTimeGapExtractor.
We will probably keep the gap size in Flink state; I'm not sure whether it
has to be keyed state or operator state. Updates will come from an external
system. So I guess what I need is an operator that implements the
SessionWindowTimeGapExtractor interface. An instance of this operator would
keep and update the state based on config updates and return the gap size,
as a SessionWindowTimeGapExtractor does.
Would this be a valid approach for this use case? Is there any other way to
keep such a config in Flink state?
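
For illustration, here is a minimal sketch of one variant of this idea,
written in plain Java without Flink dependencies so that it runs on its own.
It is not the "operator implements the extractor" design described above. It
assumes the extractor only ever sees the element, so an upstream step stamps
each message with the gap that was current when the message passed through,
and the extractor simply reads that value back. GapExtractor is a local
stand-in for SessionWindowTimeGapExtractor (same single method), and WithGap,
GapStamper and onConfigUpdate are hypothetical names. In a real job the
stamping step would presumably be a function on the connected config stream
that keeps the gap in Flink state; that part is an assumption and is not
shown here.

```java
public class DynamicGapSketch {

    /** Local stand-in for Flink's SessionWindowTimeGapExtractor<T>. */
    interface GapExtractor<T> {
        long extract(T element);
    }

    /** Hypothetical wrapper: a message plus the gap that applied when it was seen. */
    static final class WithGap<T> {
        final T message;
        final long gapMillis;

        WithGap(T message, long gapMillis) {
            this.message = message;
            this.gapMillis = gapMillis;
        }
    }

    /**
     * Hypothetical upstream step. It remembers the latest configured gap and
     * attaches it to every message that passes through.
     */
    static final class GapStamper<T> {
        private long currentGapMillis;

        GapStamper(long initialGapMillis) {
            onConfigUpdate(initialGapMillis);
        }

        /** Called whenever the external system publishes a new gap. */
        void onConfigUpdate(long newGapMillis) {
            // A non-positive gap makes no sense for a session window.
            if (newGapMillis <= 0) {
                throw new IllegalArgumentException("gap must be > 0, got " + newGapMillis);
            }
            currentGapMillis = newGapMillis;
        }

        /** Called for every message; stamps it with the current gap. */
        WithGap<T> onMessage(T message) {
            return new WithGap<>(message, currentGapMillis);
        }
    }

    public static void main(String[] args) {
        GapStamper<String> stamper = new GapStamper<>(5000L);

        // The extractor needs no state of its own: it reads the stamped value.
        GapExtractor<WithGap<String>> extractor = element -> element.gapMillis;

        WithGap<String> first = stamper.onMessage("trade-1");
        stamper.onConfigUpdate(2000L); // config update arrives
        WithGap<String> second = stamper.onMessage("trade-2");

        System.out.println(extractor.extract(first));  // 5000
        System.out.println(extractor.extract(second)); // 2000
    }
}
```

With this shape, the window assigner would be given the stateless extractor,
and the only stateful piece is the stamping step in front of the keyBy.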
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/