I think the issue here is that you're basically creating a cycle in your Streams topology, which is generally supposed to be a DAG. If I understand correctly, rather than writing the new data to the underlying store, you're sending it to the topic from which the table is built. Is that right?
The problem is that for this to work, Streams has to recognize that you sent an update to the table while processing the stream, and somehow know that you want that update processed before the next record from the stream. But there aren't any guarantees as to which will be processed first, for good reason -- how should Streams know in which order you want them processed? And the update to the table will have some nonzero latency, since it has to go to the table's topic and then be read back from that topic by Streams. Should Streams block all processing until it has received an update to the table?

All that said, what you're trying to do is not impossible, though you might need to use the lower-level Processor API (or attach a transformer). Instead of trying to write to the table and then hoping Streams will process the update first, you can attach your own state store and write directly to it. Then any subsequent reads will reflect the latest updates. (There's a rough Kotlin sketch of that approach below the quoted thread.)

Hope that makes sense!
Sophie

On Fri, Oct 4, 2019 at 2:33 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

> Hey Trey,
>
> As I was reading, a couple of suggestions:
>
> 1. Could you revert the 0 ms commit interval back to the default? It won't help with the situation, since you'll just be committing on every poll().
> 2. I can't tell how you actually wrote your code, but you could try something really simple, like a print statement inside the join operation, to see whether your application is actually receiving incoming traffic. If you have metrics exported, those would also be useful.
>
> Boyang
>
> On Fri, Oct 4, 2019 at 6:42 AM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
>
> > This is my third Kafka Streams application, and I thought I had gotten to know the warts and how to use it correctly. But I'm beating my head against something that I just cannot explain: values written to a table, when later read back in a join operation, are stale.
> >
> > Assume the following simplified domain:
> > ModificationEvent - describes some mutation in the system
> > DetailRecord - a detailed record of some occurrence; contains some metadata plus all of the modification events that occurred as part of the occurrence/incident
> >
> > Very simple topology:
> > KStream<UUID, ModificationEvent>
> > KTable<UUID, DetailRecord>
> > The UUID, in this case, is the id of the incident/occurrence being reported. When there is a "terminal" event, the DetailRecord is shipped to an external system.
> >
> > So I have a very, very simple transformation (pseudo Kotlin):
> >
> > eventsStream.join(detailTable) { event, detailRecord ->
> >     detailRecord.events.add(event)
> >     detailRecord
> > }.to(detailTable)
> >
> > So the ModificationEvent stream is joined to the DetailRecord table, the ModificationEvent is appended to the end of the DetailRecord, and the result is written back to the table.
> >
> > However, on the next modification event for the same detail record, the detail record is stale and its list of events is empty. What's going on here?
> >
> > I've tried temporarily disabling record caching (I didn't think that was the issue) and even setting the offset commit interval to 0 ms (again, I didn't think this was the issue). Neither had any effect other than slowing the stream down.
> >
> > Definitely need some help here.
> > Trey
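
For reference, here is a rough sketch of the state-store approach in Kotlin (to match the pseudo-Kotlin above). The ModificationEvent and DetailRecord classes are minimal stand-ins for the domain described in the thread; the topic names, store name, and serde parameters are placeholders/assumptions you'd replace with your own. It's meant to illustrate the technique, not be a drop-in implementation:

import java.util.UUID
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores

// Minimal stand-ins for the domain types described in the thread.
data class ModificationEvent(val description: String)
data class DetailRecord(val events: MutableList<ModificationEvent> = mutableListOf())

const val DETAIL_STORE = "detail-record-store"  // placeholder store name

fun buildTopology(
    builder: StreamsBuilder,
    uuidSerde: Serde<UUID>,                // e.g. Serdes.UUID()
    eventSerde: Serde<ModificationEvent>,  // assumed to be provided (JSON/Avro/etc.)
    detailSerde: Serde<DetailRecord>
) {
    // Register a state store that holds the latest DetailRecord per incident id.
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(DETAIL_STORE),
            uuidSerde,
            detailSerde
        )
    )

    // Read-modify-write against the store directly instead of joining a KTable
    // and writing back to its topic.
    builder.stream("modification-events", Consumed.with(uuidSerde, eventSerde))  // placeholder topic
        .transformValues(
            ValueTransformerWithKeySupplier<UUID, ModificationEvent, DetailRecord> { DetailRecordUpdater() },
            DETAIL_STORE
        )
        .to("detail-records", Produced.with(uuidSerde, detailSerde))  // placeholder topic
}

class DetailRecordUpdater : ValueTransformerWithKey<UUID, ModificationEvent, DetailRecord> {

    private lateinit var store: KeyValueStore<UUID, DetailRecord>

    override fun init(context: ProcessorContext) {
        @Suppress("UNCHECKED_CAST")
        store = context.getStateStore(DETAIL_STORE) as KeyValueStore<UUID, DetailRecord>
    }

    override fun transform(key: UUID, event: ModificationEvent): DetailRecord {
        val record = store.get(key) ?: DetailRecord()  // create if this incident hasn't been seen yet
        record.events.add(event)
        store.put(key, record)  // visible to the very next read for this key
        return record
    }

    override fun close() {}
}

Because all events for a given incident id are handled by the same task, the put() in transform() is visible to the very next event for that key -- no round trip through a topic. The store is also backed by a changelog topic by default, so it stays fault tolerant. If your initial DetailRecords come from another topic, you could have that stream write into the same store (or pre-populate it) instead of creating empty records on the fly as the store.get(key) ?: DetailRecord() line does above.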