Hi mates, I've got some questions about using event time in my Flink pipeline.
My pipeline consists of two connected streams: one carries business rules, the other carries user events. I broadcast the rules stream and connect it to the events stream, so that every existing rule can be applied to each event. For this I've implemented a KeyedBroadcastProcessFunction that accumulates the rules in broadcast state and applies them to each event. In this function I would like to register event-time timers.

I've specified an AssignerWithPeriodicWatermarks for the events stream that extracts the event timestamp and uses it as both timestamp and watermark, but still no success: the broadcast stream has no such assigner, so its watermark always stays at Long.MIN_VALUE. Since Flink uses the smallest watermark received from both streams, event time never advances.

How can I solve this problem and use the timestamps from the events stream as the pipeline event time?

Here is the configuration of my pipeline:

    val bSegments = env
      .addSource(rules)
      .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)

    val keyedEvents = env
      .addSource(events)
      .assignTimestampsAndWatermarks(eventTimeAssigner)
      .keyBy { event => event.getId.getGid }

    keyedEvents
      .connect(bSegments)
      .process(customerJourneyProcessor)
      .addSink(sink)

I've found a workaround that works for me, but I'm not sure it's the proper solution. I can add a timestamp/watermark assigner to the rules stream that always returns System.currentTimeMillis() as its watermark. That value will always be larger than the event timestamps, so the KeyedBroadcastProcessFunction effectively uses the events stream's watermark.
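    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark

    // The type parameter has to match the element type taken by extractTimestamp.
    class RuleTimestampAssigner extends AssignerWithPeriodicWatermarks[Rule] {

      // Always report wall-clock time, so the rules stream never holds the watermark back.
      override def getCurrentWatermark: Watermark =
        new Watermark(System.currentTimeMillis())

      // Use the rule's creation time as its event timestamp.
      override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long =
        rule.created
    }

But it looks like a hack. Maybe someone can advise a more appropriate approach?

Thx!

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever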
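
P.S. To make the behaviour concrete, here is a minimal sketch of how I understand the watermark of a two-input operator to be derived. It is plain Scala with no Flink dependency, and every name in it (WatermarkSketch, combinedWatermark, timerFires) is hypothetical and only for illustration. It is not Flink's actual implementation, just the "smallest watermark wins" rule as I described it above.

```scala
// Plain-Scala sketch, no Flink dependency. All names here are hypothetical.
object WatermarkSketch {

  // Assumption: a two-input operator can only advance event time to the
  // smaller of the watermarks it has received on its two inputs.
  def combinedWatermark(eventsWm: Long, rulesWm: Long): Long =
    math.min(eventsWm, rulesWm)

  // Assumption: an event-time timer fires once the operator watermark
  // has reached the timer's timestamp.
  def timerFires(timerTs: Long, operatorWm: Long): Boolean =
    operatorWm >= timerTs

  def main(args: Array[String]): Unit = {
    val eventsWm = 1000L // watermark produced by the events stream
    val timerTs  = 900L  // an event-time timer registered in the function

    // Rules stream without an assigner: its watermark stays at Long.MinValue,
    // so the combined watermark never moves and the timer never fires.
    val stuck = combinedWatermark(eventsWm, Long.MinValue)
    println(timerFires(timerTs, stuck)) // false

    // Rules stream reporting wall-clock time (the workaround): the events
    // stream is now the smaller input and drives the combined watermark.
    val advancing = combinedWatermark(eventsWm, System.currentTimeMillis())
    println(timerFires(timerTs, advancing)) // true
  }
}
```

In the first case the operator watermark is pinned at Long.MinValue no matter how far the events stream progresses. In the second case the rules side is always ahead, so only the events watermark matters.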