Hi Piotr,

My current process function already works well for me, except that it has no
access to the mailbox executor, so I simply created a custom operator that
injects it:

```
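// Custom operator whose only job is to hand the mailbox executor to the function.
class MyOperator(myFunction: MyFunction)
  extends KeyedCoProcessOperator(myFunction) {

  // Lazy, because the containing task and operator config are only set once
  // the operator has been set up by the runtime.
  private lazy val mailboxExecutor = getContainingTask
    .getMailboxExecutorFactory
    .createExecutor(getOperatorConfig.getChainIndex)

  override def open(): Unit = {
    super.open()
    // Inject the executor into the wrapped function before elements arrive.
    userFunction.asInstanceOf[MyFunction].mailboxExecutor = mailboxExecutor
  }
}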
```
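
For context, here is a stripped-down sketch of what the function side could look like. This is not my actual code: the `String` type parameters, the mail body and the description text are placeholders, and the `MailboxExecutor` import assumes the Flink 1.10/1.11 package (I believe later releases moved it to `org.apache.flink.api.common.operators`).

```scala
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
// Assumed location for Flink 1.10/1.11; adjust for your Flink version.
import org.apache.flink.streaming.api.operators.MailboxExecutor
import org.apache.flink.util.Collector
import org.apache.flink.util.function.RunnableWithException

// Placeholder types: key, both inputs and the output are all String here.
class MyFunction
  extends KeyedCoProcessFunction[String, String, String, String] {

  // Assigned by MyOperator.open(). Marked transient because the function
  // object is serialized when the job is deployed, before the executor exists.
  @transient var mailboxExecutor: MailboxExecutor = _

  override def processElement1(
      value: String,
      ctx: KeyedCoProcessFunction[String, String, String, String]#Context,
      out: Collector[String]): Unit = {
    // Enqueue a mail; it is run later by the task's mailbox thread.
    mailboxExecutor.execute(
      new RunnableWithException {
        override def run(): Unit = println(s"mail for $value") // placeholder action
      },
      "placeholder mail for %s", // description format, used for debugging
      value)
  }

  override def processElement2(
      value: String,
      ctx: KeyedCoProcessFunction[String, String, String, String]#Context,
      out: Collector[String]): Unit = {
    out.collect(value) // second input is simply forwarded in this sketch
  }
}
```

The field has to be a `var` so that the operator can assign it in `open()`; until then it is `null`, so the function must not touch it before the operator has been opened.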

This way I can send mails just fine. In the main application I use it like
this:

```
.transform(
  "wrapping my function with my operator",
  new MyOperator(new MyFunction()))
```

So far everything looks good to me, but if you see problems or know a better
way, it would be great to hear your thoughts on this again. In particular,
the way of getting access to the mailbox executor is a bit clumsy...



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
