Hello Hady,
Glad to see that you are testing StateFun!

Regarding that exception, I don't think it is the root cause; it is most
likely a side effect of the task shutting down after the actual failure.
As you wrote, the root cause is that the StateFun job failed because it
wasn't able to deliver a message to a remote function within the configured
time frame. If you look at the logs, you will most likely see a
StatefulFunctionInvocationException.

However, the Flink job should recover if checkpointing is enabled or a
restart strategy is configured in your flink-conf.yaml.
You can refer to the default flink-conf.yaml [1] that we ship with the
official Docker image [2].
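As a rough sketch, enabling periodic checkpoints together with an explicit
restart strategy in flink-conf.yaml could look like the snippet below. The
keys are standard Flink options, but the interval, attempt count and delay
are illustrative values I picked for the example, not recommendations;
please compare with the shipped default file [1] and your Flink version.

```yaml
# flink-conf.yaml (sketch; values are illustrative only)

# Take a checkpoint every 10 seconds so the job can restore its state
# after a failure instead of starting from scratch.
execution.checkpointing.interval: 10s

# Restart the job up to 3 times, waiting 10 seconds between attempts.
# After the last failed attempt the job is marked as FAILED.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

With checkpointing enabled and no restart strategy set, Flink falls back to
restarting with a fixed delay, so the explicit restart-strategy lines mainly
matter when you want to bound the number of attempts.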

One more thing I'd like to point out: if the remote functions
are not able to keep up with the stream of requests,
you have a few strategies to deal with that:

* increase the total call timeout per invocation [3]
* reduce maxNumBatchRequests to a lower value [4]
* reduce the total number of asynchronous in-flight requests: set the
following key in your flink-conf.yaml to something lower than 1024, and
try experimenting with different values:
 statefun.async.max-per-task: 256 # for example 256, 128,...
* enable autoscaling of your remote containers (if your
infrastructure allows)
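For the first two strategies, a remote endpoint definition in module.yaml
might look roughly like the sketch below. It assumes the StateFun 3.1+
format (kind io.statefun.endpoints.v2/http with the asynchronous transport
described in [3]); the function namespace and URL are hypothetical
placeholders, and the numbers are only examples, so please double-check the
exact keys against the docs for the version you run.

```yaml
# module.yaml (sketch; assumes the StateFun 3.1+ endpoint format)
kind: io.statefun.endpoints.v2/http
spec:
  # Hypothetical namespace and URL, replace with your own.
  functions: com.example/*
  urlPathTemplate: https://functions.example.com/{function.name}
  # Lower value: backpressure kicks in earlier for a busy address.
  maxNumBatchRequests: 100
  transport:
    type: io.statefun.transports.v1/async
    # Total time budget for a single invocation, including retries.
    call: 2m
```

Raising the call timeout gives a slow function more time before the job
fails, while lowering maxNumBatchRequests slows down the producers instead;
in practice you may want to combine both.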

[1] https://github.com/apache/flink-statefun/blob/56c2f036a6831af885eb15539bc7962bb730b060/tools/docker/flink-distribution-template/conf/flink-conf.yaml
[2] https://hub.docker.com/r/apache/flink-statefun
[3] https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/http-endpoint/#asynchronous-http-transport-beta
[4] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml#L20

Good luck,
Igal.

On Wed, Nov 3, 2021 at 10:59 AM Hady Januar Willi <hady.wi...@gojek.com>
wrote:

> Hi everyone,
>
> When testing Flink StateFun, the job eventually throws the following
> exception after failing to reach the endpoint, or if the endpoint keeps
> failing after the exponentially increasing retry delay.
>
> java.util.concurrent.RejectedExecutionException:
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox$MailboxClosedException:
> Mailbox is in state CLOSED, but is required to be in state OPEN for put
> operations.
>
> How do I recover from this state?
>
> Thank you.
>
> Regards,
> Hady
>
