I meant “Thanks, yes I will try replacing...”

On Tue, Jan 16, 2018 at 22:12, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
> Thanks, yes try replacing the KStream-KTable joins with
> KStream#transform()s and a store. I'm not sure why you say I'd need to
> buffer multiple records. The KStream has incoming events, and #transform()
> will let me mount the store and use it however I please. Within an
> application instance, any other KStream#transform()s using the same store
> will see the same data in real time.
>
> Now suppose I have three topics, each with events like this, each on their
> own KStream:
>
> T1 join
> T2 settings-confirm
> T3 settings-update
>
> Will the topology call the join transform before the settings-confirm
> transform, and that before the settings-update transform?
>
> On Tue, Jan 16, 2018 at 21:39, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> You have more flexibility, of course, and thus can get better results.
>> But your code must be able to buffer multiple records from the KTable and
>> KStream inputs, and also store the corresponding timestamps, to perform
>> the join correctly. It's not trivial, but also not rocket science.
>>
>> If you need stronger guarantees, it's the best way to go at the moment,
>> until we have addressed those issues. That is planned for the 1.2.0
>> release.
>>
>> -Matthias
>>
>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>>
>>> Right now I am thinking of rewriting anything that has these problematic
>>> KStream-KTable joins as KStream#transform(), wherein the state store is
>>> used manually. Does that make sense as an option for me?
>>>
>>> -Dmitry
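Below is a minimal sketch of the transform-with-a-store approach being discussed, written against the Kafka 1.0-era API (where Transformer#punctuate still exists, deprecated). The serdes, topic constants, and entityTopologyProcessor helpers are borrowed from the topology quoted further down in this thread; the store name "users-store" and the drop-if-no-user policy are illustrative assumptions, not code from the thread:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder topology = new StreamsBuilder();

    // Register the store that stands in for the users KTable.
    StoreBuilder<KeyValueStore<ByteString, User>> usersStore =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("users-store"),   // placeholder name
            byteStringSerde, userSerde);
    topology.addStateStore(usersStore);

    // One transform like this per request topic replaces each KStream-KTable join.
    topology.stream(SETTINGS_CONFIRM_REQUESTS,
            Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
        .transform(() -> new Transformer<ByteString, SettingsConfirmRequest,
                                         KeyValue<ByteString, User>>() {
            private KeyValueStore<ByteString, User> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext context) {
                // Mount the store named in the transform() call below.
                store = (KeyValueStore<ByteString, User>) context.getStateStore("users-store");
            }

            @Override
            public KeyValue<ByteString, User> transform(ByteString key,
                                                        SettingsConfirmRequest request) {
                User user = store.get(key);  // deterministic lookup instead of a join race
                if (user == null) {
                    return null;             // assumption: drop (or buffer) requests for unknown users
                }
                User updated = entityTopologyProcessor.userSettingsConfirm(request, user);
                store.put(key, updated);     // immediately visible to other transforms on this store
                return KeyValue.pair(key, updated);
            }

            @Override
            public KeyValue<ByteString, User> punctuate(long timestamp) {
                return null;                 // deprecated in 1.0 but still part of the interface
            }

            @Override
            public void close() { }
        }, "users-store")
        .to(USERS, Produced.with(byteStringSerde, userSerde));

Note what this does and does not address: within an application instance, every transform() mounted on the same store sees writes immediately, so the lookup no longer races a KTable update. It does not, however, order records across T1/T2/T3: Kafka Streams picks the next record across input partitions by timestamp only on a best-effort basis, which is the correctness caveat Matthias raises above about buffering records and their timestamps.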
>>> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>>>
>>>> Earlier today I posted this question to SO
>>>> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>:
>>>>
>>>>> I have a topology that looks like this:
>>>>
>>>> KTable<ByteString, User> users = topology.table(USERS,
>>>>     Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>>>>
>>>> topology.stream(JOIN_REQUESTS,
>>>>         Consumed.with(byteStringSerde, joinRequestSerde))
>>>>     .mapValues(entityTopologyProcessor::userNew)
>>>>     .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>
>>>> topology.stream(SETTINGS_CONFIRM_REQUESTS,
>>>>         Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>>>>     .join(users, entityTopologyProcessor::userSettingsConfirm,
>>>>         Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>>>>     .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>
>>>> topology.stream(SETTINGS_UPDATE_REQUESTS,
>>>>         Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>>>>     .join(users, entityTopologyProcessor::userSettingsUpdate,
>>>>         Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>>>>     .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>
>>>>> At runtime this topology works fine. Users are created with join
>>>>> requests. They confirm their settings with settings-confirm requests.
>>>>> They update their settings with settings-update requests.
>>>>>
>>>>> However, reprocessing this topology does not produce the original
>>>>> results. Specifically, the settings-update joiner does not see the
>>>>> user that resulted from the settings-confirm joiner, even though, in
>>>>> terms of timestamps, many seconds elapse from the time the user is
>>>>> created, to the time the user is confirmed, to the time the user
>>>>> updates their settings.
>>>>>
>>>>> I'm at a loss. I've tried turning off caching/logging on the user
>>>>> table. I have no idea what to do to make this reprocess properly.
>>>>
>>>> ----
>>>>
>>>> The response by Matthias, also on SO:
>>>>
>>>>> A KStream-KTable join is not 100% deterministic (and might never
>>>>> become 100% deterministic). We are aware of the problem and are
>>>>> discussing solutions to at least mitigate the issue.
>>>>>
>>>>> One problem is that when a consumer fetches from the brokers, we
>>>>> cannot easily control for which topics and/or partitions the broker
>>>>> returns data. And depending on the order in which we receive data
>>>>> from the broker, the result might differ slightly.
>>>>>
>>>>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>>>>>
>>>>> This blog post might help, too:
>>>>> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>>>>
>>>> ----
>>>>
>>>> I don't really know what to do with this response. I have been aware
>>>> for some time that a "slight" discrepancy can occur in edge cases with
>>>> KStream-KTable joins, but what I'm seeing is not a slight discrepancy
>>>> but very different results.
>>>>
>>>> I looked at the JIRA Matthias linked
>>>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
>>>> has no late-arriving records. I don't know about the empty buffers. I
>>>> have already read the blog post he linked several times.
>>>>
>>>> Can someone please suggest how I might work around this problem? For
>>>> example:
>>>>
>>>> - Would it make sense for me to launch the topology with fewer threads
>>>>   during the reprocess?
>>>> - Would it make sense to launch the topology with fewer input tasks?
>>>> - Would it make sense to increase the size of the stream buffer?
>>>>
>>>> I am at a total loss at this point. I cannot believe that there is
>>>> nothing I can do to replay this data and perform the migration I am
>>>> trying to perform in order to release the next version of my
>>>> application. Am I totally screwed?
>>>>
>>>> Thank you,
>>>> Dmitry
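On the three questions near the end of the post above: two of them map to Kafka 1.0-era StreamsConfig settings, sketched below with illustrative values only (the application id and bootstrap servers are placeholders). The third, "fewer input tasks", is not a config at all: the task count is fixed by the partition counts of the input topics.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-reprocess");   // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

    // "Fewer threads during the reprocess": run a single stream thread.
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

    // "Increase the size of the stream buffer": raise the number of records
    // buffered per partition before the consumer's fetching is paused.
    props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10000);

Per Matthias's reply above, none of this makes a KStream-KTable join fully deterministic; at best it narrows the window in which broker fetch order can interleave the streams differently.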