HI Rinat,

to my knowledge your workaround is fine & necessary. You can also emit a
Long.MAX_VALUE instead of the processing time to save some CPU cycles.

Cheers,

Konstantin



On Wed, Feb 27, 2019 at 9:32 PM Rinat <r.shari...@cleverdata.ru> wrote:

> Hi mates, got some questions about using event time for the flink pipeline.
>
> My pipeline consists of two connected streams, one is a stream with
> business rules and another one is a stream with user events.
>
> I’ve broadcasted stream with business rules and connected it to the stream
> of events, thus I can apply all existing rules to each event.
> For those purposes I’ve implemented a KeyedBroadcastProcessFunction, that
> accumulates broadcast state and applies rules from it to each event.
> In this function I would like to register event time timers.
>
> I’ve specified a AssignerWithPeriodicWatermarks for the stream of events,
> that extracts event timestamp and uses it as a timestamp and watermark, but
> sill got no success, because the broadcasted stream doesn’t have such
> assigner and always returns Long.MIN as a watermark value, so flink uses
> the smallest watermark, received from both streams, so event time doesn’t
> updated.
>
> How can I solve this problem and use timestamps from event stream as a
> pipeline event time ?
> Here is the configuration of my pipeline.
>
>
>
>
>
>
>
>
>
>
>
>
>
> *val bSegments = env  .addSource(rules)  
> .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)val keyedEvents = 
> env  .addSource(events)  .assignTimestampsAndWatermarks(eventTimeAssigner)  
> .keyBy { event => event.getId.getGid }keyedEvents  .connect(bSegments)  
> .process(customerJourneyProcessor)  .addSink(sink)*
>
>
> I’ve found a workaround, that works for me, but I’m not sure, that it’s a
> proper decision.
>
> I can add a timestamp/ watermarks assigner to the stream of rules, that
> will always return *System.currentTime()*, thereby it always will be
> bigger than event timestamp, so, the KeyedBroadcastProcessFunction
> will use events stream timestamp as a watermark.
>
>
>
>
>
>
> *class RuleTimestampAssigner extends 
> AssignerWithPeriodicWatermarks[SegmentEvent] {  override def 
> getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())  
> override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): 
> Long = rule.created}*
>
>
> But it looks like a hack and maybe someone can give an advice with the
> more convenient approach.
>
> Thx !
>
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: r.shari...@cleverdata.ru <a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Reply via email to