Hi Piotr,

Since my current process function already works well for me, except that I don't have access to the mailbox executor, I have simply created a custom operator that injects it:
```
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator

class MyOperator(myFunction: MyFunction) extends KeyedCoProcessOperator(myFunction) {

  // Created lazily, so the containing task is only looked up on first use in open()
  private lazy val mailboxExecutor =
    getContainingTask
      .getMailboxExecutorFactory
      .createExecutor(getOperatorConfig.getChainIndex)

  override def open(): Unit = {
    super.open()
    // Inject the executor into the wrapped function
    userFunction.asInstanceOf[MyFunction].mailboxExecutor = mailboxExecutor
  }
}
```

This way I can send mails just fine. In the main application I use it like this:

```
.transform("wrapping my function with my operator", new MyOperator(new MyFunction()))
```

So far everything looks good to me, but if you see problems or know a better way, it would be great to hear your thoughts on this again. In particular, the way of getting access to the mailbox executor is a bit clumsy...
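For anyone reading along, the function side of this could look roughly like the sketch below. It is only an illustration, not the actual `MyFunction`: the `String` type parameters and the `sendMail` helper are made up, and the imports assume a Flink version (roughly 1.11 to 1.13) where `MailboxExecutor` lives in `org.apache.flink.streaming.api.operators` and `execute` accepts a `ThrowingRunnable`. The only part the operator above really requires is a settable `mailboxExecutor` member.

```scala
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
// Assumption: package location as in Flink 1.11-1.13; it differs in other versions
import org.apache.flink.streaming.api.operators.MailboxExecutor
import org.apache.flink.util.Collector
import org.apache.flink.util.function.ThrowingRunnable

// Hypothetical key/input/output types, chosen only to keep the sketch short
class MyFunction extends KeyedCoProcessFunction[String, String, String, String] {

  // Set by MyOperator.open(); transient so it is never serialized with the function
  @transient var mailboxExecutor: MailboxExecutor = _

  // Hypothetical helper: enqueue an action to be run by the task's mailbox thread
  def sendMail(description: String)(action: => Unit): Unit =
    mailboxExecutor.execute(
      new ThrowingRunnable[Exception] {
        override def run(): Unit = action
      },
      description)

  override def processElement1(
      value: String,
      ctx: KeyedCoProcessFunction[String, String, String, String]#Context,
      out: Collector[String]): Unit =
    out.collect(value)

  override def processElement2(
      value: String,
      ctx: KeyedCoProcessFunction[String, String, String, String]#Context,
      out: Collector[String]): Unit =
    out.collect(value)
}
```

The field stays `null` until `open()` has run on the operator, so in a sketch like this `sendMail` should only be called after that point.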