Hi all, I’m using org.apache.flink.statefun.flink.harness.Harness in some unit test code, where I control the sources so that they are finite.
This is similar to what I found at
https://stackoverflow.com/questions/61939681/is-it-possible-to-write-a-unit-test-which-terminates-using-flink-statefun-harnes

The problem is that if I shut down all of the sources, it looks like StateFun then starts shutting down some resources prematurely, which results in this error:

Caused by: java.lang.IllegalStateException: Mailbox is in state CLOSED, but is required to be in state OPEN for put operations.
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:256) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:184) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:73) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.api.operators.MailboxExecutor.execute(MailboxExecutor.java:98) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade.execute(MailboxExecutorFacade.java:35) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
    at org.apache.flink.statefun.flink.core.feedback.FeedbackChannel$ConsumerTask.scheduleDrainAll(FeedbackChannel.java:114) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
    at org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.put(FeedbackChannel.java:64) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
    at org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator.processElement(FeedbackSinkOperator.java:58) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) ~[flink-runtime_2.12-1.11.4.jar:1.11.4]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) ~[flink-runtime_2.12-1.11.4.jar:1.11.4]
    at java.lang.Thread.run(Thread.java:834) ~[?:?]

I believe the issue is that messages are still being generated but the target for the feedback channel has been terminated.

I currently work around this by turning off just the sources that would trigger new egress results, then I wait for the egresses to all be idle for > 2 seconds, and then I shut down the rest of the sources. But it feels fragile...

Thanks,

-- Ken

PS - using StateFun 2.2.2

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
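
For reference, the "wait until the egresses are idle" step of the workaround above could look roughly like the sketch below. This is only an illustration: EgressIdleTracker and its methods are hypothetical names, not part of StateFun or Flink, and the sketch assumes each egress consumer in the test calls recordActivity() whenever it receives a record.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical test helper (not a StateFun class): remembers when an egress last saw a record. */
final class EgressIdleTracker {
    // Timestamp of the most recent egress record, from System.nanoTime().
    private final AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime());

    /** Assumed to be called by the test's egress consumer for every record it receives. */
    void recordActivity() {
        lastActivityNanos.set(System.nanoTime());
    }

    /** Returns true if no record has been seen for at least the given duration. */
    boolean isIdleFor(Duration idle) {
        return System.nanoTime() - lastActivityNanos.get() >= idle.toNanos();
    }

    /**
     * Polls until the egress has been quiet for `idle`, or until `timeout` elapses.
     * Returns true if the idle period was reached, false on timeout or interruption.
     */
    boolean awaitIdle(Duration idle, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!isIdleFor(idle)) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: records kept arriving or the idle window is too long
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

In the test, the idea would be to stop the sources that trigger new egress output, call awaitIdle(Duration.ofSeconds(2), someTimeout) for each egress, and only then stop the remaining sources. It is still a timing heuristic, so it does not remove the fragility, it just puts it in one place.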