Hi all, I'm exploring Flink for our new project. Currently I'm playing with session windows with a dynamic gap. In short, I would like to be able to change the value of the gap on demand, for example when a config update arrives.
So far I have this code:

    messageStream
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(
            new SessionWindowTimeGapExtractor<EnrichedMessage>() {
                @Override
                public long extract(EnrichedMessage element) {
                    // Try to dynamically change the gap here.
                    // Gap in milliseconds.
                    return 5000;
                }
            }))
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

I would assume something like the "broadcast pattern" applies here, although that pattern is about operators, and what we care about here is the SessionWindowTimeGapExtractor. We will probably keep the gap size in Flink state; I'm not sure whether it has to be keyed state or operator state. Updates will come from an external system.

So I guess what I need is an operator that implements the SessionWindowTimeGapExtractor interface. An instance of this operator would keep and update the state based on config updates and return the gap size, as a SessionWindowTimeGapExtractor does.

Would this be a valid approach for this use case? Is there any other way to keep such a config in Flink state?
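
To make the idea concrete, here is a minimal plain-Java sketch of the extractor I have in mind. It is only an illustration under assumptions: GapExtractor is a local stand-in with the same shape as Flink's SessionWindowTimeGapExtractor (so the snippet compiles without Flink), the EnrichedMessage class here is a stripped-down placeholder, and ConfigurableGapExtractor and updateGap are hypothetical names. It does not use Flink state at all, and how an update would reach every parallel instance in a running job is exactly the part I'm unsure about.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in mirroring the shape of Flink's SessionWindowTimeGapExtractor,
// so this sketch compiles without Flink on the classpath.
interface GapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical placeholder; the real EnrichedMessage carries more fields.
class EnrichedMessage {
    final String tradeKey;

    EnrichedMessage(String tradeKey) {
        this.tradeKey = tradeKey;
    }
}

// Extractor that returns whatever gap was set by the most recent config update.
class ConfigurableGapExtractor implements GapExtractor<EnrichedMessage> {
    private static final long serialVersionUID = 1L;

    // Current gap in milliseconds; AtomicLong so the updater and reader may be different threads.
    private final AtomicLong gapMillis;

    ConfigurableGapExtractor(long initialGapMillis) {
        requirePositive(initialGapMillis);
        this.gapMillis = new AtomicLong(initialGapMillis);
    }

    // Hypothetical hook: would be called when a config update arrives.
    void updateGap(long newGapMillis) {
        requirePositive(newGapMillis);
        gapMillis.set(newGapMillis);
    }

    @Override
    public long extract(EnrichedMessage element) {
        // The element is ignored here; the gap depends only on the latest config.
        return gapMillis.get();
    }

    // A session gap of zero or less makes no sense, so reject it early.
    private static void requirePositive(long millis) {
        if (millis <= 0) {
            throw new IllegalArgumentException("gap must be > 0 ms, got " + millis);
        }
    }
}

class DynamicGapSketch {
    public static void main(String[] args) {
        ConfigurableGapExtractor extractor = new ConfigurableGapExtractor(5000L);
        EnrichedMessage message = new EnrichedMessage("trade-1");

        System.out.println("gap before update: " + extractor.extract(message));
        extractor.updateGap(12000L); // simulated config update
        System.out.println("gap after update: " + extractor.extract(message));
    }
}
```

As far as I understand, the extractor is serialized and shipped to the tasks, so each parallel instance would hold its own copy of gapMillis, and the value would not be part of any checkpoint. That is why I'm asking whether the gap should live in Flink state instead.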