mjsax commented on code in PR #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r849942073


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java:
##########
@@ -21,22 +21,30 @@
 import java.util.Arrays;
 
 public class SubscriptionResponseWrapper<FV> {
-    final static byte CURRENT_VERSION = 0x00;
+    final static byte CURRENT_VERSION = 0x01;
+    // 0x00

Review Comment:
   nit:
   ```
   // version 0x00 fields:
   ```
   
   Same below:
   ```
   // version 0x01 fields:
   ```
   
   Or just change to `1` instead of `0x01` (compare Subscription.java -- might 
be nice to align both).



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java:
##########
@@ -23,12 +23,15 @@
 
 
 public class SubscriptionWrapper<K> {
-    static final byte CURRENT_VERSION = 0;
+    static final byte CURRENT_VERSION = 1;
 
+    // v0

Review Comment:
   nit:
   ```
   // v0 fields:
   ```
   
   Same for `v1` below



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java:
##########
@@ -141,17 +204,29 @@ public SubscriptionWrapper<K> deserialize(final String 
ignored, final byte[] dat
                 lengthSum += 2 * Long.BYTES;
             }
 
-            final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; 
//The remaining data is the serialized pk
-            buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
+            final int primaryKeyLength;
+            if (version > 0) {
+                primaryKeyLength = data.length - lengthSum - Integer.BYTES;
+            } else {
+                primaryKeyLength = data.length - lengthSum;
+            }
+            final byte[] primaryKeyRaw = new byte[primaryKeyLength];
+            buf.get(primaryKeyRaw, 0, primaryKeyLength);
 
             if (primaryKeySerializationPseudoTopic == null) {
                 primaryKeySerializationPseudoTopic = 
primaryKeySerializationPseudoTopicSupplier.get();
             }
 
             final K primaryKey = 
primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
-                                                                    
primaryKeyRaw);
+                primaryKeyRaw);

Review Comment:
   nit: indention



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java:
##########
@@ -21,22 +21,30 @@
 import java.util.Arrays;
 
 public class SubscriptionResponseWrapper<FV> {
-    final static byte CURRENT_VERSION = 0x00;
+    final static byte CURRENT_VERSION = 0x01;
+    // 0x00
     private final long[] originalValueHash;
     private final FV foreignValue;
     private final byte version;
+    // 0x01
+    private final Integer primaryPartition;

Review Comment:
   While we need this field in this class, so we actually need to serialize it? 
It seems it must not go over the wire, does it?
   
   Thus, for the subscription response, we don't need the dump the version? -- 
We only need it on the right hand side to write the response into the topic, 
but we can do this in-memory only?
   
   If this is correct, we also don't need to update the corresponding Serde.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java:
##########
@@ -62,36 +71,96 @@ public void setIfUnset(final SerdeGetter getter) {
             }
         }
 
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean 
isKey) {
+            this.upgradeFromV0 = upgradeFromV0(configs);
+        }
+
+        private static boolean upgradeFromV0(final Map<String, ?> configs) {
+            final Object upgradeFrom = 
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+            if (upgradeFrom == null) {
+                return false;
+            }
+
+            switch ((String) upgradeFrom) {
+                case StreamsConfig.UPGRADE_FROM_0100:
+                case StreamsConfig.UPGRADE_FROM_0101:
+                case StreamsConfig.UPGRADE_FROM_0102:
+                case StreamsConfig.UPGRADE_FROM_0110:
+                case StreamsConfig.UPGRADE_FROM_10:
+                case StreamsConfig.UPGRADE_FROM_11:
+                case StreamsConfig.UPGRADE_FROM_20:
+                case StreamsConfig.UPGRADE_FROM_21:
+                case StreamsConfig.UPGRADE_FROM_22:
+                case StreamsConfig.UPGRADE_FROM_23:
+                case StreamsConfig.UPGRADE_FROM_24:
+                case StreamsConfig.UPGRADE_FROM_25:
+                case StreamsConfig.UPGRADE_FROM_26:
+                case StreamsConfig.UPGRADE_FROM_27:
+                case StreamsConfig.UPGRADE_FROM_28:
+                case StreamsConfig.UPGRADE_FROM_30:
+                case StreamsConfig.UPGRADE_FROM_31:
+                case StreamsConfig.UPGRADE_FROM_32:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
         @Override
         public byte[] serialize(final String topic, final 
SubscriptionResponseWrapper<V> data) {
-            
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
+            
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}{4-bytes-primaryPartition}
 
             //7-bit (0x7F) maximum for data version.
             if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
                 throw new 
UnsupportedVersionException("SubscriptionResponseWrapper version is larger than 
maximum supported 0x7F");
             }
 
+            final int version = data.getVersion();
+            if (upgradeFromV0 || version == 0) {
+                return serializeV0(topic, data);
+            } else if (version == 1) {
+                return serializeV1(topic, data);
+            } else {
+                throw new UnsupportedVersionException("Unsupported 
SubscriptionWrapper version " + data.getVersion());
+            }
+        }
+
+        private ByteBuffer serializeCommon(
+            final String topic,
+            final SubscriptionResponseWrapper<V> data,
+            final byte version,
+            final int extraLength) {
             final byte[] serializedData = data.getForeignValue() == null ? 
null : serializer.serialize(topic, data.getForeignValue());

Review Comment:
   nit (formatting -- hard to see where method signature end and code starts):
   ```
       final int extraLength
   ) {
       final byte[] serializedData = data.getForeignValue() == null ? null : 
serializer.serialize(topic, data.getForeignValue());
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java:
##########
@@ -132,18 +201,31 @@ public SubscriptionResponseWrapper<V> deserialize(final 
String topic, final byte
             }
 
             final V value;
-            if (data.length - lengthSum > 0) {
+            final int valueLength;
+            if (version > 0) {
+                valueLength = data.length - lengthSum - Integer.BYTES;
+

Review Comment:
   nit: remove empty line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to