Hi again (again), Ties,

Sorry for the confusion. I was talking to someone else about this and
started to file a ticket to fix it, but once I looked into it, I realized
that a stream-globalTable join does not actually create a repartition
topic.

So, if you do something like:

=====
public static void main(String[] args) {
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final KStream<Object, Object> left =
        streamsBuilder.stream("left").selectKey((ok, ov) -> "newK" + ok);
    final GlobalKTable<Object, Object> right = streamsBuilder.globalTable("right");
    final KStream<Object, KeyValue<Object, Object>> join =
        left.join(right, (ok, ov) -> ok, KeyValue::new);
    join.to("out");
    final Topology build = streamsBuilder.build();
    System.out.println(build.describe());
}
=====

(namely, the selectKey on the stream)

Then, you should get the result you expect.

Sorry again for my multiple replies.
-John

On Mon, Jul 15, 2019 at 11:35 AM John Roesler <j...@confluent.io> wrote:
>
> Hi again, Ties,
>
> I think I spoke too soon and also misread your email.
>
> By any chance, are you doing a join of a KStream and a GlobalKTable?
>
> In this case, it would make perfect sense to do what you're doing, but
> unfortunately the current implementation doesn't support it.
>
> Your workaround would be to use KStream.selectKey on the left side to
> pick a key before the join. Unfortunately, this will create a
> repartition topic that is unnecessary when you're joining with a
> GlobalKTable.
>
> On the other hand, you could at that point switch to a regular
> KStream/KTable join and reduce the memory/storage requirements, as
> each node won't have to host the whole global data set anymore.
>
> Please feel free to share your code in some form to clear up the
> situation in case I got it wrong again.
>
> Thanks,
> -John
>
> On Mon, Jul 15, 2019 at 10:48 AM John Roesler <j...@confluent.io> wrote:
> >
> > Hi Ties,
> >
> > You're on the right track. You need to use `KTable.map` ahead of the
> > join to select the new key. This will allow Streams to make sure the
> > data is correctly partitioned to perform the join.
> >
> > Thanks,
> > -John
> >
> > On Mon, Jul 15, 2019 at 10:07 AM Ven, Ties Jens van de
> > <ties.van.de....@alliander.com> wrote:
> > >
> > > I recently started working with Kafka Streams and I noticed some odd
> > > behavior.
> > >
> > > I was using a KTable left join with a null key, and of course this will
> > > not work, since it will join based on keys.
> > > But I also supplied a KeyValueMapper, which takes a property from the
> > > value and returns this as key, and uses this value to join.
> > >
> > > It turns out that in the code, it first checks if there is a null key,
> > > and if so, it skips.
> > > Would it be more logical to check the result of the keyMapper for null
> > > instead of the actual key?
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
> > >
> > > Kind regards
> > >
> > > Ties
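
For reference, the null-key question raised in the original message can be modelled in a few lines of plain Java. This is only a sketch and not the Kafka Streams source: the class NullKeyJoinSketch, the Rec type, both method names and the sample data are hypothetical, the "table" is an ordinary Map, and an inner join is assumed for brevity. The first method skips a record as soon as its own key is null, as described in the thread; the second checks only the result of the key mapper, as Ties suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of the two null checks; no Kafka Streams classes are used.
class NullKeyJoinSketch {

    // Hypothetical stand-in for a stream record (key may be null).
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Behaviour described in the thread: a null record key means the record
    // is skipped before the key mapper is consulted.
    static List<String> joinCheckingRecordKey(List<Rec> stream,
                                              Map<String, String> table,
                                              BiFunction<String, String, String> keyMapper) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            if (r.key == null) {
                continue; // skipped even though the mapper could derive a key
            }
            String mapped = keyMapper.apply(r.key, r.value);
            String right = (mapped == null) ? null : table.get(mapped);
            if (right != null) {
                out.add(r.value + "+" + right);
            }
        }
        return out;
    }

    // Proposed alternative: only the mapped key is checked for null.
    static List<String> joinCheckingMappedKey(List<Rec> stream,
                                              Map<String, String> table,
                                              BiFunction<String, String, String> keyMapper) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            String mapped = keyMapper.apply(r.key, r.value);
            if (mapped == null) {
                continue; // nothing to look up
            }
            String right = table.get(mapped);
            if (right != null) {
                out.add(r.value + "+" + right);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("id-1", "A");
        table.put("id-2", "B");

        // The first record has no key; the join key is taken from the value.
        List<Rec> stream = Arrays.asList(new Rec(null, "id-1"), new Rec("k2", "id-2"));
        BiFunction<String, String, String> byValue = (k, v) -> v;

        System.out.println(joinCheckingRecordKey(stream, table, byValue));
        System.out.println(joinCheckingMappedKey(stream, table, byValue));
    }
}
```

With the sample data above, the first variant drops the record whose key is null, while the second joins it because the mapper derives a usable key from the value. The selectKey workaround from the reply has the same effect as the second variant: every record gets a non-null key before the join sees it.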