Hi,
are we talking about the Plan View in the JobManager dashboard? If yes,
then I expect there to be only one "box" for the combination of filter and
sink because they are chained together to avoid passing data between separate tasks.

For debugging, could you maybe change check() to always return true and see
if you then get your messages from the sink?
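
Something along these lines might help with that experiment. It is only a rough sketch: the body of check() below is a made-up stand-in (the real one is not shown in this thread), and FilterDebug, debugCheck and the bypass flag are hypothetical names. The wrapper still calls check(), prints what it returned (or what it threw), and lets every element through while bypass is true.

```scala
object FilterDebug {
  // Hypothetical stand-in for check(); the real implementation is not shown in this thread.
  def check(f1: String, f2: String): Boolean =
    f1.nonEmpty && f2.nonEmpty

  // Evaluates check(), prints the outcome, and passes every element when bypass is true.
  def debugCheck(f1: String, f2: String, bypass: Boolean = true): Boolean = {
    val result =
      try check(f1, f2)
      catch {
        case e: Exception =>
          // An exception inside the predicate is reported and treated as "false".
          println(s"check($f1, $f2) threw: $e")
          false
      }
    println(s"check($f1, $f2) = $result, bypass = $bypass")
    bypass || result
  }
}
```

In the job this would be used as .filter(x => FilterDebug.debugCheck(x.f1, x.f2)). The println output should end up in the TaskManager's stdout rather than in the client console.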

Cheers,
Aljoscha

On Sat, 15 Oct 2016 at 05:26 Satish Chandra Gupta <scgupt...@gmail.com>
wrote:

> Hi,
>
> In my Flink program, after a couple of map, union and connect, I have a
> final filter and a sink. Something like this (after abstracting out
> details):
>
> val filteredEvents: DataStream[NotificationEvent]
>   = allThisStuffWorking
>       .name("filtered_users")
>
> filteredEvents
>   .filter(x => check(x.f1, x.f2, someStuff)) //BUG
>   .addSink(new NotificationSinkFunction(notifier))
>   .name("send_notification")
>
> The check function returns a Boolean and does not access anything other
> than parameters passed. Here is relevant part of Notification Sink Function:
>
> class NotificationSinkFunction(notifier: Notifier)
>       extends SinkFunction[NotificationEvent] {
>
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
>
>   def invoke(event: NotificationEvent): Unit = {
>     LOG.info("Log this notification detail")
>     notifier.send(event.f1, event.f2) //BUG
>   }
> }
>
> If I comment out the lines highlighted and marked with //BUG, the Flink
> pipeline works and prints the log messages, and Flink shows this execution
> plan at the end:
>
> filtered_users -> Sink: send_notification
>
> [image: Inline image 1]
>
>
> But with either of those two lines marked as BUG above, Flink builds and
> executes the plan only up to filtered_users and does not print the log message.
>
> [image: Inline image 2]
>
> How can I figure out what is wrong with the check function or the notifier
> send function that prevents Flink from making the full plan? What are the
> typical mistakes leading to this?
>
> Thanks,
> +satish
>
