Thanks, Konstantin!
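
For the archive, here is a rough sketch of why the workaround helps. It is a
simplified model and not Flink code: the class TwoInputWatermark and its
method names are made up for illustration. It assumes, as described in the
quoted message below, that a two-input operator advances event time to the
smallest watermark seen across its inputs.

```scala
// Simplified model (not Flink code): a two-input operator remembers the
// latest watermark of each input and uses the minimum as its event time.
final class TwoInputWatermark {
  private var eventsWm: Long = Long.MinValue // input 1: keyed events
  private var rulesWm: Long = Long.MinValue  // input 2: broadcast rules

  // Watermarks never move backwards, so keep the maximum seen per input.
  def onEventsWatermark(wm: Long): Long = { eventsWm = math.max(eventsWm, wm); current }
  def onRulesWatermark(wm: Long): Long = { rulesWm = math.max(rulesWm, wm); current }

  // Combined event time: the smaller of the two input watermarks.
  def current: Long = math.min(eventsWm, rulesWm)
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    // No assigner on the rules stream: event time is stuck at Long.MinValue.
    val stuck = new TwoInputWatermark
    stuck.onEventsWatermark(1000L)
    println(stuck.current == Long.MinValue) // true

    // Rules stream reports Long.MaxValue: the events stream decides.
    val fixed = new TwoInputWatermark
    fixed.onRulesWatermark(Long.MaxValue)
    fixed.onEventsWatermark(1000L)
    println(fixed.current) // 1000
  }
}
```

With Long.MaxValue on the rules side, the minimum is always decided by the
events stream. The System.currentTimeMillis() workaround has the same effect
as long as the event timestamps are not ahead of the wall clock.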

> On 28 Feb 2019, at 02:33, Konstantin Knauf <konstan...@ververica.com> wrote:
> 
> Hi Rinat,
> 
> To my knowledge, your workaround is fine and necessary. You can also emit a
> watermark of Long.MAX_VALUE instead of the processing time to save some CPU
> cycles.
> 
> Cheers,
> 
> Konstantin
> 
> 
> 
> On Wed, Feb 27, 2019 at 9:32 PM Rinat <r.shari...@cleverdata.ru> wrote:
> Hi mates, I’ve got some questions about using event time in a Flink pipeline.
> 
> My pipeline consists of two connected streams: one is a stream of business
> rules and the other is a stream of user events.
> 
> I’ve broadcast the stream of business rules and connected it to the stream
> of events, so I can apply all existing rules to each event.
> For this purpose I’ve implemented a KeyedBroadcastProcessFunction that
> accumulates broadcast state and applies the rules from it to each event.
> In this function I would like to register event-time timers.
> 
> I’ve specified an AssignerWithPeriodicWatermarks for the stream of events
> that extracts the event timestamp and uses it as both the timestamp and the
> watermark, but still had no success: the broadcast stream has no such
> assigner and always reports Long.MIN_VALUE as its watermark. Since Flink
> uses the smallest watermark received from both streams, event time never
> advances.
> 
> How can I solve this problem and use the timestamps from the event stream as
> the pipeline’s event time?
> Here is the configuration of my pipeline.
> 
> val bSegments = env
>   .addSource(rules)
>   .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)
> 
> val keyedEvents = env
>   .addSource(events)
>   .assignTimestampsAndWatermarks(eventTimeAssigner)
>   .keyBy { event => event.getId.getGid }
> 
> keyedEvents
>   .connect(bSegments)
>   .process(customerJourneyProcessor)
>   .addSink(sink)
> 
> I’ve found a workaround that works for me, but I’m not sure it’s the proper
> solution.
> 
> I can add a timestamp/watermark assigner to the stream of rules that always
> returns System.currentTimeMillis() as the watermark. That value will always
> be greater than the event timestamps, so the KeyedBroadcastProcessFunction
> will use the event stream’s watermark.
> 
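> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.watermark.Watermark
> 
> class RuleTimestampAssigner extends AssignerWithPeriodicWatermarks[Rule] {
> 
>   // Wall-clock time, so the rules input does not hold back event time
>   override def getCurrentWatermark: Watermark =
>     new Watermark(System.currentTimeMillis())
> 
>   // Timestamp of the rule itself: its creation time
>   override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long =
>     rule.created
> }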
> 
> But it looks like a hack, so maybe someone can suggest a more convenient
> approach.
> 
> Thx !
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 
> 
> 
> -- 
> Konstantin Knauf | Solutions Architect
> +49 160 91394525
>  <https://www.ververica.com/>
> Follow us @VervericaData
> --
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen    

Sincerely yours,
Rinat Sharipov
