Thanks Konstantin ! > On 28 Feb 2019, at 02:33, Konstantin Knauf <konstan...@ververica.com> wrote: > > HI Rinat, > > to my knowledge your workaround is fine & necessary. You can also emit a > Long.MAX_VALUE instead of the processing time to save some CPU cycles. > > Cheers, > > Konstantin > > > > On Wed, Feb 27, 2019 at 9:32 PM Rinat <r.shari...@cleverdata.ru > <mailto:r.shari...@cleverdata.ru>> wrote: > Hi mates, got some questions about using event time for the flink pipeline. > > My pipeline consists of two connected streams, one is a stream with business > rules and another one is a stream with user events. > > I’ve broadcasted stream with business rules and connected it to the stream of > events, thus I can apply all existing rules to each event. > For those purposes I’ve implemented a KeyedBroadcastProcessFunction, that > accumulates broadcast state and applies rules from it to each event. > In this function I would like to register event time timers. > > I’ve specified a AssignerWithPeriodicWatermarks for the stream of events, > that extracts event timestamp and uses it as a timestamp and watermark, but > sill got no success, because the broadcasted stream doesn’t have such > assigner and always returns Long.MIN as a watermark value, so flink uses the > smallest watermark, received from both streams, so event time doesn’t updated. > > How can I solve this problem and use timestamps from event stream as a > pipeline event time ? > Here is the configuration of my pipeline. > > val bSegments = env > .addSource(rules) > .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR) > > val keyedEvents = env > .addSource(events) > .assignTimestampsAndWatermarks(eventTimeAssigner) > .keyBy { event => event.getId.getGid } > > keyedEvents > .connect(bSegments) > .process(customerJourneyProcessor) > .addSink(sink) > > I’ve found a workaround, that works for me, but I’m not sure, that it’s a > proper decision. > > I can add a timestamp/ watermarks assigner to the stream of rules, that will > always return System.currentTime(), thereby it always will be bigger than > event timestamp, so, the KeyedBroadcastProcessFunction > will use events stream timestamp as a watermark. > > class RuleTimestampAssigner extends > AssignerWithPeriodicWatermarks[SegmentEvent] { > > override def getCurrentWatermark: Watermark = new > Watermark(System.currentTimeMillis()) > > override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): > Long = rule.created > } > > But it looks like a hack and maybe someone can give an advice with the more > convenient approach. > > Thx ! > > Sincerely yours, > Rinat Sharipov > Software Engineer at 1DMP CORE Team > > email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru> > mobile: +7 (925) 416-37-26 > > CleverDATA > make your data clever > > > > -- > Konstantin Knauf | Solutions Architect > +49 160 91394525 > <https://www.ververica.com/> > Follow us @VervericaData > -- > Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference > Stream Processing | Event Driven | Real Time > -- > Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > -- > Data Artisans GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
Sincerely yours, Rinat Sharipov Software Engineer at 1DMP CORE Team email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru> mobile: +7 (925) 416-37-26 CleverDATA make your data clever