vvcephei commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r443062971
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -339,7 +341,7 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                         recordContext
                     )
                 );
-            } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+            } else if (Arrays.equals(record.headers().lastHeader("v").value(), V_1_CHANGELOG_HEADER_VALUE)) {

Review comment:
This is the fix (although it was probably fine before). The behavior of Header.equals is not specified by any contract, so it's safer to compare the header values directly. Just as before, I'm comparing byte arrays to avoid deserializing the value.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -286,6 +289,15 @@ private void logTombstone(final Bytes key) {
     private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
         for (final ConsumerRecord<byte[], byte[]> record : batch) {
+            if (record.partition() != partition) {
+                throw new IllegalStateException(
+                    String.format(
+                        "record partition [%d] is being restored by the wrong suppress partition [%d]",
+                        record.partition(),
+                        partition
+                    )
+                );
+            }

Review comment:
On the side, I realized we can consolidate this check and perform it first, rather than after we've already written bad data into the buffer.
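For readers following along, here is a minimal, self-contained sketch of the two ideas in these comments. It is not the code from the PR. `RestoreChecksSketch`, `OpaqueHeader`, `V1_VALUE`, `isV1`, and `checkPartition` are hypothetical names made up for illustration, and the byte value `1` is an assumption, since the actual definition of `V_1_CHANGELOG_HEADER_VALUE` is not shown in this message. The sketch only relies on standard Java behavior: a class that does not override `equals` falls back to identity comparison, while `Arrays.equals` compares array contents.

```java
import java.util.Arrays;

class RestoreChecksSketch {
    // Assumed value, for illustration only; the PR's actual constant is not shown here.
    static final byte[] V1_VALUE = {(byte) 1};

    // Hypothetical stand-in for a header type that does not override equals().
    static final class OpaqueHeader {
        final String key;
        final byte[] value;

        OpaqueHeader(final String key, final byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Compares the raw bytes, so the result does not depend on any equals()
    // implementation and the value never needs to be deserialized.
    static boolean isV1(final byte[] headerValue) {
        return Arrays.equals(headerValue, V1_VALUE);
    }

    // Fails fast, before any state is modified, if the record belongs to
    // a different partition than the one being restored.
    static void checkPartition(final int recordPartition, final int bufferPartition) {
        if (recordPartition != bufferPartition) {
            throw new IllegalStateException(
                String.format(
                    "record partition [%d] is being restored by the wrong suppress partition [%d]",
                    recordPartition,
                    bufferPartition
                )
            );
        }
    }

    public static void main(final String[] args) {
        final OpaqueHeader a = new OpaqueHeader("v", new byte[] {(byte) 1});
        final OpaqueHeader b = new OpaqueHeader("v", new byte[] {(byte) 1});

        // Identity comparison inherited from Object, despite identical contents.
        System.out.println("equals(): " + a.equals(b));
        // Content comparison of the underlying bytes.
        System.out.println("Arrays.equals: " + isV1(a.value));
        // Matching partitions: returns normally.
        checkPartition(0, 0);
    }
}
```

In this sketch the two headers carry identical bytes, yet `a.equals(b)` is false because `OpaqueHeader` inherits `Object.equals`, whereas `isV1` reports a match. Calling `checkPartition` at the top of the loop, as the second comment suggests, means a mismatched record is rejected before anything is written.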