I think the issue here is that you're creating a cycle in your Streams topology,
which is generally supposed to be a DAG. If I understand correctly, rather than
writing the new data to the underlying store, you're sending it to the topic from
which the table is built. Is that right?

The problem is that for this to work, Streams has to recognize that you sent an
update to the table while processing the stream, and has to somehow know that
you want that update to be processed before the next record from the stream is
processed. But there are no guarantees as to which will be processed first, for
good reason: how should Streams know which order you want them processed in?
The update to the table will also have some nonzero latency, as it has to go to
the table's topic and then be read back from the topic by Streams. Is Streams
supposed to block all processing until it has received an update to the table?
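
To make the ordering problem concrete, here is a toy model in plain Kotlin. It
is not Kafka Streams code: a map stands in for the materialized table and a
queue stands in for the table's topic, and the names (Detail, joinViaTopic) are
made up for illustration. It assumes the worst case, where every stream record
is processed before any table update is read back; the real interleaving is
simply not guaranteed.

```kotlin
// Toy model only: plain collections stand in for the table and its topic.
data class Detail(val events: List<String> = emptyList())

fun joinViaTopic(events: List<String>): Detail {
    val table = mutableMapOf("id" to Detail())   // materialized table state
    val topic = ArrayDeque<Detail>()             // table updates still in flight

    for (event in events) {
        val current = table.getValue("id")       // the join only sees the table as it is now
        topic.addLast(Detail(current.events + event)) // result goes to the topic, not the table
    }

    // The table catches up only when the topic is consumed, which in this
    // model happens after all stream records have already been joined.
    while (topic.isNotEmpty()) {
        table["id"] = topic.removeFirst()
    }
    return table.getValue("id")
}

fun main() {
    // Each join started from the empty record, so earlier events were lost.
    println(joinViaTopic(listOf("a", "b", "c"))) // Detail(events=[c])
}
```

Every join in the loop starts from the stale, empty record, which matches the
symptom described in the original question.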

All that said, what you're trying to do is not impossible, though you might need
to use the lower-level Processor API (or attach a transformer). Instead of
writing to the table and hoping Streams will process the update first, you can
attach your own state store and write directly to it. Then any subsequent reads
will reflect the latest updates.
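
Roughly, that could look like the sketch below. Treat it as a starting point
under assumptions rather than a drop-in solution: the store name, the topic
name, the shape of the two data classes, and the serdes passed in are all
hypothetical, and it assumes a Kafka Streams version where
KStream#transformValues is available (newer releases steer you toward the
Processor API's processValues instead).

```kotlin
import java.util.UUID
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores

// Hypothetical stand-ins for the domain types from the original question.
data class ModificationEvent(val description: String)
data class DetailRecord(val events: List<ModificationEvent> = emptyList())

const val DETAIL_STORE = "detail-store" // hypothetical store name

// Reads the current record from the store, appends the event, and writes the
// result straight back, so the next event for the same key sees the update.
class DetailAppender : ValueTransformerWithKey<UUID, ModificationEvent, DetailRecord> {
    private lateinit var store: KeyValueStore<UUID, DetailRecord>

    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        store = context.getStateStore(DETAIL_STORE) as KeyValueStore<UUID, DetailRecord>
    }

    override fun transform(key: UUID, event: ModificationEvent): DetailRecord {
        val current = store.get(key) ?: DetailRecord()
        val updated = current.copy(events = current.events + event)
        store.put(key, updated) // direct write, no round trip through a topic
        return updated
    }

    override fun close() {}
}

// Serdes for the domain types are assumed to be supplied by the caller.
fun buildDetailStream(
    builder: StreamsBuilder,
    eventSerde: Serde<ModificationEvent>,
    detailSerde: Serde<DetailRecord>
): KStream<UUID, DetailRecord> {
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(DETAIL_STORE),
            Serdes.UUID(),
            detailSerde
        )
    )
    return builder
        .stream("modification-events", Consumed.with(Serdes.UUID(), eventSerde)) // hypothetical topic
        .transformValues(ValueTransformerWithKeySupplier { DetailAppender() }, DETAIL_STORE)
}
```

The returned stream carries the updated DetailRecord for each event, so you
could filter for the "terminal" events downstream and ship those to the
external system without ever writing back to the input of the join.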

Hope that makes sense!
Sophie

On Fri, Oct 4, 2019 at 2:33 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Hey Trey,
>
> Reading through this, I have a couple of suggestions:
>
> 1. Could you revert the 0 ms commit interval to the default? It will not help
> here, since you will then try to commit on every poll().
> 2. I don't know how your code is actually written, but you could try something
> as simple as a print statement inside the join operation to see whether your
> application is actually receiving traffic. If you have metrics exported,
> those would also be useful.
>
> Boyang
>
> On Fri, Oct 4, 2019 at 6:42 AM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
> > This is my third Kafka Streams application, and I thought I had gotten to
> > know the warts and how to use it correctly. But I'm beating my head against
> > something that I just cannot explain. Values written to a table, when later
> > read back in a join operation, are stale.
> >
> > Assume the following simplified domain:
> > ModificationEvent - describes some mutation in the system
> > DetailRecord - a detailed record of some occurrence, contains some
> > metadata, and all of the modification events that occurred as part of the
> > occurrence/incident.
> >
> > Very simple topology:
> > KStream<UUID, ModificationEvent>
> > KTable<UUID, DetailRecord>
> > The UUID, in this case, is the id of the incident/occurrence being
> > reported. When there is a "terminal" event, the DetailRecord will be
> > shipped to an external system.
> >
> > So I have a very very simple transformation (pseudo kotlin):
> > eventsStream.join(detailTable) { event, detailRecord ->
> >   detailRecord.events.add(event)
> >   return detailRecord
> > }.to(detailTable)
> >
> > So the ModificationEvent stream is joined to the DetailRecord table, and
> > the ModificationEvent is appended to the end of the DetailRecord, which is
> > then written back to the table.
> >
> > However, on the next modification event for the same detail record, the
> > detail record is stale and its list of events is empty. What's going on
> > here?
> >
> > I've tried temporarily disabling record caching (didn't think that was the
> > issue), and even setting the offset commit interval to 0 ms (again, didn't
> > think this was the issue). Neither had an effect, other than slowing the
> > stream.
> >
> > Definitely need some help here.
> > Trey
> >
>
