lucasbru commented on code in PR #18760: URL: https://github.com/apache/kafka/pull/18760#discussion_r1940875377
##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -758,185 +756,118 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V,
     /**
      * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
-     * from the auto-generated topic using default serializers, deserializers, and producer's default partitioning strategy.
-     * The number of partitions is determined based on the upstream topics partition numbers.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * from the auto-generated topic.
+     *
+     * <p>The created topic is considered an internal topic and is meant to be used only by the current
+     * Kafka Streams instance.
+     * The topic will be named as "${applicationId}-<name>-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
+     * The number of partitions for the repartition topic is determined based on the upstream topics partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     * To explicitly set key/value serdes, specify the number of used partitions or the partitioning strategy,
+     * or to customize the name of the repartition topic, use {@link #repartition(Repartitioned)}.
      *
-     * @return {@code KStream} that contains the exact same repartitioned records as this {@code KStream}.
+     * @return A {@code KStream} that contains the exact same, but repartitioned records as this {@code KStream}.
      */
     KStream<K, V> repartition();
 
     /**
-     * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
-     * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
-     * number of partitions, and topic name part as defined by {@link Repartitioned}.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "<name>" is either provided via {@link Repartitioned#as(String)} or an internally
-     * generated name, and "-repartition" is a fixed suffix.
-     *
-     * @param repartitioned the {@link Repartitioned} instance used to specify {@link Serdes},
-     *        {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
-     *        part of the topic name, and number of partitions for a repartition topic.
-     * @return a {@code KStream} that contains the exact same repartitioned records as this {@code KStream}.
+     * See {@link #repartition()}.
     */
     KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);
 
     /**
-     * Materialize this stream to a topic using default serializers specified in the config and producer's
-     * default partitioning strategy.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * Materialize this stream to a topic.
+     * The topic should be manually created before it is used (i.e., before the Kafka Streams application is
      * started).
      *
-     * @param topic the topic name
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use {@link #to(String, Produced)}.
+     *
+     * @param topic
+     *        the output topic name
+     *
+     * @see #to(TopicNameExtractor)
      */
     void to(final String topic);
 
     /**
-     * Materialize this stream to a topic using the provided {@link Produced} instance.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     *
-     * @param topic the topic name
-     * @param produced the options to use when producing to the topic
+     * See {@link #to(String).}
     */
     void to(final String topic, final Produced<K, V> produced);
 
     /**
-     * Dynamically materialize this stream to topics using default serializers specified in the config and producer's
-     * default partitioning strategy.
-     * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}.
+     * Materialize the record of this stream to different topics.
+     * The provided {@link TopicNameExtractor} is applied to each input record to compute the output topic name.
+     * All topics should be manually created before they are use (i.e., before the Kafka Streams application is started).

Review Comment:
```suggestion
     * All topics should be manually created before they are used (i.e., before the Kafka Streams application is started).
```

##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -758,185 +756,118 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V,
[…]
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use {@link #to(TopicNameExtractor, Produced)}.
      *
-     * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record
+     * @param topicExtractor
+     *        the extractor to determine the name of the Kafka topic to write to for each record
      */
     void to(final TopicNameExtractor<K, V> topicExtractor);
 
     /**
-     * Dynamically materialize this stream to topics using the provided {@link Produced} instance.
-     * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}.
-     *
-     * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record
-     * @param produced the options to use when producing to the topic
+     * See {@link #to(TopicNameExtractor)}.

Review Comment:
The parameter `produced` is not explained anywhere now. Same for the other methods. Would it make sense to refer from most generic to the most specific overload instead?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -758,185 +756,118 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V,
[…]
     */
     void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced);
 
     /**
      * Convert this stream to a {@link KTable}.
-     * <p>
-     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
-     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
-     * {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * The conversion is a logical operation and only changes the "interpretation" of the records, i.e., each record of
+     * this stream is a "fact/event" and is re-interpreted as a "change/update-per-key" now
+     * (cf. {@link KStream} vs {@link KTable}). The resulting {@link KTable} is essentially a changelog stream.
+     * To "upsert" the records of this stream into a materialized {@link KTable} (i.e., into a state store),
+     * use {@link #toTable(Materialized)}.
+     *
+     * <p>Note that {@code null} keys are not supported by {@code KTables} and records with {@code null} key will be dropped.
+     *
+     * <p>If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
+     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or {@link #process(ProcessorSupplier, String...)})
+     * Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in
+     * Kafka and write and re-read the data via this topic such that the resulting {@link KTable} is correctly
+     * partitioned by its key.
+     *
+     * <p>This internal repartitioning topic will be named "${applicationId}-<name>-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
-     * <p>
-     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
-     * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
-     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
+     * The number of partitions for the repartition topic is determined based on the upstream topics partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>Note: If the result {@link KTable} is materialized, it is not possible to apply
+     * {@link StreamsConfig#REUSE_KTABLE_SOURCE_TOPICS "source topic optimization"}, because

Review Comment:
I'm not aware of this syntax. Do you know / have you checked that it works?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
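As background for the naming discussion in the Javadoc above: the documented pattern for internal repartition topics, "${applicationId}-&lt;name&gt;-repartition", can be sketched in plain Java. The helper below is hypothetical (it is not part of the Kafka Streams API) and only mirrors the pattern as the Javadoc describes it.

```java
// Hypothetical helper mirroring the documented naming pattern for
// internal repartition topics: "${applicationId}-<name>-repartition".
public class RepartitionTopicName {

    static String internalTopicName(final String applicationId, final String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(final String[] args) {
        // "applicationId" comes from StreamsConfig#APPLICATION_ID_CONFIG;
        // "<name>" is either internally generated or provided via Repartitioned#as(String).
        System.out.println(internalTopicName("my-app", "my-repartition-name"));
        // prints: my-app-my-repartition-name-repartition
    }
}
```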
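The dynamic routing performed by `to(TopicNameExtractor)` can likewise be illustrated without a broker. The `Router` interface below is a simplified stand-in for the real `org.apache.kafka.streams.processor.TopicNameExtractor` (whose `extract` method additionally receives a `RecordContext`), and the record data is made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for TopicNameExtractor: computes an output topic per record.
interface Router<K, V> {
    String extract(K key, V value);
}

public class DynamicRouting {

    // Counts how many records would be sent to each output topic,
    // mimicking the per-record fan-out behavior of KStream#to(TopicNameExtractor).
    static <K, V> Map<String, Integer> recordsPerTopic(final List<Map.Entry<K, V>> records,
                                                       final Router<K, V> router) {
        final Map<String, Integer> counts = new HashMap<>();
        for (final Map.Entry<K, V> record : records) {
            counts.merge(router.extract(record.getKey(), record.getValue()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(final String[] args) {
        // Route records whose key starts with "error" to a dead-letter-style topic.
        final Router<String, String> byKeyPrefix =
            (key, value) -> key.startsWith("error") ? "errors-topic" : "events-topic";

        final Map<String, Integer> counts = recordsPerTopic(
            List.of(Map.entry("error-1", "boom"), Map.entry("click-1", "page"), Map.entry("click-2", "page")),
            byKeyPrefix);

        System.out.println(counts); // errors-topic -> 1, events-topic -> 2
    }
}
```

Note that all output topics such a router can name must exist before the application starts, per the Javadoc under review.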