Thanks. Yes, I'll try replacing the KStream-KTable joins with
KStream#transform()s and a store. I'm not sure what you mean when you say
I'd need to buffer multiple records. The KStream has incoming events, and
#transform() will let me mount the store and use it however I please. Within
an application instance, any other KStream#transform()s using the same store
will see the same data in real time.

Now suppose I have three topics, each on its own KStream, with events like
these:

T1 join
T2 settings-confirm
T3 settings-update

Will the topology call the join transform before the settings-confirm
transform before the settings-update transform?
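
To make the question concrete, here is a minimal sketch in plain Java of why
processing order matters and of the buffering idea from the quoted reply
below. It is not Kafka Streams API and not anyone's actual implementation.
The class, the Event type, the string states ("new", "confirmed", "updated"),
and the rule that an update only applies to a confirmed user are all
hypothetical, chosen just to mirror the T1/T2/T3 events above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical model, NOT Kafka Streams API: three "transforms" share one
// store, and events are applied either as they arrive or by timestamp.
final class TimestampOrderedReplay {

    // Hypothetical event: source topic, user key, and record timestamp.
    static final class Event {
        final String topic;   // "join", "settings-confirm" or "settings-update"
        final String userId;
        final long timestamp;

        Event(String topic, String userId, long timestamp) {
            this.topic = topic;
            this.userId = userId;
            this.timestamp = timestamp;
        }
    }

    // Applies one event to the shared store, standing in for the transforms.
    static void apply(Map<String, String> store, Event e) {
        String state = store.get(e.userId);
        switch (e.topic) {
            case "join":
                store.put(e.userId, "new");
                break;
            case "settings-confirm":
                // Like an inner join: no user in the store means no result.
                if (state != null) store.put(e.userId, "confirmed");
                break;
            case "settings-update":
                // Assumed rule: only a confirmed user can be updated.
                if ("confirmed".equals(state)) store.put(e.userId, "updated");
                break;
            default:
                throw new IllegalArgumentException("unknown topic: " + e.topic);
        }
    }

    // Applies events in whatever order they were fetched.
    static Map<String, String> replayInArrivalOrder(List<Event> arrived) {
        Map<String, String> store = new HashMap<>();
        for (Event e : arrived) apply(store, e);
        return store;
    }

    // Buffers every event first, then drains the buffer by timestamp.
    static Map<String, String> replayInTimestampOrder(List<Event> arrived) {
        PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
        buffer.addAll(arrived);
        Map<String, String> store = new HashMap<>();
        while (!buffer.isEmpty()) apply(store, buffer.poll());
        return store;
    }

    public static void main(String[] args) {
        // Fetch order is reversed relative to the timestamps.
        List<Event> arrived = new ArrayList<>();
        arrived.add(new Event("settings-update", "u1", 30L));
        arrived.add(new Event("settings-confirm", "u1", 20L));
        arrived.add(new Event("join", "u1", 10L));
        System.out.println(replayInArrivalOrder(arrived));   // {u1=new}
        System.out.println(replayInTimestampOrder(arrived)); // {u1=updated}
    }
}
```

The sketch buffers the whole input before draining it, which only works for a
finite replay. A real transform would need a bounded buffer and some rule for
when a buffered record is safe to emit, and records with equal timestamps
would need an explicit tie-break.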



Tue, Jan 16, 2018 at 21:39, Matthias J. Sax <matth...@confluent.io>:

> You have more flexibility of course and thus can get better results. But
> your code must be able to buffer multiple records from the KTable and
> KStream input and also store the corresponding timestamps to perform the
> join correctly. It's not trivial, but it's also not rocket science.
>
> If you need stronger guarantees, it's the best way to go at the moment,
> until we have addressed those issues. That is planned for the 1.2.0 release.
>
> -Matthias
>
>
> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> > Right now I am thinking of re-writing anything that has these problematic
> > KStream/KTable joins as KStream#transform() wherein the state store is
> > manually used. Does that make sense as an option for me?
> >
> > -Dmitry
> >
> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkov...@gmail.com>
> > wrote:
> >
> >> Earlier today I posted this question to SO
> >> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>:
> >>
> >>> I have a topology that looks like this:
> >>
> >>     KTable<ByteString, User> users = topology.table(USERS,
> >>         Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
> >>
> >>     // to() returns void, so this chain is not assigned to a variable.
> >>     topology.stream(JOIN_REQUESTS,
> >>             Consumed.with(byteStringSerde, joinRequestSerde))
> >>         .mapValues(entityTopologyProcessor::userNew)
> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >>             Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >>         .join(users, entityTopologyProcessor::userSettingsConfirm,
> >>             Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>     topology.stream(SETTINGS_UPDATE_REQUESTS,
> >>             Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >>         .join(users, entityTopologyProcessor::userSettingsUpdate,
> >>             Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>> At runtime this topology works fine. Users are created with join
> >>> requests. They confirm their settings with settings confirm requests.
> >>> They update their settings with settings update requests.
> >>>
> >>> However, reprocessing this topology does not produce the original
> >>> results. Specifically, the settings update joiner does not see the user
> >>> that resulted from the settings confirm joiner, even though in terms of
> >>> timestamps, many seconds elapse from the time the user is created, to
> >>> the time the user is confirmed, to the time the user updates their
> >>> settings.
> >>>
> >>> I'm at a loss. I've tried turning off caching/logging on the user
> >>> table. No idea what to do to make this reprocess properly.
> >>
> >> ----
> >>
> >> The response by Matthias, also on SO:
> >>
> >>> A KStream-KTable join is not 100% deterministic (and might never become
> >>> 100% deterministic). We are aware of the problem and are discussing
> >>> solutions to at least mitigate the issue.
> >>>
> >>> One problem is that when a Consumer fetches from the brokers, we cannot
> >>> easily control for which topics and/or partitions the broker returns
> >>> data. And depending on the order in which we receive data from the
> >>> broker, the result might differ slightly.
> >>>
> >>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >>>
> >>> This blog post might help, too:
> >>> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
> >>
> >> ----
> >>
> >> I don't really know what to do with this response. I have been aware of
> >> some "slight" discrepancy that might occur in edge cases with
> >> KStream-KTable joins for some time now, but what I'm seeing is not a
> >> slight discrepancy but very different results.
> >>
> >> I looked at the JIRA Matthias linked
> >> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
> >> has no late arriving records. I don't know about the empty buffers. I
> >> have read the blog post he linked several times already.
> >>
> >> Can someone please suggest how I may obviate this problem? For example:
> >>
> >>    - Would it make sense for me to try launching the topology with fewer
> >>    threads during the reprocess?
> >>    - Would it make sense to launch the topology with fewer input tasks?
> >>    - Would it make sense to increase the size of the stream buffer?
> >>
> >> I am at a total loss at this point. I cannot believe that there is
> >> nothing I can do to replay this data and perform the migration I am
> >> trying to perform, in order to release a next version of my application.
> >> Am I totally screwed?
> >>
> >>
> >> Thank you,
> >> Dmitry
> >>
> >>
> >>
> >>
> >
>
>
