Right now I am thinking of rewriting everything that has these problematic KStream-KTable joins to use KStream#transform(), where the state store is read and written manually. Does that make sense as an option for me?
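Roughly, for the settings-confirm path, I'm imagining something like this (just a sketch against the 1.0 API; USERS_STORE and the inline Transformer are names and wiring I'm making up here, and the join-requests and settings-update streams would get similar transformers attached to the same store):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // One store holds the current User per key; every request stream
    // reads and writes it through its own transformer.
    StoreBuilder<KeyValueStore<ByteString, User>> usersStore =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(USERS_STORE),
            byteStringSerde, userSerde);
    topology.addStateStore(usersStore);

    topology.stream(SETTINGS_CONFIRM_REQUESTS,
            Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
        .transform(() -> new Transformer<ByteString, SettingsConfirmRequest,
                KeyValue<ByteString, User>>() {
            private KeyValueStore<ByteString, User> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext context) {
                store = (KeyValueStore<ByteString, User>)
                    context.getStateStore(USERS_STORE);
            }

            @Override
            public KeyValue<ByteString, User> transform(
                    ByteString key, SettingsConfirmRequest request) {
                User user = store.get(key);      // read the current user
                if (user == null) {
                    return null;                 // no user yet: drop the request
                }
                User updated =
                    entityTopologyProcessor.userSettingsConfirm(request, user);
                store.put(key, updated);         // write back synchronously
                return KeyValue.pair(key, updated);
            }

            @Override
            public KeyValue<ByteString, User> punctuate(long timestamp) {
                return null;                     // not used
            }

            @Override
            public void close() {}
        }, USERS_STORE)
        .to(USERS, Produced.with(byteStringSerde, userSerde));

The appeal is that each request does its read-modify-write against a single store in record order, so the result would no longer depend on how the broker happens to interleave the two sides of a join during replay.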
-Dmitry

On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:

> Earlier today I posted this question to SO
> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>:
>
> > I have a topology that looks like this:
> >
> >     KTable<ByteString, User> users = topology.table(USERS,
> >         Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
> >
> >     topology.stream(JOIN_REQUESTS,
> >             Consumed.with(byteStringSerde, joinRequestSerde))
> >         .mapValues(entityTopologyProcessor::userNew)
> >         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >
> >     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >             Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >         .join(users, entityTopologyProcessor::userSettingsConfirm,
> >             Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> >         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >
> >     topology.stream(SETTINGS_UPDATE_REQUESTS,
> >             Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >         .join(users, entityTopologyProcessor::userSettingsUpdate,
> >             Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> >         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >
> > At runtime this topology works fine. Users are created with join
> > requests. They confirm their settings with settings confirm requests.
> > They update their settings with settings update requests.
> >
> > However, reprocessing this topology does not produce the original
> > results. Specifically, the settings update joiner does not see the
> > user that resulted from the settings confirm joiner, even though, in
> > terms of timestamps, many seconds elapse from the time the user is
> > created, to the time the user is confirmed, to the time the user
> > updates their settings.
> >
> > I'm at a loss. I've tried turning off caching/logging on the user
> > table. I have no idea what to do to make this reprocess properly.
>
> ----
>
> The response by Matthias, also on SO:
>
> > A KStream-KTable join is not 100% deterministic (and might never
> > become 100% deterministic). We are aware of the problem and discuss
> > solutions, to at least mitigate the issue.
> >
> > One problem is that when a consumer fetches from the brokers, we
> > cannot easily control for which topics and/or partitions the broker
> > returns data. And depending on the order in which we receive data
> > from the broker, the result might differ slightly.
> >
> > One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >
> > This blog post might help, too:
> > https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>
> ----
>
> I don't really know what to do with this response. I have been aware
> for some time now that a slight discrepancy can occur in edge cases
> with KStream-KTable joins, but what I'm seeing is not a slight
> discrepancy: the results are very different.
>
> I looked at the JIRA Matthias linked
> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
> has no late-arriving records. I don't know about the empty buffers. I
> have already read the blog post he linked several times.
>
> Can someone please suggest how I might work around this problem? For
> example:
>
>  - Would it make sense for me to launch the topology with fewer
>    threads during the reprocess?
>  - Would it make sense to launch the topology with fewer input tasks?
>  - Would it make sense to increase the size of the stream buffer?
>
> I am at a total loss at this point. I cannot believe that there is
> nothing I can do to replay this data and perform the migration I need
> in order to release the next version of my application. Am I totally
> screwed?
>
> Thank you,
> Dmitry
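P.S. To make the bulleted questions above concrete, this is roughly what I had in mind trying first for the replay (just a sketch; the values are placeholder guesses, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "entity-topology-replay");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // "Fewer threads": a single thread, so one processing loop handles all tasks.
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    // "Bigger stream buffer": let each partition buffer more records, so the
    // timestamp-based choice of which record to process next has more to
    // choose from.
    props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10000);

(As I understand it, the number of tasks is fixed by the partition counts of the input topics, so "fewer input tasks" would really mean fewer partitions.)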