mjsax commented on code in PR #18721:
URL: https://github.com/apache/kafka/pull/18721#discussion_r1934494010
########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -2097,276 +2102,113 @@ <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
 * <td><K1:ValueJoiner(C,b)></td>
 * </tr>
 * </table>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
+ *
+ * By default, {@code KStream} records are processed by performing a lookup for matching records in the
+ * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+ * This default implementation does not handle out-of-order records in either input of the join well.
+ * See {@link #join(KTable, ValueJoiner, Joined)} on how to configure a stream-table join to handle out-of-order
+ * data.
+ *
+ * <p>Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+ * partitions (cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}).
+ * If this is not the case (and if not auto-repartitioning happens for the stream, see further below), you would
+ * need to call {@link #repartition(Repartitioned)} for this {@code KStream} before doing the join, specifying the
+ * same number of partitions via {@link Repartitioned} parameter as the given {@link KTable}.
+ * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
+ * Note: Kafka Streams cannot verify the used partitioning strategy, so it is the user's responsibility to ensure
+ * that the same partitioner is used for both input for the join.
+ *
+ * <p>If a key changing operator was used on this {@code KStream} before this operation
+ * (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+ * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of this
+ * {@code KStream}, i.e., it will create an internal repartitioning topic in Kafka and write and re-read
+ * the data via this topic such that data is correctly partitioned by the {@link KTable}'s key.
+ *
+ * <p>The repartitioning topic will be named "${applicationId}-<name>-repartition",
+ * where "applicationId" is user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
+ * The number of partitions for the repartition topic is determined based on number of partitions of the
 * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
- * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
- * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
- * correctly on its key.
+ * Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged
+ * by Kafka Streams.
+ *
+ * <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
+ * To explicitly set key/value serdes or to customize the names of the repartition topic,
+ * use {@link #join(KTable, ValueJoiner, Joined)}.
+ * For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code join()}.
+ *
+ * @param table
+ *     the {@link KTable} to be joined with this stream
+ * @param joiner
+ *     a {@link ValueJoiner} that computes the join result for a pair of matching records
+ *
+ * @param <VTable> the value type of the table
+ * @param <VOut> the value type of the result stream
+ *
+ * @return A {@code KStream} that contains join-records for each key and values computed by the given
+ *     {@link ValueJoiner}, one for each matched record-pair with the same key
 *
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and values computed by the given
- *         {@link ValueJoiner}, one for each matched record-pair with the same key
 * @see #leftJoin(KTable, ValueJoiner)
- * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
 */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
+    <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+                                         final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner);
    /**
- * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default
- * serializers and deserializers.
- * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
- * This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided
- * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
- * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
+ * See {@link #join(KTable, ValueJoiner)}.
 *
- * The key of the result record is the same as for both joining input records.
- * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
- * operation and thus no output record will be added to the resulting {@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoinerWithKey(K1,C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
- * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
- * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
- * correctly on its key.
- *
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and values computed by the given
- *         {@link ValueJoinerWithKey}, one for each matched record-pair with the same key
- * @see #leftJoin(KTable, ValueJoinerWithKey)
- * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+ * <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
 */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner);
+    <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+                                         final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner);
    /**
- * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default
- * serializers and deserializers.
- * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
- * This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided
- * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
- * The key of the result record is the same as for both joining input records.
- * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
- * operation and thus no output record will be added to the resulting {@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoiner(C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
- * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
- * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
- * correctly on its key.
+ * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join.
+ * In contrast to {@link #join(KTable, ValueJoiner)}, but only if the used {@link KTable} is backed by a
+ * {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}, the additional
+ * {@link Joined} parameter allows to specify a join grace-period, to handle out-of-order data gracefully.
 *
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
- * @param joined a {@link Joined} instance that defines the serdes to
- *               be used to serialize/deserialize inputs of the joined streams
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and values computed by the given
- *         {@link ValueJoiner}, one for each matched record-pair with the same key
- * @see #leftJoin(KTable, ValueJoiner, Joined)
- * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+ * <p>For details about stream-table semantics, including co-partitioning requirements, (auto-)repartitioning,
+ * and more see {@link #join(KTable, ValueJoiner)}.
+ * If you specify a grace-period to handle out-of-order data, see further details below.
+ *
+ * <p>To handle out-of-order records, the input {@link KTable} must use a
+ * {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore} (specified via a
+ * {@link Materialized} parameter when the {@link KTable} is created), and a join
+ * {@link Joined#withGracePeriod(Duration) grace-period} must be specified.
+ * For this case, {@code KStream} records are buffered until the end of the grace period and the {@link KTable}
+ * lookup is performed with some delay.
+ * Given that the {@link KTable} state is versioned, the lookup can use "event time", allowing out-of-order
+ * {@code KStream} records, to join to the right (older) version of a {@link KTable} record with the same key.
+ * Also, {@link KTable} out-of-order updates are handled correctly by the versioned state store.
+ * Note, that using a join grace-period introduces the notion of "late records", i.e., records with a timestamp
+ * smaller than the defined grace-period allows; these "late records" will be dropped, and not join computation
+ * is triggered.
+ * Using a versioned state store for the {@link KTable} also implies that the defined "history retention" provides
+ * a cut-off point, and "late record" will be dropped, not updating the {@link KTable} state.
+
+ * <p>If a join grace-period is specified, the {@code KStream} will be materialized in a local state store.

Review Comment:
   It's this way on purpose. Note, it's about `KStream`-side materialization: we only materialize the stream if a grace-period is specified -- but we only allow this if the `KTable` is versioned. For regular/non-versioned KTables, using a grace-period and materializing/buffering the stream records does not make sense -- if there are not multiple versions per key, there is no point in delaying the KTable lookup. Thoughts?
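   For context, the default behavior the reworked Javadoc describes is the plain lookup join. A minimal sketch (topic names, serdes, and the key-changing step are illustrative only, not taken from this PR):

   ```java
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;

   public class DefaultStreamTableJoin {
       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();

           // Table side: its updates only modify the internal table state and never emit join results.
           final KTable<String, String> table =
               builder.table("table-input", Consumed.with(Serdes.String(), Serdes.String()));

           // Stream side: selectKey() changes the key, so Kafka Streams will insert an internal
           // "${applicationId}-...-repartition" topic before the join to co-partition it with the table.
           final KStream<String, String> stream =
               builder.stream("stream-input", Consumed.with(Serdes.String(), Serdes.String()))
                      .selectKey((key, value) -> value.split(":")[0]);

           // Default lookup join: each stream record probes the *current* table state for its key;
           // records with a null key or value are dropped and produce no output.
           stream.join(table, (streamValue, tableValue) -> streamValue + "/" + tableValue)
                 .to("join-output");
       }
   }
   ```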
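   And the grace-period variant under discussion -- only allowed when the `KTable` is versioned, and the reason the stream side gets materialized -- would look roughly like this sketch (store name, history retention, and grace period are made-up values for illustration):

   ```java
   import java.time.Duration;

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.Joined;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;
   import org.apache.kafka.streams.kstream.Materialized;
   import org.apache.kafka.streams.state.Stores;

   public class VersionedStreamTableJoin {
       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();

           // Versioned table: keeps older versions per key for 1 hour of history retention,
           // which is what makes event-time lookups possible in the first place.
           final KTable<String, String> table = builder.table(
               "table-input",
               Consumed.with(Serdes.String(), Serdes.String()),
               Materialized.as(
                   Stores.persistentVersionedKeyValueStore("versioned-table-store", Duration.ofHours(1))));

           final KStream<String, String> stream = builder.stream(
               "stream-input",
               Consumed.with(Serdes.String(), Serdes.String()));

           // Because a grace period is set, the stream side is buffered (materialized) and each
           // table lookup is delayed, then performed against the table version that matches the
           // stream record's timestamp; records older than the grace period are dropped as "late".
           stream.join(
                     table,
                     (streamValue, tableValue) -> streamValue + "/" + tableValue,
                     Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
                           .withGracePeriod(Duration.ofMinutes(5)))
                 .to("join-output");
       }
   }
   ```

   Note that the table's history retention should be at least as large as the grace period, so the delayed lookup can still find the matching older version.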