Not sure what version you are using, but it says `Throwable` in `trunk`:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L577
-Matthias
On 11/18/21 6:09 AM, John Roesler wrote:
Thanks for pointing that out, Scott!
You’re totally right; that should be a Throwable.
Just to put it out there, do you want to just send a quick PR? If not, no
worries. I’m just asking because it seems like you’ve already done the hard
part and it might be nice to get the contribution credit.
Thanks,
John
On Thu, Nov 18, 2021, at 08:00, Sinclair Scott wrote:
Hi there,
I'm a big fan of KStreams - thanks for all the great work!!
I unfortunately (my fault) had a StackOverflowError bug in my KStream
transformer which meant that the KStream died without reporting any
Exception at all.
The log first shows some polling activity, and then later the state
transition to PENDING_SHUTDOWN:
Main Consumer poll completed in 2 ms and fetched 1 records
Flushing all global globalStores registered in the state manager
Idempotently invoking restoration logic in state RUNNING
Finished restoring all changelogs []
Idempotent restore call done. Thread state has not changed.
Processing tasks with 1 iterations.
Flushing all global globalStores registered in the state manager
State transition from RUNNING to PENDING_SHUTDOWN
This is because the StreamThread.run() method catches Exception only.
I ended up recompiling Kafka Streams with the catch changed to Throwable
so I could see what was going on. Then I discovered my bad recursive call
:(
Can we please change the catch to catch Throwable, so that we are
always guaranteed some output?
StreamThread.java
@Override
public void run() {
    log.info("Starting");
    if (setState(State.STARTING) == null) {
        log.info("StreamThread already shutdown. Not running");
        return;
    }
    boolean cleanRun = false;
    try {
        runLoop();
        cleanRun = true;
    } catch (final Exception e) {
        // we have caught all Kafka related exceptions, and other runtime exceptions
        // should be due to user application errors
        if (e instanceof UnsupportedVersionException) {
            final String errorMessage = e.getMessage();
            if (errorMessage != null &&
                errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
                log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
                    "Setting {}=\"{}\" requires broker version 2.5 or higher.",
                    StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                    EXACTLY_ONCE_BETA);
                throw e;
            }
        }
        log.error("Encountered the following exception during processing " +
            "and the thread is going to shut down: ", e);
        throw e;
    } finally {
        completeShutdown(cleanRun);
    }
}
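To show why the catch above stays silent in my case, here is a minimal standalone sketch. It is not Kafka code: `CatchDemo` and `recurse` are made-up names, with `recurse` standing in for the bad recursive call in my transformer. StackOverflowError extends Error, not Exception, so a `catch (Exception e)` block never sees it, while `catch (Throwable t)` does.

```java
public class CatchDemo {

    // Hypothetical stand-in for the buggy transformer: unbounded recursion.
    static int recurse(int depth) {
        return recurse(depth + 1) + 1;
    }

    // Returns true only if a catch (Exception) handler sees the failure.
    static boolean seenByCatchException() {
        try {
            try {
                recurse(0);
            } catch (Exception e) {
                return true;
            }
        } catch (StackOverflowError escaped) {
            // The error skipped the inner handler because Error is not an Exception.
            return false;
        }
        return false;
    }

    // Returns true only if a catch (Throwable) handler sees the failure.
    static boolean seenByCatchThrowable() {
        try {
            recurse(0);
        } catch (Throwable t) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("catch (Exception) saw it: " + seenByCatchException());  // false
        System.out.println("catch (Throwable) saw it: " + seenByCatchThrowable());  // true
    }
}
```

In the real thread there is no outer handler like the one in this sketch, so the error propagates past the logging code and only the `finally` block runs, which matches the silent transition to PENDING_SHUTDOWN in the log.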
Thanks and kind regards
Scott Sinclair