lucasbru commented on code in PR #18761: URL: https://github.com/apache/kafka/pull/18761#discussion_r1940899992
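For orientation before the inline comments below: the javadoc under review describes Kafka Streams' windowed inner equi-join. A minimal stand-alone sketch of those semantics — plain Java, not the Kafka Streams API; all names here are hypothetical — might look like this: two records join only if their keys are equal and their timestamps fall within the join window, and records with a `null` key or value are dropped without triggering any join computation.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch of the windowed inner equi-join semantics described in
// the javadoc under review: two records join only if their keys are equal
// AND their timestamps lie within the join window of each other; records
// with a null key or value are dropped and trigger no join computation.
// This is NOT the Kafka Streams API; all names are hypothetical.
public class WindowedJoinSketch {

    record Rec(String key, String value, long ts) { }

    static List<String> innerJoin(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            if (l.key() == null || l.value() == null) {
                continue; // dropped: no output record, no join triggered
            }
            for (Rec r : right) {
                if (r.key() == null || r.value() == null) {
                    continue;
                }
                // equi-join predicate on the key, window predicate on timestamps
                if (l.key().equals(r.key()) && Math.abs(l.ts() - r.ts()) <= windowMs) {
                    out.add(l.value() + "+" + r.value()); // ValueJoiner analogue
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec("A", "a1", 0L), new Rec("B", "b1", 100L));
        List<Rec> right = List.of(new Rec("A", "a2", 50L), new Rec("B", "b2", 500L));
        // Only the two "A" records fall within 100ms of each other;
        // the "B" records are 400ms apart and do not join.
        System.out.println(innerJoin(left, right, 100L)); // prints [a1+a2]
    }
}
```

The real operator additionally handles out-of-order and late records via the window grace period, which this sketch ignores.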
########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1005,23 +1005,25 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V final Grouped<KOut, V> grouped); /** - * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default - * serializers and deserializers. - * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. + * Join records of this (left) stream with another (right) {@code KStream}'s records using a windowed inner equi-join. + * The join is computed on using the records' key as join attribute, i.e., {@code leftRecord.key == rightRight.key}. * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. - * <p> - * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute - * a value (with arbitrary type) for the result record. + * + * <p>For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to + * compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no - * output record will be added to the resulting {@code KStream}. - * <p> - * Example (assuming all input records belong to the correct windows): + * If you need read access to the join key, use {@link #join(KStream, ValueJoinerWithKey, JoinWindows)}. + * If an input record's key or value is {@code null} the input record will be dropped, and no join computation + * is triggered. 
+ * Similarly, so-call late records, i.e., records with a timestamp belonging to an already closed window (based Review Comment: ```suggestion * Similarly, so-called late records, i.e., records with a timestamp belonging to an already closed window (based ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1005,23 +1005,25 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V final Grouped<KOut, V> grouped); /** - * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default - * serializers and deserializers. - * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. + * Join records of this (left) stream with another (right) {@code KStream}'s records using a windowed inner equi-join. + * The join is computed on using the records' key as join attribute, i.e., {@code leftRecord.key == rightRight.key}. Review Comment: ```suggestion * The join is computed using the records' key as join attribute, i.e., {@code leftRecord.key == rightRight.key}. ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V * <td></td> * </tr> * </table> - * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * + * Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). 
- * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - * <p> - * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * <p> - * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * If this is not the case (and if not auto-repartitioning happens, see further below), + * you would need to call {@link #repartition(Repartitioned)} (for at least one of both Review Comment: ```suggestion * you would need to call {@link #repartition(Repartitioned)} (for at least one of the two ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V * <td></td> * </tr> * </table> - * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * + * Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of * partitions. 
- * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - * <p> - * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * <p> - * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * If this is not the case (and if not auto-repartitioning happens, see further below), + * you would need to call {@link #repartition(Repartitioned)} (for at least one of both + * {@code KStreams}) before doing the join and specify the "correct" number of partitions via {@link Repartitioned} + * parameter to align the partition count for both inputs to each other. + * Furthermore, both {@code KStreams} need to be co-partitioned on the join key (i.e., use the same partitioner). 
+ * Note: Kafka Streams cannot verify the used partitioning strategy, so it is the user's responsibility to ensure + * that the same partitioner is used for both inputs for the join. + * + * <p>If a key changing operator was used before this operation on either input stream + * (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or + * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of the + * corresponding input stream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read + * the data via this topic such that data is correctly partitioned by the join key. + * + * <p>The repartitioning topic(s) will be named "${applicationId}-<name>-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic(s) is determined based on both upstream topics partition Review Comment: ```suggestion * The number of partitions for the repartition topic(s) is determined based on both upstream topics' partition ``` Not even sure, but `upstream topics partition numbers` is a bit confusing. Maybe `...based on the partition numbers of both upstream topics`. ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V * <td></td> * </tr> * </table> - * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * + * Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of * partitions. 
- * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - * <p> - * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * <p> - * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * If this is not the case (and if not auto-repartitioning happens, see further below), + * you would need to call {@link #repartition(Repartitioned)} (for at least one of both + * {@code KStreams}) before doing the join and specify the "correct" number of partitions via {@link Repartitioned} + * parameter to align the partition count for both inputs to each other. + * Furthermore, both {@code KStreams} need to be co-partitioned on the join key (i.e., use the same partitioner). 
+ * Note: Kafka Streams cannot verify the used partitioning strategy, so it is the user's responsibility to ensure + * that the same partitioner is used for both inputs for the join. + * + * <p>If a key changing operator was used before this operation on either input stream + * (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or + * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of the + * corresponding input stream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read + * the data via this topic such that data is correctly partitioned by the join key. + * + * <p>The repartitioning topic(s) will be named "${applicationId}-<name>-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic(s) is determined based on both upstream topics partition + * numbers, and Kafka Streams will automatically align the number of partitions if required for co-partitioning. + * Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + * <p>Both of the joining {@code KStream}s will be materialized in local state stores. * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. - * The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified - * in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an - * internally generated name, and "-changelog" is a fixed suffix. 
- * <p> - * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * The changelog topic will be named "${applicationId}-<storename>-changelog", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "storeName" is an internally generated name, and "-changelog" is a fixed suffix. + * + * <p>You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To explicitly set key/value serdes, to customize the names of the repartition and changelog topic, or to + * customize the use state store, use {@link #join(KStream, ValueJoiner, JoinWindows, StreamJoined)}. Review Comment: ```suggestion * customize the used state store, use {@link #join(KStream, ValueJoiner, JoinWindows, StreamJoined)}. ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V * <td></td> * </tr> * </table> - * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * + * Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. 
- * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - * <p> - * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * <p> - * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * If this is not the case (and if not auto-repartitioning happens, see further below), + * you would need to call {@link #repartition(Repartitioned)} (for at least one of both + * {@code KStreams}) before doing the join and specify the "correct" number of partitions via {@link Repartitioned} Review Comment: ```suggestion * {@code KStreams}) before doing the join and specify the matching number of partitions via {@link Repartitioned} ``` ? ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V * <td></td> * </tr> * </table> - * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * + * Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. 
- * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - * <p> - * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * <p> - * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * If this is not the case (and if not auto-repartitioning happens, see further below), + * you would need to call {@link #repartition(Repartitioned)} (for at least one of both + * {@code KStreams}) before doing the join and specify the "correct" number of partitions via {@link Repartitioned} + * parameter to align the partition count for both inputs to each other. + * Furthermore, both {@code KStreams} need to be co-partitioned on the join key (i.e., use the same partitioner). + * Note: Kafka Streams cannot verify the used partitioning strategy, so it is the user's responsibility to ensure + * that the same partitioner is used for both inputs for the join. 
+ * + * <p>If a key changing operator was used before this operation on either input stream + * (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or + * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of the + * corresponding input stream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read + * the data via this topic such that data is correctly partitioned by the join key. + * + * <p>The repartitioning topic(s) will be named "${applicationId}-<name>-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic(s) is determined based on both upstream topics partition + * numbers, and Kafka Streams will automatically align the number of partitions if required for co-partitioning. + * Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + * <p>Both of the joining {@code KStream}s will be materialized in local state stores. Review Comment: ```suggestion * <p>Both of the joined {@code KStream}s will be materialized in local state stores. ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -1040,283 +1042,91 @@ <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V * <td></td> * </tr> * </table> - * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * + * Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of * partitions. 
- * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before - * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - * <p> - * Repartitioning can happen for one or both of the joining {@code KStream}s. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * <p> - * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * If this is not the case (and if not auto-repartitioning happens, see further below), + * you would need to call {@link #repartition(Repartitioned)} (for at least one of both + * {@code KStreams}) before doing the join and specify the "correct" number of partitions via {@link Repartitioned} + * parameter to align the partition count for both inputs to each other. + * Furthermore, both {@code KStreams} need to be co-partitioned on the join key (i.e., use the same partitioner). 
+ * Note: Kafka Streams cannot verify the used partitioning strategy, so it is the user's responsibility to ensure Review Comment: ```suggestion * Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
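The co-partitioning requirement discussed in several of the comments above can be illustrated with a small sketch. This is a hypothetical helper, not Kafka's actual partitioner (which is murmur2-based); `String.hashCode` stands in for it. The point it demonstrates is the one the javadoc makes: with the same partitioner and the same partition count on both source topics, equal join keys land in the same partition of both streams, so the join can be computed locally per partition — while mismatched partition counts break that guarantee.

```java
// Sketch of the co-partitioning requirement: with the same partitioner and
// the same partition count on both source topics, equal join keys land in
// the same partition of both streams. Hypothetical helper; String.hashCode
// stands in for Kafka's actual (murmur2-based) default partitioner.
public class CoPartitioningSketch {

    static int partition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Same partitioner + same partition count => same placement for a key,
        // which is what makes a local per-partition join possible.
        System.out.println(partition("A", 6) == partition("A", 6)); // true

        // Different partition counts break co-partitioning: the same key can
        // end up in different partitions of the two source topics.
        System.out.println(partition("A", 6) + " vs " + partition("A", 7)); // 5 vs 2
    }
}
```

As the review notes, Kafka Streams can align partition *counts* via its internal `"-repartition"` topics, but it cannot verify that both producers used the same partitioner — that part remains the user's responsibility.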