lucasbru commented on code in PR #18761:
URL: https://github.com/apache/kafka/pull/18761#discussion_r1940899992


##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1005,23 +1005,25 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
                                            final Grouped<KOut, V> grouped);
 
     /**
-     * Join records of this stream with another {@code KStream}'s records 
using windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
+     * Join records of this (left) stream with another (right) {@code 
KStream}'s records using a windowed inner equi-join.
+     * The join is computed on using the records' key as join attribute, i.e., 
{@code leftRecord.key == rightRight.key}.
      * Furthermore, two records are only joined if their timestamps are close 
to each other as defined by the given
      * {@link JoinWindows}, i.e., the window defines an additional join 
predicate on the record timestamps.
-     * <p>
-     * For each pair of records meeting both join predicates the provided 
{@link ValueJoiner} will be called to compute
-     * a value (with arbitrary type) for the result record.
+     *
+     * <p>For each pair of records meeting both join predicates the provided 
{@link ValueJoiner} will be called to
+     * compute a value (with arbitrary type) for the result record.
      * The key of the result record is the same as for both joining input 
records.
-     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
-     * output record will be added to the resulting {@code KStream}.
-     * <p>
-     * Example (assuming all input records belong to the correct windows):
+     * If you need read access to the join key, use {@link #join(KStream, 
ValueJoinerWithKey, JoinWindows)}.
+     * If an input record's key or value is {@code null} the input record will 
be dropped, and no join computation
+     * is triggered.
+     * Similarly, so-call late records, i.e., records with a timestamp 
belonging to an already closed window (based

Review Comment:
   ```suggestion
        * Similarly, so-called late records, i.e., records with a timestamp 
belonging to an already closed window (based
   ```
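
The join semantics this Javadoc hunk describes (key-equality predicate, timestamp-proximity predicate from `JoinWindows`, `ValueJoiner` producing the result value, and dropping of records with `null` key or value) can be sketched with a self-contained toy model. This is not the Kafka Streams API, just an illustration of the two join predicates under the assumption of a simple symmetric window of `windowMs`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Toy model of a windowed inner equi-join. NOT the Kafka Streams API --
// only a sketch of the two join predicates: key equality and
// |leftTs - rightTs| <= windowMs, plus the null-key/value drop rule.
public class WindowedJoinSketch {

    // A record is a (key, value, timestamp) triple.
    record Rec(String key, String value, long ts) {}

    static List<String> innerJoin(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            if (l.key() == null || l.value() == null) continue; // dropped, no join triggered
            for (Rec r : right) {
                if (r.key() == null || r.value() == null) continue; // dropped, no join triggered
                boolean sameKey = Objects.equals(l.key(), r.key());
                boolean inWindow = Math.abs(l.ts() - r.ts()) <= windowMs;
                if (sameKey && inWindow) {
                    // The "ValueJoiner": combines both values; the result
                    // record keeps the shared key.
                    out.add(l.key() + ":" + l.value() + "+" + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec("A", "1", 100), new Rec("B", "2", 100));
        List<Rec> right = List.of(new Rec("A", "x", 150), new Rec("B", "y", 900));
        // Window of 100ms: A's records are 50ms apart and join;
        // B's records are 800ms apart and do not.
        System.out.println(innerJoin(left, right, 100)); // [A:1+x]
    }
}
```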



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1005,23 +1005,25 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
                                            final Grouped<KOut, V> grouped);
 
     /**
-     * Join records of this stream with another {@code KStream}'s records 
using windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
+     * Join records of this (left) stream with another (right) {@code 
KStream}'s records using a windowed inner equi-join.
+     * The join is computed on using the records' key as join attribute, i.e., 
{@code leftRecord.key == rightRight.key}.

Review Comment:
   ```suggestion
        * The join is computed using the records' key as join attribute, i.e., 
{@code leftRecord.key == rightRight.key}.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
      * <td></td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     *
+     * Both {@code KStreams} (or to be more precise, their underlying source 
topics) need to have the same number of
      * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} (for one input stream) before
-     * doing the join and specify the "correct" number of partitions via 
{@link Repartitioned} parameter.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     * <p>
-     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * If this is not the case (and if not auto-repartitioning happens, see 
further below),
+     * you would need to call {@link #repartition(Repartitioned)} (for at 
least one of both

Review Comment:
   ```suggestion
        * you would need to call {@link #repartition(Repartitioned)} (for at 
least one of the two
   ```
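
The co-partitioning requirement discussed in this hunk can be illustrated with a small sketch: a join task only processes one partition number from each input, so a key must map to the same partition on both sides. The hash partitioner below is a simplified stand-in (Kafka's actual default partitioner hashes with murmur2, which is not reproduced here):

```java
// Sketch of the co-partitioning requirement: a key only meets its join
// partner if both inputs assign it the SAME partition number. Simplified
// stand-in partitioner; Kafka's real default uses murmur2 hashing.
public class CoPartitioningSketch {

    static int partition(String key, int numPartitions) {
        // Mask the sign bit so the result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "user-42";
        // Same partitioner, same partition count: matching records meet.
        System.out.println(partition(key, 6) == partition(key, 6)); // true
        // Different partition counts (or different partitioners): the key
        // can land on different partitions, and matching records may
        // silently never meet -- which is why partition counts must be
        // aligned and the same partitioner used on both inputs.
        System.out.println(partition(key, 6) + " vs " + partition(key, 4));
    }
}
```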



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
      * <td></td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     *
+     * Both {@code KStreams} (or to be more precise, their underlying source 
topics) need to have the same number of
      * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} (for one input stream) before
-     * doing the join and specify the "correct" number of partitions via 
{@link Repartitioned} parameter.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     * <p>
-     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * If this is not the case (and if not auto-repartitioning happens, see 
further below),
+     * you would need to call {@link #repartition(Repartitioned)} (for at 
least one of both
+     * {@code KStreams}) before doing the join and specify the "correct" 
number of partitions via {@link Repartitioned}
+     * parameter to align the partition count for both inputs to each other.
+     * Furthermore, both {@code KStreams} need to be co-partitioned on the 
join key (i.e., use the same partitioner).
+     * Note: Kafka Streams cannot verify the used partitioning strategy, so it 
is the user's responsibility to ensure
+     * that the same partitioner is used for both inputs for the join.
+     *
+     * <p>If a key changing operator was used before this operation on either 
input stream
+     * (e.g., {@link #selectKey(KeyValueMapper)}, {@link 
#map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+     * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will 
automatically repartition the data of the
+     * corresponding input stream, i.e., it will create an internal 
repartitioning topic in Kafka and write and re-read
+     * the data via this topic such that data is correctly partitioned by the 
join key.
+     *
+     * <p>The repartitioning topic(s) will be named 
"${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic(s) is determined 
based on both upstream topics partition

Review Comment:
   ```suggestion
        * The number of partitions for the repartition topic(s) is determined 
based on both upstream topics' partition
   ```
   
   I'm not entirely sure, but `upstream topics partition numbers` is a bit confusing. 
Maybe `...based on the partition numbers of both upstream topics`.
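
The internal-topic naming conventions quoted in this hunk ("${applicationId}-&lt;name&gt;-repartition" and "${applicationId}-&lt;storename&gt;-changelog") are simple string compositions. The helper below only illustrates the pattern; "my-app" and the generated-looking names are made-up example values, since the real names come from Kafka Streams and are visible via `Topology#describe()`:

```java
// Illustration of the internal topic naming conventions. The application
// id and generated names below are made-up examples; real names are
// produced by Kafka Streams (see Topology#describe()).
public class InternalTopicNames {

    static String repartitionTopic(String applicationId, String generatedName) {
        return applicationId + "-" + generatedName + "-repartition";
    }

    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        System.out.println(repartitionTopic("my-app", "KSTREAM-KEY-SELECT-0000000001"));
        System.out.println(changelogTopic("my-app", "KSTREAM-JOINOTHER-0000000005-store"));
    }
}
```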



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
      * <td></td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     *
+     * Both {@code KStreams} (or to be more precise, their underlying source 
topics) need to have the same number of
      * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} (for one input stream) before
-     * doing the join and specify the "correct" number of partitions via 
{@link Repartitioned} parameter.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     * <p>
-     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * If this is not the case (and if not auto-repartitioning happens, see 
further below),
+     * you would need to call {@link #repartition(Repartitioned)} (for at 
least one of both
+     * {@code KStreams}) before doing the join and specify the "correct" 
number of partitions via {@link Repartitioned}
+     * parameter to align the partition count for both inputs to each other.
+     * Furthermore, both {@code KStreams} need to be co-partitioned on the 
join key (i.e., use the same partitioner).
+     * Note: Kafka Streams cannot verify the used partitioning strategy, so it 
is the user's responsibility to ensure
+     * that the same partitioner is used for both inputs for the join.
+     *
+     * <p>If a key changing operator was used before this operation on either 
input stream
+     * (e.g., {@link #selectKey(KeyValueMapper)}, {@link 
#map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+     * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will 
automatically repartition the data of the
+     * corresponding input stream, i.e., it will create an internal 
repartitioning topic in Kafka and write and re-read
+     * the data via this topic such that data is correctly partitioned by the 
join key.
+     *
+     * <p>The repartitioning topic(s) will be named 
"${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic(s) is determined 
based on both upstream topics partition
+     * numbers, and Kafka Streams will automatically align the number of 
partitions if required for co-partitioning.
+     * Furthermore, the topic(s) will be created with infinite retention time 
and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>Both of the joining {@code KStream}s will be materialized in local 
state stores.
      * For failure and recovery each store will be backed by an internal 
changelog topic that will be created in Kafka.
-     * The changelog topic will be named 
"${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is 
user-specified
-     * in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"storeName" is an
-     * internally generated name, and "-changelog" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * The changelog topic will be named 
"${applicationId}-&lt;storename&gt;-changelog",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * To explicitly set key/value serdes, to customize the names of the 
repartition and changelog topic, or to
+     * customize the use state store, use {@link #join(KStream, ValueJoiner, 
JoinWindows, StreamJoined)}.

Review Comment:
   ```suggestion
        * customize the used state store, use {@link #join(KStream, 
ValueJoiner, JoinWindows, StreamJoined)}.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
      * <td></td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     *
+     * Both {@code KStreams} (or to be more precise, their underlying source 
topics) need to have the same number of
      * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} (for one input stream) before
-     * doing the join and specify the "correct" number of partitions via 
{@link Repartitioned} parameter.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     * <p>
-     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * If this is not the case (and if not auto-repartitioning happens, see 
further below),
+     * you would need to call {@link #repartition(Repartitioned)} (for at 
least one of both
+     * {@code KStreams}) before doing the join and specify the "correct" 
number of partitions via {@link Repartitioned}

Review Comment:
   ```suggestion
        * {@code KStreams}) before doing the join and specify the matching 
number of partitions via {@link Repartitioned}
   ```
   
   ?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
      * <td></td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     *
+     * Both {@code KStreams} (or to be more precise, their underlying source 
topics) need to have the same number of
      * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} (for one input stream) before
-     * doing the join and specify the "correct" number of partitions via 
{@link Repartitioned} parameter.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     * <p>
-     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * If this is not the case (and if not auto-repartitioning happens, see 
further below),
+     * you would need to call {@link #repartition(Repartitioned)} (for at 
least one of both
+     * {@code KStreams}) before doing the join and specify the "correct" 
number of partitions via {@link Repartitioned}
+     * parameter to align the partition count for both inputs to each other.
+     * Furthermore, both {@code KStreams} need to be co-partitioned on the 
join key (i.e., use the same partitioner).
+     * Note: Kafka Streams cannot verify the used partitioning strategy, so it 
is the user's responsibility to ensure
+     * that the same partitioner is used for both inputs for the join.
+     *
+     * <p>If a key changing operator was used before this operation on either 
input stream
+     * (e.g., {@link #selectKey(KeyValueMapper)}, {@link 
#map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+     * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will 
automatically repartition the data of the
+     * corresponding input stream, i.e., it will create an internal 
repartitioning topic in Kafka and write and re-read
+     * the data via this topic such that data is correctly partitioned by the 
join key.
+     *
+     * <p>The repartitioning topic(s) will be named 
"${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic(s) is determined 
based on both upstream topics partition
+     * numbers, and Kafka Streams will automatically align the number of 
partitions if required for co-partitioning.
+     * Furthermore, the topic(s) will be created with infinite retention time 
and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>Both of the joining {@code KStream}s will be materialized in local 
state stores.

Review Comment:
   ```suggestion
        * <p>Both of the joined {@code KStream}s will be materialized in local 
state stores.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final 
KeyValueMapper<? super K, ? super V
      * <td></td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     *
+     * Both {@code KStreams} (or to be more precise, their underlying source 
topics) need to have the same number of
      * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} (for one input stream) before
-     * doing the join and specify the "correct" number of partitions via 
{@link Repartitioned} parameter.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     * <p>
-     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * If this is not the case (and if not auto-repartitioning happens, see 
further below),
+     * you would need to call {@link #repartition(Repartitioned)} (for at 
least one of both
+     * {@code KStreams}) before doing the join and specify the "correct" 
number of partitions via {@link Repartitioned}
+     * parameter to align the partition count for both inputs to each other.
+     * Furthermore, both {@code KStreams} need to be co-partitioned on the 
join key (i.e., use the same partitioner).
+     * Note: Kafka Streams cannot verify the used partitioning strategy, so it 
is the user's responsibility to ensure

Review Comment:
   ```suggestion
        * Note: Kafka Streams cannot verify the used partitioner, so it is the 
user's responsibility to ensure
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
