Hi Matthias

Do you know when the 3.1 version is going to be released?

I noticed the JoinWindows class has a boolean property called
enableSpuriousResultFix

If I extend the class and set that flag to true, will it eliminate spurious
messages in Kafka Streams 3.0.0?


thanks
- Miguel
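
For reference, the buffering workaround described in the quoted reply below can be sketched in plain Java. This is only an illustration of the idea, not Kafka Streams code: `NullResultBuffer`, `Joined`, and `holdMs` are hypothetical names, it assumes at most one pending left record per key, and it only advances time when a new result arrives. A real implementation would live in a stateful `flatTransformValues()` step with a state store, and would probably need a punctuator to release results when no new input arrives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: holds back (left, null)
// results until no inner-join result can arrive for them anymore.
final class NullResultBuffer {

    // One join result; right == null marks an unmatched left-join result.
    static final class Joined {
        final String key;
        final String left;
        final String right;
        final long timestampMs;

        Joined(String key, String left, String right, long timestampMs) {
            this.key = key;
            this.left = left;
            this.right = right;
            this.timestampMs = timestampMs;
        }
    }

    private final long holdMs; // assumed: join window size plus grace period
    private final Map<String, Joined> pendingNulls = new LinkedHashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    NullResultBuffer(long holdMs) {
        this.holdMs = holdMs;
    }

    // Feed one join result; returns the results that are safe to emit now.
    List<Joined> process(Joined result) {
        streamTimeMs = Math.max(streamTimeMs, result.timestampMs);
        List<Joined> out = new ArrayList<>();
        if (result.right == null) {
            // Hold the null result back instead of emitting it eagerly.
            pendingNulls.put(result.key, result);
        } else {
            // A real match supersedes any buffered null result for this key.
            pendingNulls.remove(result.key);
            out.add(result);
        }
        // Release buffered null results whose hold time has passed.
        Iterator<Joined> it = pendingNulls.values().iterator();
        while (it.hasNext()) {
            Joined pending = it.next();
            if (pending.timestampMs + holdMs < streamTimeMs) {
                out.add(pending);
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        NullResultBuffer buffer = new NullResultBuffer(10_000L);
        buffer.process(new Joined("k1", "A", null, 1_000L)); // buffered
        buffer.process(new Joined("k1", "A", "B", 2_000L));  // (A,B) emitted, null dropped
        buffer.process(new Joined("k2", "C", null, 3_000L)); // buffered
        List<Joined> released = buffer.process(new Joined("k3", "D", null, 20_000L));
        for (Joined j : released) {
            System.out.println(j.key + " -> (" + j.left + ", " + j.right + ")");
        }
    }
}
```

In this sketch, the (A, null) result for `k1` is never emitted because the matching (A,B) arrives first, while the unmatched result for `k2` is only released once a later record pushes time past its hold period.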


On Mon, Dec 6, 2021 at 2:49 PM Matthias J. Sax <mj...@apache.org> wrote:

> It's fixed in the upcoming 3.1 release.
>
> Cf https://issues.apache.org/jira/browse/KAFKA-10847
>
>
> A stream-(global)table join has different semantics, so I am not sure if
> it would help.
>
> One workaround would be to apply a stateful `flatTransformValues()`
> after the join to "buffer" all NULL-results and only emit them after you
> know no subsequent inner-join result will happen. It's tricky to build,
> though.
>
> I would recommend waiting and upgrading to 3.1 after it is released.
>
>
> -Matthias
>
> On 11/30/21 12:59 AM, Luke Chen wrote:
> > Hi Miguel,
> >> Is there a way to force the behavior I need, meaning... using left join and
> >> a JoinWindows output only one message (A,B) or (A, null)
> >
> > I think you can try to achieve it by using a *KStream-GlobalKTable left
> > join*, where the GlobalKTable reads all records from the right topic, and
> > then doing the left join operation. This should then output either (A,B) or
> > (A, null).
> >
> > Thank you.
> > Luke
> >
> > On Tue, Nov 30, 2021 at 1:23 AM Miguel González <miguel.gonza...@klar.mx> wrote:
> >
> >> Hello
> >>
> >> I have been developing a Kafka Streams app that takes as input two topics
> >> as KStreams, processes them in some way and joins them and sends the
> >> combined message to an output topic.
> >>
> >> Here's some code,
> >>
> >> final StreamJoined<String, TransactionEvent, BalanceEvent> joinParams =
> >>      StreamJoined.with(
> >>          STRING_SERDE,
> >>          StreamSerdeConstants.TRANSACTION_EVENT_SERDE,
> >>          StreamSerdeConstants.BALANCE_EVENT_SERDE);
> >>
> >> JoinWindows joinWindows = JoinWindows
> >>      .of(Duration.ofSeconds(streamsProperties.getJoinWindowDuration()))
> >>      .grace(Duration.ofSeconds(streamsProperties.getJoinGraceDuration()));
> >>
> >> ValueJoiner<TransactionEvent, BalanceEvent, BalanceHistoryEvent> valueJoiner =
> >>      (transactionEvent, balanceEvent) -> buildMessage(balanceEvent, transactionEvent);
> >>
> >>
> >> transactions
> >>      // TODO: change to leftJoin
> >>      .join(beWithTransaction, valueJoiner, joinWindows, joinParams)
> >>
> >>
> >> It's pretty simple, but for my use case I need to process the messages
> >> that are not joined in some way, so I thought I could use a LEFT JOIN. But
> >> according to my tests and this documentation
> >> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
> >>
> >> I have seen that I could end up with both the combined message, as the
> >> regular inner join produces, and the message with one side as NULL, for
> >> example (A,B) and (A, null).
> >>
> >> I thought the join window could force the left join to output just (A,B)
> >> when it finds a match, not both. Maybe I have a bug in my window
> >> configuration.
> >>
> >> Is there a way to force the behavior I need, meaning... using left join and
> >> a JoinWindows output only one message (A,B) or (A, null)
> >>
> >> regards
> >> - Miguel
> >>
> >
>
