HI Rinat, to my knowledge your workaround is fine & necessary. You can also emit a Long.MAX_VALUE instead of the processing time to save some CPU cycles.
Cheers, Konstantin On Wed, Feb 27, 2019 at 9:32 PM Rinat <r.shari...@cleverdata.ru> wrote: > Hi mates, got some questions about using event time for the flink pipeline. > > My pipeline consists of two connected streams, one is a stream with > business rules and another one is a stream with user events. > > I’ve broadcasted stream with business rules and connected it to the stream > of events, thus I can apply all existing rules to each event. > For those purposes I’ve implemented a KeyedBroadcastProcessFunction, that > accumulates broadcast state and applies rules from it to each event. > In this function I would like to register event time timers. > > I’ve specified a AssignerWithPeriodicWatermarks for the stream of events, > that extracts event timestamp and uses it as a timestamp and watermark, but > sill got no success, because the broadcasted stream doesn’t have such > assigner and always returns Long.MIN as a watermark value, so flink uses > the smallest watermark, received from both streams, so event time doesn’t > updated. > > How can I solve this problem and use timestamps from event stream as a > pipeline event time ? > Here is the configuration of my pipeline. > > > > > > > > > > > > > > *val bSegments = env .addSource(rules) > .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)val keyedEvents = > env .addSource(events) .assignTimestampsAndWatermarks(eventTimeAssigner) > .keyBy { event => event.getId.getGid }keyedEvents .connect(bSegments) > .process(customerJourneyProcessor) .addSink(sink)* > > > I’ve found a workaround, that works for me, but I’m not sure, that it’s a > proper decision. > > I can add a timestamp/ watermarks assigner to the stream of rules, that > will always return *System.currentTime()*, thereby it always will be > bigger than event timestamp, so, the KeyedBroadcastProcessFunction > will use events stream timestamp as a watermark. > > > > > > > *class RuleTimestampAssigner extends > AssignerWithPeriodicWatermarks[SegmentEvent] { override def > getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis()) > override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): > Long = rule.created}* > > > But it looks like a hack and maybe someone can give an advice with the > more convenient approach. > > Thx ! > > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: r.shari...@cleverdata.ru <a.totma...@cleverdata.ru> > mobile: +7 (925) 416-37-26 > > CleverDATA > make your data clever > > -- Konstantin Knauf | Solutions Architect +49 160 91394525 <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen