fqaiser94 commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1089848517


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -44,18 +42,30 @@ public void setIfUnset(final SerdeGetter getter) {
         }
     }
 
-    @Override
-    public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
-
-        final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
-
-        System.arraycopy(data, 0, bytes, 0, bytes.length);
-
-        if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
-            return new Change<>(inner.deserialize(topic, headers, bytes), null);
+    private byte[] getData(final ByteBuffer buffer) {
+        final boolean dataIsNull = buffer.get() == (byte) 1;
+        final byte[] data;
+        if (dataIsNull) {
+            data = null;
         } else {
-            return new Change<>(null, inner.deserialize(topic, headers, bytes));
+            final int dataLength = buffer.getInt();
+            data = new byte[dataLength];
+            buffer.get(data);
         }
+        return data;
+    }
+
+    @Override
+    public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
+        // The format we need to deserialize is:

Review Comment:
   Never mind, I think I've figured out the code changes needed to enable smooth rolling upgrades.
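
For readers following the hunk, the framing that `getData` reads (a one-byte null flag, then a four-byte length and the payload when the flag is not set) can be sketched as a standalone round trip. This is only an illustration under assumptions: the class name `NullableFraming` and the writer `putData` are hypothetical and do not come from the PR, and the sketch says nothing about how the rolling-upgrade handling is done.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class NullableFraming {

    // Hypothetical writer (not from the PR): flag byte 1 means "null";
    // otherwise flag byte 0, a 4-byte length, then the payload bytes.
    static void putData(final ByteBuffer buffer, final byte[] data) {
        if (data == null) {
            buffer.put((byte) 1);
        } else {
            buffer.put((byte) 0);
            buffer.putInt(data.length);
            buffer.put(data);
        }
    }

    // Reader that mirrors the getData method shown in the hunk.
    static byte[] getData(final ByteBuffer buffer) {
        final boolean dataIsNull = buffer.get() == (byte) 1;
        if (dataIsNull) {
            return null;
        }
        final byte[] data = new byte[buffer.getInt()];
        buffer.get(data);
        return data;
    }

    public static void main(final String[] args) {
        final ByteBuffer buffer = ByteBuffer.allocate(64);
        putData(buffer, null);                      // e.g. a missing value
        putData(buffer, new byte[] {10, 20, 30});   // e.g. a present value
        buffer.flip();                              // switch from writing to reading

        System.out.println(getData(buffer) == null);
        System.out.println(Arrays.toString(getData(buffer)));
    }
}
```

Because each field carries its own null flag and length, two nullable byte arrays can sit back to back in one buffer and be read in order without a separate trailing flag.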



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
