Hi all,

I’m using org.apache.flink.statefun.flink.harness.Harness in some unit test 
code, where I control the sources so that they are finite.

This is similar to what I found at 
https://stackoverflow.com/questions/61939681/is-it-possible-to-write-a-unit-test-which-terminates-using-flink-statefun-harnes

The problem is that if I shut down all of the sources, it looks like StateFun 
then starts shutting down some resources prematurely, which results in this 
error:

Caused by: java.lang.IllegalStateException: Mailbox is in state CLOSED, but is required to be in state OPEN for put operations.
        at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:256) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:184) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:73) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.api.operators.MailboxExecutor.execute(MailboxExecutor.java:98) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade.execute(MailboxExecutorFacade.java:35) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
        at org.apache.flink.statefun.flink.core.feedback.FeedbackChannel$ConsumerTask.scheduleDrainAll(FeedbackChannel.java:114) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
        at org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.put(FeedbackChannel.java:64) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
        at org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator.processElement(FeedbackSinkOperator.java:58) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) ~[flink-runtime_2.12-1.11.4.jar:1.11.4]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) ~[flink-runtime_2.12-1.11.4.jar:1.11.4]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]

I believe the issue is that messages are still in flight through the feedback 
loop, but the task that consumes the feedback channel has already been 
terminated.

I currently work around this by first turning off only the sources that would 
trigger new egress results, then waiting until all of the egresses have been 
idle for more than 2 seconds, and only then shutting down the rest of the 
sources.
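
A minimal sketch of the "wait until idle" part of that workaround, in plain 
Java. IdleTracker, touch() and awaitIdle() are hypothetical names made up for 
illustration; they are not part of StateFun or Flink, and how the tracker gets 
wired into the egress consumers depends on the test setup.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical test helper (not a StateFun/Flink class): tracks the last egress activity. */
final class IdleTracker {
    // Timestamp (System.nanoTime) of the most recent activity; starts at creation time.
    private final AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime());

    /** Call this from each egress consumer whenever a record arrives. */
    void touch() {
        lastActivityNanos.set(System.nanoTime());
    }

    /**
     * Polls until no touch() has been seen for at least idleMillis,
     * or until timeoutMillis has elapsed.
     *
     * @return true if the idle period was observed, false if the timeout hit first
     */
    boolean awaitIdle(long idleMillis, long timeoutMillis) throws InterruptedException {
        final long idleNanos = TimeUnit.MILLISECONDS.toNanos(idleMillis);
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        // Compare via subtraction so the check stays correct if nanoTime wraps around.
        while (System.nanoTime() - deadline < 0) {
            long quietNanos = System.nanoTime() - lastActivityNanos.get();
            if (quietNanos >= idleNanos) {
                return true;
            }
            Thread.sleep(10); // short poll interval; tune for the test
        }
        return false;
    }
}
```

In the test this would be used roughly as: stop the sources that trigger new 
egress results, call awaitIdle(2000, someTimeout), and shut down the remaining 
sources only if it returns true. It is still timing-based, so it narrows the 
race rather than removing it.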

But it feels fragile...

Thanks,

— Ken

PS - using StateFun 2.2.2

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


