Thanks. Yes, I'll try replacing the KStream-KTable joins with KStream#transform() calls and a store. I'm not sure what you mean about needing to buffer multiple records, though: the KStream carries the incoming events, and #transform() lets me mount the store and use it however I please. Within an application instance, any other KStream#transform() using the same store will see the same data in real time.
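Concretely, here is roughly what I have in mind for, say, the settings-confirm stream. This is just a sketch against the 1.0 API -- the store name "users-store" is hypothetical, and the serdes, types, and entityTopologyProcessor are the ones from my topology:

// Register the store once; any transform() that names it gets the same
// instance within a task. (The JOIN_REQUESTS stream would write newly
// created users into this same store.)
StoreBuilder<KeyValueStore<ByteString, User>> usersStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("users-store"),
        byteStringSerde, userSerde);
topology.addStateStore(usersStore);

topology.stream(SETTINGS_CONFIRM_REQUESTS,
        Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
    .transform(() -> new Transformer<ByteString, SettingsConfirmRequest,
                                     KeyValue<ByteString, User>>() {
        private KeyValueStore<ByteString, User> users;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            users = (KeyValueStore<ByteString, User>) context.getStateStore("users-store");
        }

        @Override
        public KeyValue<ByteString, User> transform(ByteString key, SettingsConfirmRequest request) {
            User user = users.get(key);      // read the current state directly
            if (user == null) {
                return null;                 // inner-join semantics: no user yet
            }
            User confirmed = entityTopologyProcessor.userSettingsConfirm(request, user);
            users.put(key, confirmed);       // immediately visible to other transforms on this store
            return KeyValue.pair(key, confirmed);
        }

        @Override
        public KeyValue<ByteString, User> punctuate(long timestamp) { return null; } // deprecated no-op

        @Override
        public void close() {}
    }, "users-store")
    .to(USERS, Produced.with(byteStringSerde, userSerde));

The transform writes the new user both to the store, so the other transforms see it right away, and to the USERS topic as before.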
Now suppose I have three topics, each carrying events like this, each on its own KStream:

    T1: join
    T2: settings-confirm
    T3: settings-update

Will the topology call the join transform before the settings-confirm transform before the settings-update transform?

On Tue, Jan 16, 2018 at 21:39, Matthias J. Sax <matth...@confluent.io> wrote:

> You have more flexibility of course, and thus can get better results.
> But your code must be able to buffer multiple records from the KTable
> and KStream input, and also store the corresponding timestamps, to
> perform the join correctly. It's not trivial, but it's also not rocket
> science.
>
> If you need stronger guarantees, it's the best way to go for now, until
> we have addressed those issues. Planned for the 1.2.0 release.
>
> -Matthias
>
>
> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> > Right now I am thinking of rewriting anything that has these
> > problematic KStream/KTable joins as KStream#transform(), wherein the
> > state store is used manually. Does that make sense as an option for me?
> >
> > -Dmitry
> >
> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkov...@gmail.com>
> > wrote:
> >
> >> Earlier today I posted this question to SO
> >> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>:
> >>
> >>> I have a topology that looks like this:
> >>
> >> KTable<ByteString, User> users = topology.table(USERS,
> >>     Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
> >>
> >> topology.stream(JOIN_REQUESTS,
> >>         Consumed.with(byteStringSerde, joinRequestSerde))
> >>     .mapValues(entityTopologyProcessor::userNew)
> >>     .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >> topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >>         Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >>     .join(users, entityTopologyProcessor::userSettingsConfirm,
> >>         Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> >>     .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >> topology.stream(SETTINGS_UPDATE_REQUESTS,
> >>         Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >>     .join(users, entityTopologyProcessor::userSettingsUpdate,
> >>         Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> >>     .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>> At runtime this topology works fine. Users are created with join
> >>> requests. They confirm their settings with settings-confirm requests.
> >>> They update their settings with settings-update requests.
> >>>
> >>> However, reprocessing this topology does not produce the original
> >>> results. Specifically, the settings-update joiner does not see the
> >>> user that resulted from the settings-confirm joiner, even though, in
> >>> terms of timestamps, many seconds elapse from the time the user is
> >>> created, to the time the user is confirmed, to the time the user
> >>> updates their settings.
> >>>
> >>> I'm at a loss. I've tried turning off caching/logging on the user
> >>> table. I have no idea what to do to make this reprocess properly.
> >>
> >> ----
> >>
> >> The response by Matthias, also on SO:
> >>
> >>> A KStream-KTable join is not 100% deterministic (and might never
> >>> become 100% deterministic). We are aware of the problem and are
> >>> discussing solutions to at least mitigate the issue.
> >>>
> >>> One problem is that when a Consumer fetches from the brokers, we
> >>> cannot easily control for which topics and/or partitions the broker
> >>> returns data. And depending on the order in which we receive data
> >>> from the broker, the result might differ slightly.
> >>>
> >>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >>>
> >>> This blog post might help, too:
> >>> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
> >>
> >> ----
> >>
> >> I don't really know what to do with this response. I have been aware
> >> for some time that a "slight" discrepancy can occur in edge cases
> >> with KStream-KTable joins, but what I'm seeing is not a slight
> >> discrepancy but very different results.
> >>
> >> I looked at the JIRA Matthias linked
> >> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
> >> has no late-arriving records. I don't know about the empty buffers.
> >> I have already read the blog post he linked several times.
> >>
> >> Can someone please suggest how I might get around this problem? For
> >> example:
> >>
> >> - Would it make sense to launch the topology with fewer threads
> >>   during the reprocess?
> >> - Would it make sense to launch the topology with fewer input tasks?
> >> - Would it make sense to increase the size of the stream buffer?
> >>
> >> I am at a total loss at this point. I cannot believe there is nothing
> >> I can do to replay this data and perform the migration I am trying to
> >> perform, in order to release the next version of my application. Am I
> >> totally screwed?
> >>
> >> Thank you,
> >> Dmitry
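P.S. Matthias -- to make sure I understand the buffering you describe: is it something like this? A rough sketch (1.0 API; "users-store" and "pending-updates" are hypothetical store names, registered with addStateStore), which parks a settings-update when the user isn't in the store yet and retries it on a wall-clock punctuation:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class SettingsUpdateTransformer
        implements Transformer<ByteString, SettingsUpdateRequest, KeyValue<ByteString, User>> {

    private final ValueJoiner<SettingsUpdateRequest, User, User> joiner;
    private ProcessorContext context;
    private KeyValueStore<ByteString, User> users;
    private KeyValueStore<ByteString, SettingsUpdateRequest> pending;

    public SettingsUpdateTransformer(ValueJoiner<SettingsUpdateRequest, User, User> joiner) {
        this.joiner = joiner;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.users = (KeyValueStore<ByteString, User>) context.getStateStore("users-store");
        this.pending = (KeyValueStore<ByteString, SettingsUpdateRequest>) context.getStateStore("pending-updates");
        // retry parked requests once a second
        context.schedule(1000L, PunctuationType.WALL_CLOCK_TIME, this::retryPending);
    }

    @Override
    public KeyValue<ByteString, User> transform(ByteString key, SettingsUpdateRequest request) {
        User user = users.get(key);
        if (user == null) {
            // on replay the update can overtake the user's creation;
            // park the request instead of dropping it
            pending.put(key, request);
            return null;
        }
        return apply(key, request, user);
    }

    private void retryPending(long now) {
        List<ByteString> done = new ArrayList<>();
        try (KeyValueIterator<ByteString, SettingsUpdateRequest> it = pending.all()) {
            while (it.hasNext()) {
                KeyValue<ByteString, SettingsUpdateRequest> parked = it.next();
                User user = users.get(parked.key);
                if (user != null) {
                    KeyValue<ByteString, User> result = apply(parked.key, parked.value, user);
                    context.forward(result.key, result.value);   // emit downstream
                    done.add(parked.key);
                }
            }
        }
        done.forEach(pending::delete);   // delete after iterating, not during
    }

    private KeyValue<ByteString, User> apply(ByteString key, SettingsUpdateRequest request, User user) {
        User updated = joiner.apply(request, user);
        users.put(key, updated);   // immediately visible to other transforms on this store
        return KeyValue.pair(key, updated);
    }

    @Override
    public KeyValue<ByteString, User> punctuate(long timestamp) { return null; }   // deprecated no-op

    @Override
    public void close() {}
}

Wired into the topology like so:

topology.stream(SETTINGS_UPDATE_REQUESTS,
        Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
    .transform(() -> new SettingsUpdateTransformer(entityTopologyProcessor::userSettingsUpdate),
        "users-store", "pending-updates")
    .to(USERS, Produced.with(byteStringSerde, userSerde));

I realize a full version would also keep the buffered records' timestamps, as you say, so they can be applied in timestamp order against the right version of the user.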