Hi Matthias,

Do you know when the 3.1 version is going to be released?
I noticed the JoinWindows class has a boolean property called
enableSpuriousResultFix. If I extend the class and set that flag to true, will
it eliminate spurious messages in Kafka Streams 3.0.0?

thanks
- Miguel

On Mon, Dec 6, 2021 at 2:49 PM Matthias J. Sax <mj...@apache.org> wrote:

> It's fixed in the upcoming 3.1 release.
>
> Cf https://issues.apache.org/jira/browse/KAFKA-10847
>
>
> A stream-(global)table join has different semantics, so I am not sure if
> it would help.
>
> One workaround would be to apply a stateful `flatTransformValues()`
> after the join to "buffer" all NULL-results and only emit them after you
> know no subsequent inner-join result will happen. It's tricky to build
> though.
>
> I would recommend to wait and upgrade to 3.1 after it is released.
>
>
> -Matthias
>
> On 11/30/21 12:59 AM, Luke Chen wrote:
> > Hi Miguel,
> >> Is there a way to force the behavior I need, meaning... using left join and
> >> a JoinWindows output only one message (A,B) or (A, null)
> >
> > I think you can try to achieve it by using *KStream-GlobalKTable left join*,
> > where the GlobalKTable should read all records at the right topic, and then
> > doing the left join operation. This should then output either (A,B), or
> > (A, null).
> >
> > Thank you.
> > Luke
> >
> > On Tue, Nov 30, 2021 at 1:23 AM Miguel González <miguel.gonza...@klar.mx>
> > wrote:
> >
> >> Hello
> >>
> >> I have been developing a Kafka Streams app that takes as input two topics
> >> as KStreams, processes them in some way and joins them and sends the
> >> combined message to an output topic.
> >>
> >> Here's some code,
> >>
> >> final StreamJoined<String, TransactionEvent, BalanceEvent> joinParams =
> >>     StreamJoined.with(
> >>         STRING_SERDE,
> >>         StreamSerdeConstants.TRANSACTION_EVENT_SERDE,
> >>         StreamSerdeConstants.BALANCE_EVENT_SERDE);
> >>
> >> JoinWindows joinWindows = JoinWindows
> >>     .of(Duration.ofSeconds(streamsProperties.getJoinWindowDuration()))
> >>     .grace(Duration.ofSeconds(streamsProperties.getJoinGraceDuration()));
> >>
> >> ValueJoiner<TransactionEvent, BalanceEvent, BalanceHistoryEvent> valueJoiner =
> >>     (transactionEvent, balanceEvent) -> buildMessage(balanceEvent, transactionEvent);
> >>
> >> transactions
> >>     // TODO: change to leftJoin
> >>     .join(beWithTransaction, valueJoiner, joinWindows, joinParams)
> >>
> >> It's pretty simple, but for my use case I need to process in some way the
> >> messages that are not joined, so I thought I could use a LEFT JOIN. But
> >> according to my tests and this documentation
> >> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
> >>
> >> I have seen in the end I could end up with both the combined message as the
> >> regular inner join performs and the message with one side as NULL, for
> >> example (A,B) and (A, null)
> >>
> >> I thought the JOIN Window could force the output of the left join to just
> >> output if it found a match to just (A,B) not both. Maybe I have a bug in my
> >> Window configuration
> >>
> >> Is there a way to force the behavior I need, meaning... using left join and
> >> a JoinWindows output only one message (A,B) or (A, null)
> >>
> >> regards
> >> - Miguel
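
For reference, the buffering workaround Matthias describes might look roughly like the sketch below. This is only a plain-Java model of the logic, not Kafka Streams code: `NullResultBuffer`, `JoinResult` and `holdMs` are hypothetical names, and it assumes that a left record can be identified by its key plus its left value and that `holdMs` is set to the join window plus the grace period. A (A, null) result is held back, dropped if a matching (A, B) shows up, and emitted only once the observed stream time has moved past the hold period.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "buffer NULL results after the join"; all names are hypothetical. */
final class NullResultBuffer {

    /** One left-join output; right == null means the join found no match (yet). */
    static final class JoinResult {
        final String key;
        final String left;
        final String right;
        final long timestampMs;

        JoinResult(String key, String left, String right, long timestampMs) {
            this.key = key;
            this.left = left;
            this.right = right;
            this.timestampMs = timestampMs;
        }
    }

    // Assumption: how long a null result is held, e.g. join window + grace period.
    private final long holdMs;
    // Buffered (A, null) results, keyed by "key|left", in arrival order.
    private final Map<String, JoinResult> pendingNulls = new LinkedHashMap<>();
    // Highest timestamp seen so far, used as a stand-in for stream time.
    private long streamTimeMs = Long.MIN_VALUE;

    NullResultBuffer(long holdMs) {
        this.holdMs = holdMs;
    }

    /** Takes one join result and returns the results that may be forwarded now. */
    List<JoinResult> process(JoinResult result) {
        List<JoinResult> out = new ArrayList<>();
        streamTimeMs = Math.max(streamTimeMs, result.timestampMs);
        String id = result.key + "|" + result.left;

        if (result.right == null) {
            // Hold the null result back instead of forwarding it.
            pendingNulls.putIfAbsent(id, result);
        } else {
            // A real match arrived: drop the buffered null, forward the match.
            pendingNulls.remove(id);
            out.add(result);
        }

        // Emit buffered nulls whose hold period has passed without a match.
        Iterator<JoinResult> it = pendingNulls.values().iterator();
        while (it.hasNext()) {
            JoinResult pending = it.next();
            if (pending.timestampMs + holdMs < streamTimeMs) {
                it.remove();
                out.add(pending);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        NullResultBuffer buffer = new NullResultBuffer(1_000);
        // (A, null) is held back, then superseded by (A, B).
        System.out.println(buffer.process(new JoinResult("A", "a1", null, 0)).size());
        System.out.println(buffer.process(new JoinResult("A", "a1", "b1", 500)).size());
    }
}
```

In a real topology this logic would presumably live in the transformer passed to `flatTransformValues()`, with the buffer kept in a state store. Since stream time only advances when new records arrive, a punctuator would likely also be needed to flush nulls on an idle stream. A match that arrives after its null was already flushed would still produce both outputs, which is part of why this is tricky to build.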