Guozhang Wang created KAFKA-13371:
-------------------------------------

             Summary: Consider consolidating Joined / StreamJoined / TableJoined
                 Key: KAFKA-13371
                 URL: https://issues.apache.org/jira/browse/KAFKA-13371
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang

This is an idea that came up while reviewing KAFKA-13261 (adding TableJoined). We now have three control objects: Joined, StreamJoined, and TableJoined. All of them implement NamedOperation and hence carry a `name`, which is used for the processor node's name and potentially for store names. In addition to that:

* Joined: used in stream-table joins. Contains the key serde and two value serdes used for serializing the bytes for repartitioning (however, since today we only repartition one side if needed, the other value serde is never used).
* StreamJoined: used in stream-stream joins. It includes the serdes, AND also the store suppliers and other control variables on the store names.
* TableJoined: used in table-table foreign-key joins. It does not include any serdes but includes the partitioner information.

The main differences between these constructs are:

* KTables have a materialization mechanism embedded via `valueGetterSupplier` whenever they are created, whether from a source or from aggregate / join operators, so they do not need extra materialization indicators when they participate in a follow-up join; i.e. they are either already materialized by the operators that generate them, or they will "grandfather" back to the upstream KTable on the fly, with a logical view that is computed when it is fetched via the `ValueGetterSupplier`. KStreams, on the other hand, have no inherent materialization mechanism, and hence operators that do need to materialize the streams need to provide such a mechanism themselves.
* Table-table foreign-key joins have a special need for partitioners.

[~vvcephei] has a good proposal at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar and as part of that proposal we could consider adding a partitioner for source streams / tables that is then inherited throughout the topology pipeline. Following that idea, we can consider consolidating the above "Joined" objects by isolating the materialization / partitioner variables.

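To make the overlap a bit more concrete, here is a rough sketch of which kinds of settings each control object carries according to the description above, and what is left once the materialization / partitioner variables are pulled out. It is only an illustration: `JoinControlSketch`, `Concern`, `today()` and `remaining()` are hypothetical names made up for this sketch and are not part of the Kafka Streams API.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model, NOT Kafka Streams code: it only records which kinds of
// settings each control object carries, as described in this ticket.
final class JoinControlSketch {

    // The kinds of settings that the existing control objects mix together.
    enum Concern { NAME, SERDES, MATERIALIZATION, PARTITIONER }

    // The variables this ticket suggests isolating from the "Joined" objects.
    static final Set<Concern> ISOLATED =
            EnumSet.of(Concern.MATERIALIZATION, Concern.PARTITIONER);

    // What each control object carries today, per the description above.
    static Map<String, Set<Concern>> today() {
        Map<String, Set<Concern>> m = new LinkedHashMap<>();
        m.put("Joined", EnumSet.of(Concern.NAME, Concern.SERDES));
        m.put("StreamJoined",
                EnumSet.of(Concern.NAME, Concern.SERDES, Concern.MATERIALIZATION));
        m.put("TableJoined", EnumSet.of(Concern.NAME, Concern.PARTITIONER));
        return m;
    }

    // What a control object would still carry once the isolated concerns are removed.
    static Set<Concern> remaining(String controlObject) {
        Set<Concern> s = EnumSet.copyOf(today().get(controlObject));
        s.removeAll(ISOLATED);
        return s;
    }

    public static void main(String[] args) {
        for (String name : today().keySet()) {
            System.out.println(name + " -> " + remaining(name));
        }
    }
}
```

In this simplified model, `Joined` and `StreamJoined` are left with the same name-plus-serdes core and `TableJoined` with only a name, which is the motivation for consolidating them.
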
More specifically, here's a concrete proposal:

1) `StreamsBuilder.table/stream` would take an optional partitioner.
2) Similarly, all operators that change the key would allow an optional partitioner:
2.a) `KStream.repartition/groupBy` and `KTable.groupBy` would allow an optional partitioner in `Repartitioned`; piggy-backing on this, we would also deprecate `Grouped` in favor of `Repartitioned`, since the latter would subsume the former.
2.b) `KStream.map/flatMap/selectKey` stay as is and, similar to serdes, these operators would stop the inheritance of partitioners from the upstream entities.
3) `Repartitioned` would also add the key/value serdes used for serializing for the repartition topics.
4) `KStream.join(KTable)` and `KStream.join(KStream)` would take an optional `Repartitioned` in addition to `Joined`, which can be used to encode the partitioner info.
5) Foreign-key `KTable.join(KTable)` would take an optional `Repartitioned`, which can be used to encode the partitioner info.
7) As a result of all the above points, we can then reduce `StreamJoined` / `TableJoined` / `Joined`, since all their wrapped control objects are now separated into `Repartitioned` and `Materialized`. Note that for `StreamJoined`, the store suppliers / names / configs would now be wrapped in two `Materialized` objects, which would still not be exposed for IQ (interactive queries).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)