Hi,

In my Flink program, after a couple of map, union and connect operations, I have a final filter and a sink. Something like this (after abstracting out details):

    val filteredEvents: DataStream[NotificationEvent] = allThisStuffWorking
      .name("filtered_users")

    filteredEvents
      .filter(x => check(x.f1, x.f2, someStuff)) //BUG
      .addSink(new NotificationSinkFunction(notifier))
      .name("send_notification")

The check function returns a Boolean and does not access anything other than the parameters passed to it.

Here is the relevant part of the notification sink function:

    class NotificationSinkFunction(notifier: Notifier)
        extends SinkFunction[NotificationEvent] {

      val LOG: Logger = LoggerFactory.getLogger(this.getClass)

      def invoke(event: NotificationEvent): Unit = {
        LOG.info("Log this notification detail")
        notifier.send(event.f1, event.f2) //BUG
      }
    }

If I comment out the two lines marked with //BUG, the Flink pipeline works and prints the log messages, and Flink shows this execution plan at the end:

    filtered_users -> Sink: send_notification

[image: Inline image 1]

But with either of the two lines marked //BUG left in, Flink builds and executes the plan only up to filtered_users and does not print the log message.

[image: Inline image 2]

How can I figure out what is wrong with the check function or the notifier send function that prevents Flink from building the full plan? What are the typical mistakes that lead to this?

Thanks,
+satish