mjsax commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460302576
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ########## @@ -234,24 +234,18 @@ void initialize() { } void pollAndUpdate() { - try { - final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime); - for (final ConsumerRecord<byte[], byte[]> record : received) { - stateMaintainer.update(record); - } - final long now = time.milliseconds(); - if (now >= lastFlush + flushInterval) { - stateMaintainer.flushState(); - lastFlush = now; - } - } catch (final InvalidOffsetException recoverableException) { Review comment: We just let the original exception bubble up, to be able to wipe out the store. -- This is also just a side "improvement"; we could also just die and let users cleanup the state directory manually. However, it seems better to wipe it out directly. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org