Hi John,

I haven't been able to reinstate the demo yet, but I have been re-reading the following scenario of yours...

On 1/24/19 11:48 PM, Peter Levart wrote:
Hi John,

On 1/24/19 3:18 PM, John Roesler wrote:


The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was emitted.
Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...

So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
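
For reference, I understand "read committed" mode to mean a consumer configured roughly like the sketch below; topic and group names are made up, and the relevant bit is isolation.level:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OutputTopicChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "output-topic-checker");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only deliver records from committed transactions; records from aborted
        // transactions (like T1 in the scenario above) are filtered out.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}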

...and I was thinking that perhaps the right solution to the suppression problem would be to use transactional producers for both the resulting output topic AND the store change-log. Is this possible? Does compaction of the log on the brokers work as expected with transactional producers? In that case, sending the final result and marking that fact in the store change-log would together be an atomic operation.

That said, I think there's another problem with suppression: it looks as though the suppression processor already starts processing input while the state store has not been fully restored yet, or something related... Is this guaranteed not to happen?
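
To illustrate the transactional idea above, here is roughly what I have in mind with a plain transactional producer; topic names, keys and the error handling are only a sketch:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class AtomicEmitAndMarkSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Transactions require a stable transactional id (and imply an idempotent producer).
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "suppress-emit-tx");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // Emit the final result...
                producer.send(new ProducerRecord<>("output-topic", "window-key", "final-result-X"));
                // ...and, in the same transaction, mark in the change-log that X was emitted
                // (here a tombstone, so compaction eventually drops the buffered entry).
                producer.send(new ProducerRecord<>("suppress-buffer-changelog", "window-key", null));
                producer.commitTransaction();
            } catch (KafkaException e) {
                // If anything fails, neither the result nor the change-log marker becomes
                // visible to read_committed consumers or to the restore consumer.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}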

And now something unrelated I wanted to ask...

I'm trying to create my own custom state store. From the API it looks pretty straightforward. One thing I don't quite understand is how Kafka Streams knows whether to replay the whole change log after the store registers itself, or just a part of it, and which part (from which offset per partition). There doesn't seem to be any API point through which the store could communicate this information back to Kafka Streams. Is such bookkeeping performed outside the store? Does Kafka Streams first invoke flush() on the store and then note down the offsets from the change-log producer somewhere, so that the next time the store is brought up, the log is only replayed from the last noted-down offset? Then it could happen that the store gets some log entries that have already been incorporated into it (from the point of one flush before), but it would never miss any...

In any case, there has to be an indication somewhere that the store didn't survive and has to be rebuilt from scratch. How does Kafka Streams detect that situation? By placing some marker file into the directory reserved for the store's local storage?
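
To show which API point I mean, here is a minimal sketch of a custom in-memory store; everything apart from the StateStore/ProcessorContext types is made up:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;

public class MyCustomStore implements StateStore {

    private final String name;
    private final Map<Bytes, byte[]> data = new HashMap<>();
    private volatile boolean open;

    public MyCustomStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        // The only hook the store gets: hand Streams a restore callback. There is no
        // API point here for the store to report which change-log offsets it already
        // contains, so that bookkeeping would have to happen outside the store.
        context.register(root, (key, value) -> {
            if (value == null) {
                data.remove(Bytes.wrap(key)); // tombstone
            } else {
                data.put(Bytes.wrap(key), value);
            }
        });
        open = true;
    }

    public void put(final Bytes key, final byte[] value) {
        data.put(key, value);
    }

    public byte[] get(final Bytes key) {
        return data.get(key);
    }

    @Override
    public void flush() {
        // Nothing to flush for a purely in-memory sketch; a persistent store would
        // flush its local storage here.
    }

    @Override
    public void close() {
        data.clear();
        open = false;
    }

    @Override
    public boolean persistent() {
        return false; // in-memory only
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}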

Regards, Peter
