mjsax commented on code in PR #11945: URL: https://github.com/apache/kafka/pull/11945#discussion_r849942073
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java: ########## @@ -21,22 +21,30 @@ import java.util.Arrays; public class SubscriptionResponseWrapper<FV> { - final static byte CURRENT_VERSION = 0x00; + final static byte CURRENT_VERSION = 0x01; + // 0x00 Review Comment: nit: ``` // version 0x00 fields: ``` Same below: ``` // version 0x01 fields: ``` Or just change to `1` instead of `0x01` (compare Subscription.java -- might be nice to align both). ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java: ########## @@ -23,12 +23,15 @@ public class SubscriptionWrapper<K> { - static final byte CURRENT_VERSION = 0; + static final byte CURRENT_VERSION = 1; + // v0 Review Comment: nit: ``` // v0 fields: ``` Same for `v1` below ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java: ########## @@ -141,17 +204,29 @@ public SubscriptionWrapper<K> deserialize(final String ignored, final byte[] dat lengthSum += 2 * Long.BYTES; } - final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk - buf.get(primaryKeyRaw, 0, primaryKeyRaw.length); + final int primaryKeyLength; + if (version > 0) { + primaryKeyLength = data.length - lengthSum - Integer.BYTES; + } else { + primaryKeyLength = data.length - lengthSum; + } + final byte[] primaryKeyRaw = new byte[primaryKeyLength]; + buf.get(primaryKeyRaw, 0, primaryKeyLength); if (primaryKeySerializationPseudoTopic == null) { primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get(); } final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic, - primaryKeyRaw); + primaryKeyRaw); Review Comment: nit: indention ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java: 
########## @@ -21,22 +21,30 @@ import java.util.Arrays; public class SubscriptionResponseWrapper<FV> { - final static byte CURRENT_VERSION = 0x00; + final static byte CURRENT_VERSION = 0x01; + // 0x00 private final long[] originalValueHash; private final FV foreignValue; private final byte version; + // 0x01 + private final Integer primaryPartition; Review Comment: While we need this field in this class, do we actually need to serialize it? It seems it does not need to go over the wire, does it? Thus, for the subscription response, we don't need to bump the version? -- We only need it on the right-hand side to write the response into the topic, but we can do this in-memory only? If this is correct, we also don't need to update the corresponding Serde. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java: ########## @@ -62,36 +71,96 @@ public void setIfUnset(final SerdeGetter getter) { } } + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + this.upgradeFromV0 = upgradeFromV0(configs); + } + + private static boolean upgradeFromV0(final Map<String, ?> configs) { + final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG); + if (upgradeFrom == null) { + return false; + } + + switch ((String) upgradeFrom) { + case StreamsConfig.UPGRADE_FROM_0100: + case StreamsConfig.UPGRADE_FROM_0101: + case StreamsConfig.UPGRADE_FROM_0102: + case StreamsConfig.UPGRADE_FROM_0110: + case StreamsConfig.UPGRADE_FROM_10: + case StreamsConfig.UPGRADE_FROM_11: + case StreamsConfig.UPGRADE_FROM_20: + case StreamsConfig.UPGRADE_FROM_21: + case StreamsConfig.UPGRADE_FROM_22: + case StreamsConfig.UPGRADE_FROM_23: + case StreamsConfig.UPGRADE_FROM_24: + case StreamsConfig.UPGRADE_FROM_25: + case StreamsConfig.UPGRADE_FROM_26: + case StreamsConfig.UPGRADE_FROM_27: + case StreamsConfig.UPGRADE_FROM_28: + case StreamsConfig.UPGRADE_FROM_30: + case StreamsConfig.UPGRADE_FROM_31: 
+ case StreamsConfig.UPGRADE_FROM_32: + return true; + default: + return false; + } + } + @Override public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) { - //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data} + //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}{4-bytes-primaryPartition} //7-bit (0x7F) maximum for data version. if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) { throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F"); } + final int version = data.getVersion(); + if (upgradeFromV0 || version == 0) { + return serializeV0(topic, data); + } else if (version == 1) { + return serializeV1(topic, data); + } else { + throw new UnsupportedVersionException("Unsupported SubscriptionWrapper version " + data.getVersion()); + } + } + + private ByteBuffer serializeCommon( + final String topic, + final SubscriptionResponseWrapper<V> data, + final byte version, + final int extraLength) { final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue()); Review Comment: nit (formatting -- hard to see where the method signature ends and the code starts): ``` final int extraLength ) { final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue()); ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java: ########## @@ -132,18 +201,31 @@ public SubscriptionResponseWrapper<V> deserialize(final String topic, final byte } final V value; - if (data.length - lengthSum > 0) { + final int valueLength; + if (version > 0) { + valueLength = data.length - lengthSum - Integer.BYTES; + Review Comment: nit: remove empty line -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org