Update - I tried Sophie's suggestion; I implemented a Transformer that
performs puts on the table's backing store. I hid the complexities behind a
Kotlin extension method. So now the code looks like this (pseudocode):

fun <K, V> KStream<K, V>.commit(): KStream<K, V> { /* transformer is implemented here */ }

stream.join(table) { event, detailRecord ->
  // append record
}
.commit()
.to(table)
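
For anyone following along, here is a toy model of why the extra store write
helps. It is only a sketch: plain Kotlin collections stand in for the table's
topic and its backing store, no Kafka Streams API is involved, and ToyTable,
processBurst and putDirectly are made-up names. It assumes the whole burst is
processed before the table's topic is read again, which is roughly the
situation Sophie described.

```kotlin
// Toy model only: plain collections stand in for the table's topic and store.
// No Kafka Streams API is used; every name here is hypothetical.
data class DetailRecord(val events: List<String> = emptyList())

class ToyTable {
    // What the join reads from.
    val store = mutableMapOf<String, DetailRecord>()
    // Updates written to the table's topic but not yet consumed.
    val topic = mutableListOf<Pair<String, DetailRecord>>()

    // Models the table source catching up on its topic, oldest update first.
    fun drainTopic() {
        while (topic.isNotEmpty()) {
            val (key, value) = topic.removeAt(0)
            store[key] = value
        }
    }
}

// Processes a burst of events for one key before the topic is read again.
fun processBurst(table: ToyTable, key: String, events: List<String>, putDirectly: Boolean) {
    for (event in events) {
        // The "join": look up the current record, then append the event.
        val current = table.store[key] ?: DetailRecord()
        val updated = current.copy(events = current.events + event)
        if (putDirectly) table.store[key] = updated  // the "commit" step
        table.topic.add(key to updated)              // the "to(table)" step
    }
    table.drainTopic()
}
```

With putDirectly = false, a burst of e1, e2, e3 for one key leaves only e3 in
the record, because every join read the original empty record. With
putDirectly = true, all three events are kept.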

So this approach achieves the desired result - the data is no longer stale.
*However*, my logs are now littered with statements like this:
  WARN  o.a.k.s.k.internals.KTableSource - Detected out-of-order KTable update for cdr at offset 89, partition 1.

Basically, there's a corresponding warning emitted for every put into the
backing store. I've tried swapping the order of the logical "commit" and
"to" operations, and still receive the warnings.

So, what is the proper approach here? Just keep it as-is and ignore the
warnings?


On Sat, Oct 5, 2019 at 10:23 AM Trey Hutcheson <trey.hutche...@gmail.com>
wrote:

> Thank you for your response Sophie.
>
> What confuses me is that I have already implemented this same pattern
> (join to table, mutate entity, write back to table) in two other streams
> applications without any issue whatsoever. After thinking about it, I think
> the difference here is the input data. In the other applications, the
> mutations were caused by REST API invocations and were relatively low
> frequency. In this case, we have coarse-grained buckets of semi-batch
> data that are decomposed and flatMapped into a bunch of discrete events,
> which are then effectively sent in bursts to the events topic. In this particular
> case, I don't really care about latency *or* throughput; this is
> effectively a write-behind reporting application with no time sensitivity.
>
> Anyway, I've considered another option; I might just change the payload of
> the KStream to KStream<UUID, List<ModificationEvent>>. The individual
> messages on that topic will be larger, but lower frequency. But that won't
> solve the problem of replaying from earlier offsets - which effectively
> will be another bursty scenario.
>
> If that doesn't work as I hope, then I'll drop down and implement a
> transformer that injects to the data store itself. That just feels icky
> though.
>
> On Fri, Oct 4, 2019 at 5:09 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
>> I think the issue here is that you're basically creating a cycle in your
>> streams topology, which is generally supposed to be a DAG. If I understand
>> correctly, rather than writing the new data to the underlying store you're
>> sending it to the topic from which the table is built. Is that right?
>>
>> The problem is that for this to work, Streams has to recognize that you
>> sent an update to the table while processing the stream, and has to
>> somehow know that you want that update to be processed before the next
>> record from the stream is processed.
>> But there aren't any guarantees as to which will be processed first, for
>> good reason -- how should Streams know which order you want them to be
>> processed? And the update to the table will have some nonzero latency as
>> it has to go to the table's topic, then be read from the topic by
>> Streams. Is it supposed to block all processing until it's received an
>> update to the table?
>>
>> All that said, what you're trying to do is not impossible, though you
>> might need to use the lower-level Processor API (or attach a
>> transformer). Instead of trying to write to the table and then hope
>> Streams will process the update first, you can attach your own state
>> store and write directly to it. Then, any subsequent reads will reflect
>> the latest updates.
>>
>> Hope that makes sense!
>> Sophie
>>
>> On Fri, Oct 4, 2019 at 2:33 PM Boyang Chen <reluctanthero...@gmail.com>
>> wrote:
>>
>> > Hey Trey,
>> >
>> > as I was reading, several suggestions I have are:
>> >
>> > 1. Could you revert the 0 ms commit interval to the default? It will not
>> > help with the situation, as you will try to commit on every poll().
>> > 2. I don't know how your code is actually written, but you could try
>> > something really simple, such as a print statement within the join
>> > operation, to see if your application is actually taking incoming
>> > traffic. If you have metrics exported, that would also be useful.
>> >
>> > Boyang
>> >
>> > On Fri, Oct 4, 2019 at 6:42 AM Trey Hutcheson <trey.hutche...@gmail.com>
>> > wrote:
>> >
>> > > This is my third Kafka Streams application and I'd thought I had gotten
>> > > to know the warts and how to use it correctly. But I'm beating my head
>> > > against something that I just cannot explain. Values written to a
>> > > table, when later read back in a join operation, are stale.
>> > >
>> > > Assume the following simplified domain:
>> > > ModificationEvent - describes some mutation in the system
>> > > DetailRecord - a detailed record of some occurrence, contains some
>> > > metadata, and all of the modification events that occurred as part of
>> > > the occurrence/incident.
>> > >
>> > > Very simple topology:
>> > > KStream<UUID, ModificationEvent>
>> > > KTable<UUID, DetailRecord>
>> > > The UUID, in this case, is the id of the incident/occurrence being
>> > > reported. When there is a "terminal" event, the DetailRecord will be
>> > > shipped to an external system.
>> > >
>> > > So I have a very simple transformation (pseudo-Kotlin):
>> > > eventsStream.join(detailTable) { event, detailRecord ->
>> > >   detailRecord.events.add(event)
>> > >   detailRecord  // the last expression is the lambda's result
>> > > }.to(detailTable)
>> > >
>> > > So the ModificationEvent stream is joined to the DetailRecord table,
>> > > and the ModificationEvent is appended to the end of the DetailRecord,
>> > > which is then written back to the table.
>> > >
>> > > However, on the next modification event for the same detail record,
>> > > the detail record is stale and its list of events is empty. What's
>> > > going on here?
>> > >
>> > > I've tried temporarily disabling record caching (didn't think that was
>> > > the issue), and even setting the offset commit interval to 0 ms (again
>> > > didn't think this was the issue). Neither had an effect, other than
>> > > slowing the stream.
>> > >
>> > > Definitely need some help here.
>> > > Trey
>> > >
>> >
>>
>
