gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1544787082

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based on the side specified.
+ */
+@SuppressWarnings("unchecked")

Review Comment:
   It looks like this is just moving the unchecked warning around, rather than fixing it.
   `<V, V1, V2> LeftOrRightValue<V1, V2> make(final V leftValue);` is impossible to type in a similar way to `<V> LeftOrRightValue make(final boolean isLeftSide, final V value)`, because it will always require casting V to V1 or V2.
   It might not be possible to do this with Enum, as it can't take type arguments.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########
@@ -89,8 +74,8 @@ public V2 getRightValue() {
     @Override
     public String toString() {
         return "<"
-                + ((leftValue != null) ? "left," + leftValue : "right," + rightValue)
-                + ">";
+                + ((leftValue != null) ? 
JoinSide.LEFT + "," + leftValue : JoinSide.RIGHT + "," + rightValue)

Review Comment:
   This doesn't seem necessary. LeftOrRightValue can be completely unaware of the JoinSide enum.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java:
##########
@@ -62,13 +62,13 @@ public LeftOrRightValue<V1, V2> deserialize(final String topic, final byte[] dat
         }

         return (data[0] == 1)
-            ? LeftOrRightValue.makeLeftValue(leftDeserializer.deserialize(topic, rawValue(data)))

Review Comment:
   Same comment as in LeftOrRightValue, I don't think this is necessary.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based on the side specified.
+ */
+@SuppressWarnings("unchecked")
+public enum JoinSide {
+    LEFT("left") {
+        /**
+         * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and V2 value as null.
+         *
+         * @param leftValue the left V1 value
+         * @param <V1> the type of the value
+         * @return a new {@link LeftOrRightValue} instance
+         */
+        @Override
+        public <V, V1, V2> LeftOrRightValue<V1, V2> make(final V leftValue) {
+            Objects.requireNonNull(leftValue, "The left join value is null");
+            return (LeftOrRightValue<V1, V2>) new LeftOrRightValue<>(leftValue, null);
+        }
+    },
+
+    RIGHT("right") {
+        /**
+         * Create a new {@link LeftOrRightValue} instance with the V2 value as {@code rightValue} and V1 value as null.
+         *
+         * @param rightValue the right V2 value
+         * @param <V2> the type of the value
+         * @return a new {@link LeftOrRightValue} instance
+         */
+        @Override
+        public <V, V1, V2> LeftOrRightValue<V1, V2> make(final V rightValue) {
+            Objects.requireNonNull(rightValue, "The left join value is null");
+            return (LeftOrRightValue<V1, V2>) new LeftOrRightValue<>(null, rightValue);
+        }
+
+    };
+
+    private final String joinSideName;
+
+    JoinSide(final String joinSideName) {
+        this.joinSideName = joinSideName;
+    }
+
+    public abstract <V, V1, V2> LeftOrRightValue<V1, V2> make(final V value);
+
+    /**
+     * Returns true if this JoinSide represents the left side.
+     *
+     * @return true if this JoinSide represents the left side, otherwise false
+     */
+    public boolean isLeftSide() {

Review Comment:
   It looks like you inlined the `== LEFT` check, which doesn't really fix the problem. There really shouldn't be any `if (left)` statements in the caller at all.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -173,7 +174,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
         final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);

         final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
-            true,
+            JoinSide.LEFT,

Review Comment:
   Sorry, I should have clarified that I wasn't necessarily asking you to change this, I was just making an observation.
   The ProcessorParameters and StreamStreamJoinNodeBuilder use "this" and "other", and I don't know whether it makes sense to change the naming there either.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########

Review Comment:
   Sure, I don't think that we have to apply this change to other classes or look for more call-sites. I was just thinking out loud about the benefits of trying to make the fix reusable.

   Even if we're only changing KStreamKStreamJoin, I would much rather merge a large diff with a type-safe and simple implementation than a small diff that is type-unsafe and complicated. I love diff-golfing for bugfixes, but I don't think it's appropriate for refactoring.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -41,6 +41,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.streams.state.internals.JoinSide;

Review Comment:
   I just noticed that JoinSide is in the `state.internals` package, while this class is in `kstream.internals`. Since this enum is so tightly bound to this class, it should probably be in the same package.
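
To illustrate the typing concern raised on JoinSide.java above, here is a minimal sketch of one shape that compiles without `@SuppressWarnings("unchecked")`. It is not taken from the PR and is not necessarily how the fix should look. All names (`JoinSideSketch`, `Side`, `LeftSide`, `RightSide`, `LeftOrRight`) are hypothetical, and `LeftOrRight` is only a stand-in for `LeftOrRightValue`. The assumption is that each side becomes a generic class instead of an enum constant, so the accepted value type can be tied to V1 or V2 without a cast.

```java
import java.util.Objects;

public class JoinSideSketch {

    /** Hypothetical stand-in for LeftOrRightValue: exactly one field is non-null. */
    static final class LeftOrRight<V1, V2> {
        final V1 left;
        final V2 right;

        private LeftOrRight(final V1 left, final V2 right) {
            this.left = left;
            this.right = right;
        }
    }

    /**
     * Hypothetical replacement for the enum. VThis is the value type this side
     * accepts; V1 and V2 are the left and right value types of the join.
     */
    abstract static class Side<VThis, V1, V2> {
        abstract LeftOrRight<V1, V2> make(VThis value);
    }

    /** The left side accepts V1, so no cast from V to V1 is needed. */
    static final class LeftSide<V1, V2> extends Side<V1, V1, V2> {
        @Override
        LeftOrRight<V1, V2> make(final V1 value) {
            return new LeftOrRight<>(Objects.requireNonNull(value, "left value is null"), null);
        }
    }

    /** The right side accepts V2, so no cast from V to V2 is needed. */
    static final class RightSide<V1, V2> extends Side<V2, V1, V2> {
        @Override
        LeftOrRight<V1, V2> make(final V2 value) {
            return new LeftOrRight<>(null, Objects.requireNonNull(value, "right value is null"));
        }
    }

    public static void main(final String[] args) {
        final Side<String, String, Integer> left = new LeftSide<>();
        final Side<Integer, String, Integer> right = new RightSide<>();

        // The caller never branches on which side it holds; it just calls make().
        System.out.println(left.make("a").left);   // prints "a"
        System.out.println(right.make(1).right);   // prints "1"
    }
}
```

In this sketch the caller holds a `Side` and calls `make` without any `if (left)` check, and passing a value of the wrong type to either side is a compile error rather than an unchecked cast.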