Right now I am thinking of rewriting anything that has these problematic
KStream/KTable joins as KStream#transform(), where the state store is
accessed manually. Does that make sense as an option for me?
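
A rough plain-Java model of that idea, as a sketch only. It is not Kafka
Streams code: ManualStoreSketch, Request, Kind, and the string-valued user
state are all hypothetical names made up for illustration, and a HashMap
stands in for the state store. It only shows the intended behaviour: each
request is applied against the store directly, and the result is written
back before the next request is handled, so a later request for the same
key sees the earlier change without a round trip through the USERS topic.
It assumes the requests reach the processor as one ordered sequence, which
the real topology may not guarantee across three input topics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only: none of these types are Kafka Streams classes.
class ManualStoreSketch {

    // Hypothetical request kinds, mirroring the three input topics.
    enum Kind { JOIN, SETTINGS_CONFIRM, SETTINGS_UPDATE }

    // Hypothetical request: the user key plus the kind of change requested.
    static final class Request {
        final String userKey;
        final Kind kind;

        Request(String userKey, Kind kind) {
            this.userKey = userKey;
            this.kind = kind;
        }
    }

    // Stand-in for the state store that a transformer would read and write itself.
    private final Map<String, String> userStore = new HashMap<>();

    // Applies one request to the store. Returns the new user state, or null
    // when the user does not exist yet (comparable to an inner-join miss).
    String apply(Request request) {
        String current = userStore.get(request.userKey);
        String updated;
        if (request.kind == Kind.JOIN) {
            updated = "created";
        } else if (current == null) {
            return null; // nothing to join against, so the request is dropped
        } else if (request.kind == Kind.SETTINGS_CONFIRM) {
            updated = current + ",confirmed";
        } else {
            updated = current + ",updated";
        }
        // The write is visible to the very next request for this key.
        userStore.put(request.userKey, updated);
        return updated;
    }

    // Replays requests in the given order against a fresh store and
    // collects every non-null result.
    static List<String> replay(List<Request> requests) {
        ManualStoreSketch sketch = new ManualStoreSketch();
        List<String> results = new ArrayList<>();
        for (Request request : requests) {
            String result = sketch.apply(request);
            if (result != null) {
                results.add(result);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Request> requests = new ArrayList<>();
        requests.add(new Request("u1", Kind.JOIN));
        requests.add(new Request("u1", Kind.SETTINGS_CONFIRM));
        requests.add(new Request("u1", Kind.SETTINGS_UPDATE));
        System.out.println(replay(requests));
    }
}
```

In this model a replay of the same ordered input always gives the same
output, because the update request reads whatever the confirm request just
wrote. Whether a real transform()-based topology behaves the same way
depends on the order in which records from the different topics are handed
to the transformer.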

-Dmitry

On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> Earlier today I posted this question to SO
> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>
> :
>
> > I have a topology that looks like this:
>
>     KTable<ByteString, User> users = topology.table(USERS,
> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>
>     topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> joinRequestSerde))
>         .mapValues(entityTopologyProcessor::userNew)
>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>
>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>         .join(users, entityTopologyProcessor::userSettingsConfirm,
> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>
>     topology.stream(SETTINGS_UPDATE_REQUESTS,
> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>         .join(users, entityTopologyProcessor::userSettingsUpdate,
> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>
> > At runtime this topology works fine. Users are created with join
> requests. They confirm their settings with settings confirm requests. They
> update their settings with settings update requests.
> >
> > However, reprocessing this topology does not produce the original
> results. Specifically, the settings update joiner does not see the user
> that resulted from the settings confirm joiner, even though in terms of
> timestamps, many seconds elapse from the time the user is created, to the
> time the user is confirmed, to the time the user updates their settings.
> >
> > I'm at a loss. I've tried turning off caching/logging on the user table.
> No idea what to do to make this reprocess properly.
>
> ----
>
> The response by Matthias, also on SO:
>
> > A KStream-KTable join is not 100% deterministic (and might never become
> 100% deterministic). We are aware of the problem and are discussing
> solutions to at least mitigate the issue.
> >
> > One problem is that when a consumer fetches from the brokers, we cannot
> easily control for which topics and/or partitions the broker returns data.
> Depending on the order in which we receive data from the broker, the
> result might differ slightly.
> >
> > One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >
> > This blog post might help, too:
> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>
> ----
>
> I don't really know what to do with this response. I have been aware of
> some "slight" discrepancy that might occur in edge cases with
> KStream-KTable joins for some time now, but what I'm seeing is not a slight
> discrepancy but very different results.
>
> I looked at the JIRA Matthias linked
> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data has
> no late arriving records. I don't know about the empty buffers. I have read
> the blog post he linked several times already.
>
> Can someone please suggest how I may obviate this problem? For example:
>
>    - Would it make sense for me to try launching the topology with fewer
>    threads during the reprocess?
>    - Would it make sense to launch the topology with fewer input tasks?
>    - Would it make sense to increase the size of the stream buffer?
>
> I am at a total loss at this point. I cannot believe that there is nothing
> I can do to replay this data and perform the migration I am trying to
> perform, in order to release a next version of my application. Am I totally
> screwed?
>
>
> Thank you,
> Dmitry
>
>
>
>
