Hi Matthias
Thank you for your help. So, if I understand correctly, materializing the
first join and then joining the result with the other topic should change
this behavior?
What I don't understand is why the old value is sent AFTER the new value.
That still looks wrong to me, especially because in a leftJoin we can't tell
whether a value is old or new, as we can in aggregations (where we have an
adder and a subtractor).
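A simplified model (not Kafka Streams code) of why an operator downstream of a non-materialized KTable needs the (new, old) pair, using the adder/subtractor idea mentioned above. With the pair, a running total can retract the previous result without keeping any per-key state. Without it, the downstream operator has to store the last value per key itself, which is roughly what materializing buys. TradeOffModel and its nested classes are hypothetical names for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the storage/computation trade-off.
// It is not how Kafka Streams implements joins or aggregations.
class TradeOffModel {
    // Option 1: upstream forwards (newValue, oldValue); downstream keeps no per-key state.
    static final class PairBasedSum {
        long total;

        void onChange(Long newValue, Long oldValue) {
            if (oldValue != null) total -= oldValue; // subtractor: retract the previous result
            if (newValue != null) total += newValue; // adder: apply the new result
        }
    }

    // Option 2: upstream forwards only the new value; downstream must remember
    // the last value per key to know what to retract.
    static final class StoreBasedSum {
        long total;
        final Map<String, Long> lastByKey = new HashMap<>();

        void onUpdate(String key, Long newValue) {
            Long previous = (newValue == null) ? lastByKey.remove(key) : lastByKey.put(key, newValue);
            if (previous != null) total -= previous;
            if (newValue != null) total += newValue;
        }
    }
}
```

Both variants reach the same total; the first spends computation (forwarding and applying two values per update), the second spends storage. Note that option 1 only works if the old and new values passed downstream are independent objects, which is exactly what a joiner that mutates its input breaks.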

Anyway, I managed to overcome this with the help of this SO question:
https://stackoverflow.com/questions/51565727/kafka-stream-chained-leftjoin-processing-previous-old-message-again-after-the,
where there's a comment that says: " When joining, I need to initialize and
return a new object rather than assign value to the old object".

In my scenario, when I receive an update on the right table, I am UPDATING
the value from the left table and returning it. Then the joiner is called
again with the old value from the right table, and the left table's value
is updated with the old, wrong value.
This can be fixed by cloning the value that will be modified, so that the
second call will not incorrectly modify it.
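A minimal sketch of that fix for the first joiner, following the SO advice to return a new object: the joiner copies the Author before setting the publishers, so a second call with the old right-side value cannot touch the instance returned by the first call. Author, Publisher, the copy constructor, and the shape of AutorPublisherAssociation are hypothetical stand-ins (the real POJOs are not shown in this thread). BiFunction is used instead of Kafka's ValueJoiner only so the sketch compiles without Kafka on the classpath; in the topology, JOINER::apply could be passed where the original lambda is.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the POJOs used in the topology.
class Publisher {
    final String name;
    Publisher(String name) { this.name = name; }
}

class AutorPublisherAssociation {
    private final Publisher publisher;
    AutorPublisherAssociation(Publisher publisher) { this.publisher = publisher; }
    Publisher getPublisher() { return publisher; }
}

class Author {
    final String name;
    private Set<Publisher> publishers = new HashSet<>();

    Author(String name) { this.name = name; }

    // Copy constructor: copies every field a joiner may modify.
    Author(Author other) {
        this.name = other.name;
        this.publishers = new HashSet<>(other.publishers);
    }

    void setPublishers(Set<Publisher> publishers) { this.publishers = publishers; }
    Set<Publisher> getPublishers() { return publishers; }
}

class CopyingJoiner {
    // Same logic as the original first joiner, except that it works on a copy
    // and never mutates the Author it receives.
    static final BiFunction<Author, Set<AutorPublisherAssociation>, Author> JOINER =
        (a, apSet) -> {
            Author result = new Author(a);
            if (apSet == null) {
                // Missing value or tombstone: clear the publishers on the copy only.
                result.setPublishers(new HashSet<>());
            } else {
                result.setPublishers(apSet.stream()
                    .map(AutorPublisherAssociation::getPublisher)
                    .collect(Collectors.toSet()));
            }
            return result;
        };
}
```

The second joiner (the one calling setBooks) would need the same treatment, and the copy has to include the books as well if the real Author carries them.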
For reference, the KafkaStreams code that calls my joiner is this:

    newValue = joiner.apply(change.newValue, value2);
    if (sendOldValues) {
        oldValue = joiner.apply(change.oldValue, value2);
    }
    context().forward(key, new Change<>(newValue, oldValue));

https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java#L94

In my situation, value2 is modified inside the joiner and returned (and thus
assigned to newValue). Then, on the second call, value2 is modified again
using the old value, and since newValue is the same instance, newValue is
changed as well.
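A minimal reproduction of this aliasing, using a simplified model of the quoted forwarding code rather than Kafka Streams itself. SimpleAuthor, ForwardModel, and both joiners are hypothetical; a String stands in for the set of publishers, and the argument order (right-side change first, value2 second) follows the snippet above.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for the Author POJO: one mutable field is enough.
class SimpleAuthor {
    String publishers;
    SimpleAuthor(String publishers) { this.publishers = publishers; }
    SimpleAuthor copy() { return new SimpleAuthor(publishers); }
}

// Simplified model of the forwarding step quoted above; not Kafka Streams code.
class ForwardModel {
    // Returns {newValue, oldValue}, computed in the same order as the quoted code.
    static SimpleAuthor[] process(String changeNew, String changeOld, SimpleAuthor value2,
                                  BiFunction<String, SimpleAuthor, SimpleAuthor> joiner,
                                  boolean sendOldValues) {
        SimpleAuthor newValue = joiner.apply(changeNew, value2);
        SimpleAuthor oldValue = null;
        if (sendOldValues) {
            oldValue = joiner.apply(changeOld, value2);
        }
        return new SimpleAuthor[] {newValue, oldValue};
    }

    // Mutates and returns value2: newValue and oldValue end up as the same object,
    // so the second call overwrites the result of the first.
    static final BiFunction<String, SimpleAuthor, SimpleAuthor> MUTATING =
        (right, author) -> {
            author.publishers = right;
            return author;
        };

    // Copies value2 first: each call returns its own instance.
    static final BiFunction<String, SimpleAuthor, SimpleAuthor> COPYING =
        (right, author) -> {
            SimpleAuthor result = author.copy();
            result.publishers = right;
            return result;
        };
}
```

With MUTATING, the forwarded pair holds one object whose publishers were reset by the old (null) right-side value; with COPYING, newValue keeps the new publishers and oldValue reflects the old ones.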

Although cloning value2 fixes my problem, I still think this is an issue,
mainly because this behaviour doesn't affect single (non-cascading) left
joins, which looks like an inconsistency to me, and also because the order
of the operations seems wrong to me: calling a joiner with the old value
AFTER the new value does not allow the function to know the last known
value for that message.
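A simplified, hypothetical model of the behaviour described here (not the Kafka Streams source): LeftJoinModel keeps both input stores and records every joiner call. A right-side update calls the joiner with the new right value and then, only when sendOldValues is enabled, with the old one, in the same order as the quoted snippet. Two details are assumptions of the model: right-side updates are dropped while the left side has no value for the key, and no old-value call is made for a brand-new left value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified, hypothetical model of the two processors behind A.leftJoin(B).
// It only records joiner calls; it is not the Kafka Streams implementation.
class LeftJoinModel<L, R, V> {
    final Map<String, L> leftStore = new HashMap<>();
    final Map<String, R> rightStore = new HashMap<>();
    final BiFunction<L, R, V> joiner;
    final boolean sendOldValues;
    final List<String> joinerCalls = new ArrayList<>(); // "left,right" per call, in order

    LeftJoinModel(BiFunction<L, R, V> joiner, boolean sendOldValues) {
        this.joiner = joiner;
        this.sendOldValues = sendOldValues;
    }

    private V join(L left, R right) {
        joinerCalls.add(left + "," + right);
        return joiner.apply(left, right);
    }

    // Update on the left table (A): join the new left value with the stored right value.
    void onLeft(String key, L newLeft) {
        L oldLeft = leftStore.put(key, newLeft);
        R right = rightStore.get(key);
        join(newLeft, right);
        // Assumption: there is no old result when the key is new on the left side.
        if (sendOldValues && oldLeft != null) {
            join(oldLeft, right);
        }
    }

    // Update on the right table (B): new value first, then (only with
    // sendOldValues) the old value, as in the quoted snippet.
    void onRight(String key, R newRight) {
        R oldRight = rightStore.put(key, newRight);
        L left = leftStore.get(key);
        if (left == null) {
            return; // Assumption: a left join emits nothing while A has no value for the key.
        }
        join(left, newRight);
        if (sendOldValues) {
            join(left, oldRight);
        }
    }
}
```

Replaying the author and publisher-association messages from the original test case with sendOldValues enabled records the calls author,null, then author,apSet, then author,null; the last one is the call that overwrites the correct value when the joiner mutates its input. With sendOldValues disabled, only the first two calls happen, which matches what was observed for a single join.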
Still, even if you are not convinced this is an issue, I believe the
leftJoin Javadoc should at least mention the side effects of modifying and
returning the same instance received as an argument.

Thanks
Murilo




On Fri, 1 Feb 2019 at 14:38, Matthias J. Sax <matth...@confluent.io> wrote:

> Sounds like expected behavior to me.
>
> Note that, by default, the result KTable of a KTable-KTable join is not
> materialized. To compute the correct result in this case, we need to
> send both the old and the new join result downstream, so that the
> downstream join can compute the correct result. It's a
> storage/computation trade-off.
>
> Does this answer your question?
>
>
> -Matthias
>
> On 1/29/19 1:51 PM, Murilo Tavares wrote:
> > Adding a bit more to this, it looks like an issue with cascading
> > leftJoins...
> > I can see that, internally, the ValueJoiner is being called by a
> > KTableKTableRightJoin$KTableKTableRightJoinProcessor with
> > sendOldValues = true.
> > I'm not quite sure, but it looks to me like when we create
> > "tmp = A.leftJoin(B)", KafkaStreams internally also creates
> > B.rightJoin(A) and sets sendOldValues on A. Later, calling
> > tmp.leftJoin(C) could be setting sendOldValues on B, or something
> > like that...
> > Are there any known issues with cascading leftJoins?
> > Thanks
> > Murilo
> >
> >
> >
> > On Tue, 29 Jan 2019 at 14:10, Murilo Tavares <murilo...@gmail.com> wrote:
> >
> >> Hi
> >> I am trying to understand why the ValueJoiner of a KTable-KTable left
> >> join is being called twice when I receive a message on the right table.
> >> Here is my Topology:
> >>
> >> Serde<Author> authorSerde = ...
> >> Serde<Set<Book>> bSetSerde = ...
> >> Serde<Set<AutorPublisherAssociation>> apSetSerde = ...
> >> KTable<String, Author> authorTable = builder.table(AUTHOR_TOPIC,
> >>     Consumed.with(Serdes.String(), authorSerde));
> >> KTable<String, Set<Book>> booksByAuthorTable = builder.table(BOOKS_BY_AUTHOR,
> >>     Consumed.with(Serdes.String(), bSetSerde));
> >> KTable<String, Set<AutorPublisherAssociation>> apTable =
> >>     builder.table(PUBLISHER_ASSOCIATIONS_BY_AUTHOR,
> >>         Consumed.with(Serdes.String(), apSetSerde));
> >> KTable<String, Author> enrichedAuthorTable = authorTable.leftJoin(apTable,
> >>     (a, apSet) -> {
> >>         if (apSet == null) {
> >>             a.setPublishers(new HashSet<>());
> >>         } else {
> >>             a.setPublishers(apSet.stream()
> >>                 .map(ap -> ap.getPublisher())
> >>                 .collect(Collectors.toSet()));
> >>         }
> >>         return a;
> >>     }).leftJoin(booksByAuthorTable, (a, b) -> {
> >>         a.setBooks(b);
> >>         return a;
> >>     });
> >> enrichedAuthorTable.toStream().to(ENRICHED_AUTHORS,
> >>     Produced.with(Serdes.String(), authorSerde));
> >>
> >> Note that I have 3 topics, all of them keyed by author:
> >> - AUTHOR_TOPIC is keyed by authorKey and holds the Author message;
> >> - BOOKS_BY_AUTHOR is keyed by authorKey and holds a Set of Books;
> >> - PUBLISHER_ASSOCIATIONS_BY_AUTHOR is keyed by authorKey and holds a Set
> >>   of AutorPublisherAssociation (a POJO that links one author to one
> >>   publisher).
> >>
> >> Also note that the if statement is there to avoid NPEs and to handle
> >> tombstones: if I want to delete the list of publishers associated with
> >> an author, a tombstone on the PUBLISHER_ASSOCIATIONS_BY_AUTHOR topic
> >> would override the list of publishers on the Author.
> >>
> >> In my simple test case I am not sending any updates, just one message
> >> on each topic, in this order: author, booksByAuthor, publisherByAuthor.
> >> When the author arrives, both ValueJoiners are called with the author
> >> message and null for the right table.
> >> When the set of books arrives, both joiners are called ONCE: the first
> >> joiner receives the author and null, and the second joiner receives the
> >> author and the set of books.
> >> The problem comes next:
> >> When the set of AutorPublisherAssociation arrives, the first ValueJoiner
> >> is called TWICE: once with the author and apSet, and a second time with
> >> the author and null.
> >>
> >> I don't understand why the ValueJoiner is called twice in this scenario,
> >> with the last call passing null instead of the message, which overrides
> >> the correct value.
> >>
> >> Thanks
> >> Murilo
> >>
> >>
> >>
> >
>
>
