mjsax commented on code in PR #18721:
URL: https://github.com/apache/kafka/pull/18721#discussion_r1934494010


##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -2097,276 +2102,113 @@ <VO, VR> KStream<K, VR> outerJoin(final KStream<K, 
VO> otherStream,
      * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
+     *
+     * By default, {@code KStream} records are processed by performing a 
lookup for matching records in the
+     * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+     * This default implementation does not handle out-of-order records in 
either input of the join well.
+     * See {@link #join(KTable, ValueJoiner, Joined)} on how to configure a 
stream-table join to handle out-of-order
+     * data.
+     *
+     * <p>Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     * partitions (cf. {@link #join(GlobalKTable, KeyValueMapper, 
ValueJoiner)}).
+     * If this is not the case (and if not auto-repartitioning happens for the 
stream, see further below), you would
+     * need to call {@link #repartition(Repartitioned)} for this {@code 
KStream} before doing the join, specifying the
+     * same number of partitions via {@link Repartitioned} parameter as the 
given {@link KTable}.
+     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
+     * Note: Kafka Streams cannot verify the used partitioning strategy, so it 
is the user's responsibility to ensure
+     * that the same partitioner is used for both input for the join.
+     *
+     * <p>If a key changing operator was used on this {@code KStream} before 
this operation
+     * (e.g., {@link #selectKey(KeyValueMapper)}, {@link 
#map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+     * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will 
automatically repartition the data of this
+     * {@code KStream}, i.e., it will create an internal repartitioning topic 
in Kafka and write and re-read
+     * the data via this topic such that data is correctly partitioned by the 
{@link KTable}'s key.
+     *
+     * <p>The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic is determined based 
on number of partitions of the
      * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * Furthermore, the topic(s) will be created with infinite retention time 
and data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * To explicitly set key/value serdes or to customize the names of the 
repartition topic,
+     * use {@link #join(KTable, ValueJoiner, Joined)}.
+     * For more control over the repartitioning, use {@link 
#repartition(Repartitioned)} before {@code join()}.
+     *
+     * @param table
+     *        the {@link KTable} to be joined with this stream
+     * @param joiner
+     *        a {@link ValueJoiner} that computes the join result for a pair 
of matching records
+     *
+     * @param <VTable> the value type of the table
+     * @param <VOut> the value type of the result stream
+     *
+     * @return A {@code KStream} that contains join-records for each key and 
values computed by the given
+     *         {@link ValueJoiner}, one for each matched record-pair with the 
same key
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner)
-     * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
      */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner);
+    <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+                                         final ValueJoiner<? super V, ? super 
VTable, ? extends VOut> joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record that finds a corresponding record in 
{@link KTable} the provided
-     * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
-     * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
+     * See {@link #join(KTable, ValueJoiner)}.
      *
-     * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
-     *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoinerWithKey} that computes the join 
result for a pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoinerWithKey}, one for each matched record-pair with the 
same key
-     * @see #leftJoin(KTable, ValueJoinerWithKey)
-     * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+     * <p>Note that the key is read-only and must not be modified, as this can 
lead to corrupt partitioning.
      */
-    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoinerWithKey<? super K, ? super 
V, ? super VT, ? extends VR> joiner);
+    <VTable, VOut> KStream<K, VOut> join(final KTable<K, VTable> table,
+                                         final ValueJoinerWithKey<? super K, ? 
super V, ? super VTable, ? extends VOut> joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join with default
-     * serializers and deserializers.
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record that finds a corresponding record in 
{@link KTable} the provided
-     * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
-     * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join.
+     * In contrast to {@link #join(KTable, ValueJoiner)}, but only if the used 
{@link KTable} is backed by a
+     * {@link org.apache.kafka.streams.state.VersionedKeyValueStore 
VersionedKeyValueStore}, the additional
+     * {@link Joined} parameter allows to specify a join grace-period, to 
handle out-of-order data gracefully.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
-     * @param joined      a {@link Joined} instance that defines the serdes to
-     *                    be used to serialize/deserialize inputs of the 
joined streams
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoiner}, one for each matched record-pair with the same key
-     * @see #leftJoin(KTable, ValueJoiner, Joined)
-     * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+     * <p>For details about stream-table semantics, including co-partitioning 
requirements, (auto-)repartitioning,
+     * and more see {@link #join(KTable, ValueJoiner)}.
+     * If you specify a grace-period to handle out-of-order data, see further 
details below.
+     *
+     * <p>To handle out-of-order records, the input {@link KTable} must use a
+     * {@link org.apache.kafka.streams.state.VersionedKeyValueStore 
VersionedKeyValueStore} (specified via a
+     * {@link Materialized} parameter when the {@link KTable} is created), and 
a join
+     * {@link Joined#withGracePeriod(Duration) grace-period} must be specified.
+     * For this case, {@code KStream} records are buffered until the end of 
the grace period and the {@link KTable}
+     * lookup is performed with some delay.
+     * Given that the {@link KTable} state is versioned, the lookup can use 
"event time", allowing out-of-order
+     * {@code KStream} records, to join to the right (older) version of a 
{@link KTable} record with the same key.
+     * Also, {@link KTable} out-of-order updates are handled correctly by the 
versioned state store.
+     * Note, that using a join grace-period introduces the notion of "late 
records", i.e., records with a timestamp
+     * smaller than the defined grace-period allows; these "late records" will 
be dropped, and not join computation
+     * is triggered.
+     * Using a versioned state store for the {@link KTable} also implies that 
the defined "history retention" provides
+     * a cut-off point, and "late record" will be dropped, not updating the 
{@link KTable} state.
+
+     * <p>If a join grace-period is specified, the {@code KStream} will be 
materialized in a local state store.

Review Comment:
   It's this way on purpose. Note that this is about `KStream`-side 
materialization: we only materialize the stream if a grace period is 
specified, and we only allow that if the `KTable` is versioned.
   
   For regular/non-versioned KTables, using a grace period and 
materializing/buffering the stream records does not make sense: if there are 
not multiple versions per key, there is no point in delaying the KTable lookup.
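   
   To make the argument concrete, here is a toy model in plain Java. It is not Kafka Streams code and not how the join is implemented; the class `GracePeriodSketch` and its methods are hypothetical names used only for illustration. It contrasts a latest-only lookup with a timestamp-based lookup against a per-key version history:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: NOT Kafka Streams code. All names here are hypothetical.
// It illustrates why delaying the table lookup only pays off when the
// table keeps multiple versions per key.
public class GracePeriodSketch {

    // Stand-in for a non-versioned table: one (latest) value per key.
    static final Map<String, String> latestOnly = new HashMap<>();

    // Stand-in for a versioned table: per key, values indexed by timestamp.
    static final Map<String, TreeMap<Long, String>> versioned = new HashMap<>();

    // Apply a table update to both stand-ins.
    static void updateTable(String key, String value, long timestamp) {
        latestOnly.put(key, value);
        versioned.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Non-versioned lookup: the stream record's timestamp cannot be used,
    // so the result is whatever value is current at lookup time.
    static String lookupLatest(String key) {
        return latestOnly.get(key);
    }

    // Versioned lookup: return the value that was valid at the given
    // event timestamp (latest version with timestamp <= the argument).
    static String lookupAsOf(String key, long timestamp) {
        TreeMap<Long, String> versions = versioned.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        updateTable("K1", "a", 0L);
        updateTable("K1", "b", 10L);

        // A stream record for K1 with event time 5 is looked up only
        // after both table updates have been applied.
        System.out.println(lookupLatest("K1"));    // prints "b"
        System.out.println(lookupAsOf("K1", 5L));  // prints "a"
    }
}
```

   In this model, the delayed lookup finds the older version only in the versioned case. With a single value per key, waiting cannot recover the version that was valid at the record's timestamp.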
   
   Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

