Hi,

In my Flink program, after a couple of map, union and connect, I have a
final filter and a sink. Something like this (after abstracting out
details):

val filteredEvents: DataStream[NotificationEvent]
  = allThisStuffWorking
      .name("filtered_users")

filteredEvents
  *.filter(x => check(x.f1, x.f2, someStuff)) //BUG*
  .addSink(new NotificationSinkFunction(notifier))
  .name("send_notification")

The check function returns a Boolean and does not access anything other
than parameters passed. Here is relevant part of Notification Sink Function:

class NotificationSinkFunction(notifier: Notifier)
      extends SinkFunction[NotificationEvent] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  def invoke(event: NotificationEvent): Unit = {
    LOG.info("Log this notification detail")
    *notifier.send(**event.f1, event.f2) //BUG*
  }
}

If I comment out the lines highlighted and marked with //BUG, the Flink
pipeline works and print the log messages, and Flink shows this execution
plan at the end:

filtered_users -> Sink: send_notification

[image: Inline image 1]


But with either of those two lines marked as BUG above, Flink makes and
executes plan only till filtered_user and does not print the log message.

[image: Inline image 2]

How can I figure out what is wrong with the check function or notifier send
function that prevents Flink from making the full plan. What are the
typical mistakes leading to this?

Thanks,
+satish

Reply via email to