vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443062971



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -339,7 +341,7 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             recordContext
                         )
                     );
-                } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+                } else if (Arrays.equals(record.headers().lastHeader("v").value(), V_1_CHANGELOG_HEADER_VALUE)) {

Review comment:
       This is the fix (although the previous code was probably fine). No 
contract specifies how Header.equals is implemented, so it's safer to compare 
the header values directly. As before, I'm comparing byte arrays to avoid 
deserializing the value.
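
To illustrate the concern, here is a minimal, self-contained sketch. It does not use Kafka's classes: `SimpleHeader`, `IdentityHeader`, and `hasValue` are hypothetical names invented for this example, and the marker bytes are an assumed value. It only shows why an `equals` with no documented contract can disagree with a byte-level comparison of the values.

```java
import java.util.Arrays;

final class HeaderValueComparison {
    /** Hypothetical stand-in for a header type; this is NOT Kafka's Header interface. */
    interface SimpleHeader {
        String key();
        byte[] value();
    }

    /** Does not override equals(), so equals() falls back to Object identity. */
    static final class IdentityHeader implements SimpleHeader {
        private final String key;
        private final byte[] value;

        IdentityHeader(final String key, final byte[] value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String key() {
            return key;
        }

        @Override
        public byte[] value() {
            return value;
        }
    }

    /** Compares the raw value bytes; the null check is a defensive addition for this sketch. */
    static boolean hasValue(final SimpleHeader header, final byte[] expected) {
        return header != null && Arrays.equals(header.value(), expected);
    }

    public static void main(final String[] args) {
        final byte[] expected = {(byte) 1}; // assumed marker value, for illustration only
        final SimpleHeader a = new IdentityHeader("v", new byte[] {(byte) 1});
        final SimpleHeader b = new IdentityHeader("v", new byte[] {(byte) 1});

        // Identity-based equals() on two distinct objects with identical content.
        System.out.println("equals():        " + a.equals(b));
        // Content comparison of the value bytes, independent of how equals() is written.
        System.out.println("Arrays.equals(): " + hasValue(a, expected));
    }
}
```

Comparing the bytes keeps the result independent of whichever header implementation the consumer happens to return, and still avoids deserializing the value.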

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -286,6 +289,15 @@ private void logTombstone(final Bytes key) {
 
     private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
         for (final ConsumerRecord<byte[], byte[]> record : batch) {
+            if (record.partition() != partition) {
+                throw new IllegalStateException(
+                    String.format(
+                        "record partition [%d] is being restored by the wrong suppress partition [%d]",
+                        record.partition(),
+                        partition
+                    )
+                );
+            }

Review comment:
       On the side, I realized we can consolidate this check and perform it 
first, rather than after we've already written bad data into the buffer.
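
A minimal sketch of the validate-before-write ordering follows. It is not the Kafka Streams buffer: `PartitionCheckedBuffer`, `Rec`, and the `HashMap`-backed store are hypothetical stand-ins, chosen only to show that a record from the wrong partition is rejected before anything about it reaches the buffer.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionCheckedBuffer {
    /** Hypothetical simplified record; stands in for a consumer record in this sketch. */
    static final class Rec {
        final int partition;
        final String key;
        final String value;

        Rec(final int partition, final String key, final String value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    private final int partition;
    private final Map<String, String> buffer = new HashMap<>();

    PartitionCheckedBuffer(final int partition) {
        this.partition = partition;
    }

    void restoreBatch(final List<Rec> batch) {
        for (final Rec record : batch) {
            // Validate first, so a mismatched record never mutates the buffer.
            if (record.partition != partition) {
                throw new IllegalStateException(
                    String.format(
                        "record partition [%d] is being restored by the wrong suppress partition [%d]",
                        record.partition,
                        partition
                    )
                );
            }
            buffer.put(record.key, record.value);
        }
    }

    boolean contains(final String key) {
        return buffer.containsKey(key);
    }

    public static void main(final String[] args) {
        final PartitionCheckedBuffer buffer = new PartitionCheckedBuffer(0);
        try {
            buffer.restoreBatch(Arrays.asList(new Rec(0, "a", "1"), new Rec(1, "b", "2")));
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // The mismatched record "b" was rejected before being written.
        System.out.println("contains b: " + buffer.contains("b"));
    }
}
```

Note that in this sketch the check runs per record, so records that precede the mismatched one in the same batch have already been applied by the time the exception is thrown.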



