Thank you for your response, Sophie. What confuses me is that I have already implemented this same pattern (join to table, mutate entity, write back to table) in two other Streams applications without any issue whatsoever. After thinking about it, I think the difference here is the input data. In the other applications, the mutations were caused by REST API invocations and were relatively low frequency. In this case, we have coarse-grained buckets of semi-batch data that are decomposed and flatMapped into a bunch of discrete events, which are then effectively bursted to the events topic (roughly like the sketch below). In this particular case, I don't really care about latency *or* throughput; this is effectively a write-behind reporting application with no time sensitivity.
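To make the shape of that traffic concrete, the ingest side looks roughly like this (a sketch only; the type, field, and topic names here are illustrative, not our real ones):

    // Sketch: one coarse-grained batch is decomposed into its individual
    // ModificationEvents, re-keyed by incident id, and written to the events
    // topic -- so each batch becomes a burst of small records.
    batchStream                                          // KStream<UUID, Batch>, hypothetical
        .flatMap { _, batch ->
            batch.modifications.map { event -> KeyValue(event.incidentId, event) }
        }
        .to("modification-events")                       // placeholder topic name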
Anyway, I've considered another option: I might just change the payload of the KStream to KStream<UUID, List<ModificationEvent>>. The individual messages on that topic will be larger, but lower frequency. That won't solve the problem of replaying from earlier offsets, though, which will effectively be another bursty scenario. If that doesn't work as I hope, then I'll drop down and implement a transformer that writes directly to the data store itself (I've put a rough sketch of what I mean below the quoted thread). That just feels icky though.

On Fri, Oct 4, 2019 at 5:09 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

> I think the issue here is that you're basically creating a cycle in your
> Streams topology, which is generally supposed to be a DAG. If I understand
> correctly, rather than writing the new data to the underlying store you're
> sending it to the topic from which the table is built. Is that right?
>
> The problem is that for this to work, Streams has to recognize that you
> sent an update to the table while processing the stream, and has to somehow
> know that you want that update to be processed before the next record from
> the stream is processed. But there aren't any guarantees as to which will be
> processed first, for good reason -- how should Streams know which order you
> want them processed in? And the update to the table will have some nonzero
> latency, since it has to go to the table's topic and then be read back from
> that topic by Streams. Is it supposed to block all processing until it has
> received an update to the table?
>
> All that said, what you're trying to do is not impossible, though you might
> need to use the lower-level Processor API (or attach a transformer). Instead
> of trying to write to the table and then hoping Streams will process the
> update first, you can attach your own state store and write directly to it.
> Then any subsequent reads will reflect the latest updates.
>
> Hope that makes sense!
> Sophie
>
> On Fri, Oct 4, 2019 at 2:33 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Hey Trey,
> >
> > As I was reading, a couple of suggestions:
> >
> > 1. Could you revert the 0 ms commit interval to the default? It will not
> > help with this situation, as you will try to commit on every poll().
> > 2. I don't know how you've actually written your code, but you could try
> > something really simple, like a print statement inside the join operation,
> > to see whether your application is actually taking incoming traffic. If
> > you have metrics exported, those would also be useful.
> >
> > Boyang
> >
> > On Fri, Oct 4, 2019 at 6:42 AM Trey Hutcheson <trey.hutche...@gmail.com>
> > wrote:
> >
> > > This is my third Kafka Streams application, and I'd thought I had gotten
> > > to know the warts and how to use it correctly. But I'm beating my head
> > > against something that I just cannot explain: values written to a table,
> > > when later read back in a join operation, are stale.
> > >
> > > Assume the following simplified domain:
> > > ModificationEvent - describes some mutation in the system
> > > DetailRecord - a detailed record of some occurrence; contains some
> > > metadata plus all of the modification events that occurred as part of
> > > the occurrence/incident.
> > >
> > > Very simple topology:
> > > KStream<UUID, ModificationEvent>
> > > KTable<UUID, DetailRecord>
> > > The UUID, in this case, is the id of the incident/occurrence being
> > > reported. When there is a "terminal" event, the DetailRecord will be
> > > shipped to an external system.
> > >
> > > So I have a very, very simple transformation (pseudo-Kotlin):
> > >
> > > eventsStream.join(detailTable) { event, detailRecord ->
> > >     detailRecord.events.add(event)
> > >     return detailRecord
> > > }.to(detailTable)
> > >
> > > So the ModificationEvent stream is joined to the DetailRecord table, and
> > > the ModificationEvent is appended to the end of the DetailRecord, which
> > > is then written back to the table.
> > >
> > > However, on the next modification event for the same detail record, the
> > > detail record is stale and its list of events is empty. What's going on
> > > here?
> > >
> > > I've tried temporarily disabling record caching (didn't think that was
> > > the issue), and even setting the offset commit interval to 0 ms (again,
> > > didn't think that was the issue). Neither had an effect, other than
> > > slowing the stream.
> > >
> > > Definitely need some help here.
> > > Trey
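P.S. If I do end up falling back to the transformer approach Sophie described, this is roughly the shape I have in mind -- an untested sketch, with the store name, serdes, and supporting types all placeholders:

    // A transformer with its own attached state store: each event reads the
    // current DetailRecord from the store, appends to it, and writes it back,
    // so the very next event for the same key sees the update.
    class AppendEventTransformer(private val storeName: String) :
        ValueTransformerWithKey<UUID, ModificationEvent, DetailRecord?> {

        private lateinit var store: KeyValueStore<UUID, DetailRecord>

        override fun init(context: ProcessorContext) {
            @Suppress("UNCHECKED_CAST")
            store = context.getStateStore(storeName) as KeyValueStore<UUID, DetailRecord>
        }

        override fun transform(key: UUID, event: ModificationEvent): DetailRecord? {
            val record = store.get(key) ?: return null   // no detail record for this incident yet
            record.events.add(event)
            store.put(key, record)                       // visible to the next event for this key
            return record
        }

        override fun close() {}
    }

    // Wiring (assumes uuidSerde / detailRecordSerde already exist):
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("detail-record-store"),
            uuidSerde,
            detailRecordSerde))

    eventsStream.transformValues(
        ValueTransformerWithKeySupplier { AppendEventTransformer("detail-record-store") },
        "detail-record-store")

The icky part is that the detail records would then live in a store owned by this transformer rather than a plain KTable, but at least the reads and writes go through the same store instance, so there's no round trip through the table's topic.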