Hi John,
Haven't been able to reinstate the demo yet, but I have been re-reading
the following scenario of yours....
On 1/24/19 11:48 PM, Peter Levart wrote:
Hi John,
On 1/24/19 3:18 PM, John Roesler wrote:
The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.
The scenario I have in mind is:
...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was
emitted.
Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...
So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:
Based on your first message, it seems like the duplicates you observe
are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
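(For reference on the "read committed" point above, this is roughly how such a
consumer would be configured; the topic name and group id below are just
placeholders:)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReadCommittedReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "final-results-reader");    // placeholder
        props.put("isolation.level", "read_committed");   // hide records from aborted/open transactions
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic")); // placeholder topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    System.out.printf("%s -> %s%n", record.key(), record.value());
                }
            }
        }
    }
}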
...and I was thinking that perhaps the right solution to the suppression
problem would be to use transactional producers for the resulting output
topic AND the store change-log. Is this possible? Does the compaction of
the log on the brokers work for transactional producers as expected? In
that case, the sending of the final result and the marking of that fact in
the store changelog would together form an atomic operation.
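To make this concrete, here is a rough sketch of what I mean, written with a
plain transactional producer (the topic names, key and transactional.id below
are made up; within Kafka Streams itself I assume this is what the
exactly_once processing guarantee is supposed to provide):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class AtomicEmitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "suppression-emit-tx");  // made-up id
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // send the final result to the output topic...
                producer.send(new ProducerRecord<>("output-topic", "window-key", "final-result-X"));
                // ...and mark that fact in the store changelog (here as a tombstone)
                producer.send(new ProducerRecord<>("suppression-buffer-changelog", "window-key", null));
                // either both records become visible to read_committed consumers, or neither does
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }
}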
That said, I think there's another problem with suppression: it looks as
though the suppression processor starts processing input while its state
store has not yet been fully restored, or something related... Is this
guaranteed not to happen?
And now something unrelated I wanted to ask...
I'm trying to create my own custom state store. From the API I can see
it is pretty straightforward. One thing that I don't quite understand is
how Kafka Streams knows whether to replay the whole changelog after the
store registers itself, or just a part of it, and which part (from which
offset per partition). There doesn't seem to be any API point through
which the store could communicate this information back to Kafka
Streams. Is such bookkeeping performed outside the store? Does Kafka
Streams first invoke flush() on the store and then note down the
offsets from the changelog producer somewhere, so that next time the
store is brought up, the log is only replayed from the last noted-down
offset? In that case the store may receive some log entries that have
already been incorporated into it (from before the last flush), but it
never misses any... In any case there has to be an indication somewhere
that the store didn't survive and has to be rebuilt from scratch. How
does Kafka Streams detect that situation? By placing some marker file
into the directory reserved for the store's local storage?
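To make the question concrete, this is roughly the kind of store I have in
mind -- a trivial in-memory sketch (the class name is made up). Note that the
only hook the store gets is the restore callback: it never gets to tell Kafka
Streams which changelog offsets to replay.

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;

import java.util.HashMap;
import java.util.Map;

public class MyCustomStore implements StateStore {

    private final String name;
    private final Map<Bytes, byte[]> data = new HashMap<>();
    private volatile boolean open;

    public MyCustomStore(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        // Kafka Streams replays (some part of) the changelog through this callback;
        // the store just applies whatever it is given, it does not choose the offsets.
        StateRestoreCallback restoreCallback = (key, value) -> {
            if (value == null) {
                data.remove(Bytes.wrap(key));
            } else {
                data.put(Bytes.wrap(key), value);
            }
        };
        context.register(root, restoreCallback);
        open = true;
    }

    @Override
    public void flush() {
        // nothing buffered in this in-memory sketch
    }

    @Override
    public void close() {
        data.clear();
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}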
Regards, Peter