Hi mates, I have a few questions about using event time in a Flink pipeline.

My pipeline consists of two connected streams: one carries business rules, the 
other carries user events.

I’ve broadcast the stream of business rules and connected it to the stream of 
events, so I can apply all existing rules to each event.
For this purpose I’ve implemented a KeyedBroadcastProcessFunction that 
accumulates the rules in broadcast state and applies them to each event.
In this function I would like to register event-time timers.
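A tiny pure-Scala model may help make the intended behaviour concrete. It is not Flink code: `ToyJourneyProcessor`, `Rule` and `Event` are hypothetical stand-ins, and the model only assumes the usual event-time semantics that a registered timer fires once the operator's watermark reaches its timestamp.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the post's types; the real classes are not shown.
final case class Rule(id: String, created: Long)
final case class Event(gid: String, timestamp: Long)

// Toy, single-threaded model of a KeyedBroadcastProcessFunction's semantics.
// NOT the Flink API: the method names here are illustrative only.
final class ToyJourneyProcessor {
  // Plays the role of the broadcast state: every rule seen so far, by id.
  private val rules = mutable.Map.empty[String, Rule]
  // Plays the role of the timer service: pending event-time timers.
  private val timers = mutable.SortedSet.empty[Long]
  // Records which (rule, event) pairs were evaluated and which timers fired.
  val applied = mutable.ListBuffer.empty[(String, String)]
  val fired = mutable.ListBuffer.empty[Long]

  // Analogue of processBroadcastElement: accumulate the rule.
  def onRule(rule: Rule): Unit = rules.update(rule.id, rule)

  // Analogue of processElement: apply every known rule, then register a timer.
  def onEvent(event: Event, delayMs: Long): Unit = {
    rules.keys.toSeq.sorted.foreach(id => applied += ((id, event.gid)))
    timers += event.timestamp + delayMs
  }

  // Analogue of the operator's watermark advancing: fire every timer <= it.
  def onWatermark(watermark: Long): Unit = {
    val due = timers.filter(_ <= watermark).toList
    due.foreach(fired += _)
    timers --= due
  }
}

val processor = new ToyJourneyProcessor
processor.onRule(Rule("r1", created = 0L))
processor.onEvent(Event("user-1", timestamp = 1000L), delayMs = 500L)
processor.onWatermark(Long.MinValue) // no timer is due yet
processor.onWatermark(1500L)         // the timer at 1000 + 500 fires
```

In this model, as long as the watermark stays at `Long.MinValue`, no timer ever fires, which is exactly the symptom described next.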

I’ve set an AssignerWithPeriodicWatermarks on the event stream that extracts the 
event timestamp and uses it both as the record timestamp and as the watermark, 
but still with no success. The broadcast rules stream has no such assigner, so 
its watermark stays at Long.MinValue. Since Flink uses the smaller of the 
watermarks received from the two inputs, the operator’s event time never 
advances.
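The watermark rule at play can be sketched as a small model (again not Flink code; `TwoInputClock` is a hypothetical name). It assumes what the paragraph above describes: each input starts at `Long.MinValue`, and the operator's event time is the minimum of the latest watermark from each input.

```scala
// Toy model (not Flink code) of how a two-input operator combines watermarks.
final class TwoInputClock {
  // Each input is at Long.MinValue until it emits a watermark.
  private var eventsWm: Long = Long.MinValue
  private var rulesWm: Long = Long.MinValue

  // The model assumes watermarks per input only move forward.
  def onEventsWatermark(wm: Long): Unit = { eventsWm = math.max(eventsWm, wm) }
  def onRulesWatermark(wm: Long): Unit = { rulesWm = math.max(rulesWm, wm) }

  // The operator's event-time clock: the minimum over both inputs.
  def current: Long = math.min(eventsWm, rulesWm)
}

val clock = new TwoInputClock
clock.onEventsWatermark(1000L)
clock.onEventsWatermark(2000L)
// The rules input has no assigner, so it never reports a watermark.
val before = clock.current // stays at Long.MinValue
clock.onRulesWatermark(5000L)
val after = clock.current  // min(2000, 5000) = 2000
```

The events side alone cannot move the clock; only once the rules side reports something at least as large does the events’ watermark become visible.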

How can I solve this problem and use the timestamps from the event stream as the 
pipeline’s event time?
Here is the configuration of my pipeline.

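import org.apache.flink.streaming.api.scala._

// Rules stream, broadcast to every parallel instance of the processor
val bSegments = env
  .addSource(rules)
  .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)

// Events stream: timestamps and watermarks come from eventTimeAssigner
val keyedEvents = env
  .addSource(events)
  .assignTimestampsAndWatermarks(eventTimeAssigner)
  .keyBy { event => event.getId.getGid }

// Connect keyed events with the broadcast rules; the processor’s event time
// is the minimum of the watermarks of both inputs
keyedEvents
  .connect(bSegments)
  .process(customerJourneyProcessor)
  .addSink(sink)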

I’ve found a workaround that works for me, but I’m not sure it’s the proper 
solution.

I can add a timestamp/watermark assigner to the rules stream that always returns 
System.currentTimeMillis() as its watermark. That value is always larger than 
the event timestamps, so the minimum of the two input watermarks is the events’ 
watermark, and the KeyedBroadcastProcessFunction effectively uses the event 
stream’s watermark.

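import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class RuleTimestampAssigner extends AssignerWithPeriodicWatermarks[Rule] {

  // Called periodically: report the wall clock, so this input never holds
  // back the operator’s combined watermark
  override def getCurrentWatermark: Watermark =
    new Watermark(System.currentTimeMillis())

  // Use the rule’s creation time as its record timestamp
  override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long =
    rule.created
}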
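The effect of this workaround, and one edge case it carries, can be checked with the same min() rule in isolation. This is a model rather than Flink code; `combined` and the wall-clock value `now` are hypothetical, chosen only for illustration.

```scala
// Toy model (not Flink code): the operator's clock is the minimum of the
// events' watermark and the rules' watermark (here, the wall clock).
def combined(eventsWm: Long, rulesWm: Long): Long = math.min(eventsWm, rulesWm)

val now = 1700000000000L // hypothetical wall-clock reading, in ms

// Typical case: event timestamps lag behind the wall clock,
// so the events' watermark drives event time.
val typical = combined(eventsWm = now - 60000L, rulesWm = now)

// Edge case: events stamped ahead of the local clock (e.g. clock skew),
// so event time is capped by the wall clock of the rules assigner.
val skewed = combined(eventsWm = now + 60000L, rulesWm = now)
```

So the workaround holds as long as event timestamps do not run ahead of the wall clock where the rules assigner runs. Because a periodic assigner reports its watermark on a timer rather than per element, the rules side keeps advancing even when no new rules arrive.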

But it looks like a hack; maybe someone can suggest a cleaner approach?

Thanks!

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever
