BTW, after stopping the app gracefully (KafkaStreams#close()), this error
shows up repeatedly:

2019-03-01 17:18:07,819 WARN [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.ProcessorStateManager (ProcessorStateManager.java:327) - task [0_0] Failed to write offset checkpoint file to [/tmp/kafka-stream/XXX/0_0/.checkpoint]
java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
    at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
    at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]


However, I have checked, and the folders that were created start with *1_*:

ls -lha /tmp/kafka-stream/XXX/1_1
total 8
drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
-rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
-rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
drwxr-xr-x   3 a  b    96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005
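
In case it helps anyone check the same thing on their side, here is a minimal
sketch (plain Java, no Kafka dependency) that lists the task directories under
the application's state directory and reports whether each one contains a
.checkpoint file. The class name CheckpointScan is made up for illustration,
and the default path is simply the one from the log above; it only inspects
the file system and does not reflect how Kafka Streams itself manages these
files.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical diagnostic helper, not part of Kafka Streams.
class CheckpointScan {

    // Maps each task directory name (e.g. "1_1") under the given state
    // directory to whether it contains a .checkpoint file.
    static Map<String, Boolean> scan(Path appStateDir) throws IOException {
        Map<String, Boolean> result = new TreeMap<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(appStateDir)) {
            for (Path dir : dirs) {
                if (Files.isDirectory(dir)) {
                    result.put(dir.getFileName().toString(),
                               Files.exists(dir.resolve(".checkpoint")));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Default path is taken from the log above; pass another as args[0].
        Path appStateDir = Paths.get(args.length > 0 ? args[0] : "/tmp/kafka-stream/XXX");
        scan(appStateDir).forEach((task, hasCheckpoint) ->
            System.out.println(task + " checkpoint=" + hasCheckpoint));
    }
}
```

Running it right after shutdown should show quickly whether a 0_0 directory
exists at all and which tasks actually got a checkpoint written.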



Cheers!
--
Jonathan



On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <jonathansanti...@gmail.com>
wrote:

> Hello John, hope you are well.
> I have tested the version 2.2 release candidate (although I know it has
> been postponed).
> I have been following this email thread because I think I am experiencing
> the same issue. I have reported it in an email to this list, and all the
> details are also on SO (
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> ).
>
> After the test, the result is the same as before (at least in my case).
> Already processed records are passed to the output topic again, causing
> data duplication:
>
> ...
> 2019-03-01 16:55:23,808 INFO  
> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
> checkpoint found for task 1_10 state store
> KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned on. 
> *Reinitializing
> the task and restore its state from the beginning.*
>
> ...
>
>
> I was hoping for this to be fixed, but that is not the case, at least for
> me.
>
> If you can, please take a look at the question on SO. I was in contact
> with Matthias about it, and he pointed me to the place where the
> potential bug could be.
>
> Please, let me know any thoughts.
>
>
> Cheers!
> --
> Jonathan
>
>
> On Tue, Feb 26, 2019 at 5:23 PM John Roesler <j...@confluent.io> wrote:
>
>> Hi again, Peter,
>>
>> Just to close the loop about the bug in Suppress, we did get the
>> (apparently) same report from a few other people:
>> https://issues.apache.org/jira/browse/KAFKA-7895
>>
>> I also managed to reproduce the duplicate-result behavior, which could
>> cause it to emit both intermediate results and duplicate final results.
>>
>> There's a patch for it in the 2.2 release candidate. Perhaps you can try
>> it out and see if it resolves the issue for you?
>>
>> I'm backporting the fix to 2.1 as well, but I unfortunately missed the
>> last 2.1 bugfix release.
>>
>> Thanks,
>> -John
>>
>> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <j...@confluent.io> wrote:
>>
>> > Hi Peter,
>> >
>> > Thanks for the replies.
>> >
>> > Regarding transactions:
>> > Yes, actually, with EOS enabled, the changelog and the output topics are
>> > all produced with the same transactional producer, within the same
>> > transactions. So it should already be atomic.
>> >
>> > Regarding restore:
>> > Streams doesn't put the store into service until the restore is
>> > completed, so it should be guaranteed not to happen. But there's of course
>> > no guarantee that I didn't mess something up. I'll take a hard look at it.
>> >
>> > Regarding restoration and offsets:
>> > Your guess is correct: Streams tracks the latest stored offset outside of
>> > the store implementation itself, specifically by writing a file (called a
>> > Checkpoint File) in the state directory. If the file is there, it reads
>> > that offset and restores from that point. If the file is missing, it
>> > restores from the beginning of the stream. So it should "just work" for
>> > you. Just for completeness, there have been several edge cases discovered
>> > where this mechanism isn't completely safe, so in the case of EOS, I
>> > believe we actually disregard that checkpoint file and the prior state and
>> > always rebuild from the earliest offset in the changelog.
>> >
>> > Personally, I would like to see us provide the ability to store the
>> > checkpoint inside the state store, so that checkpoint updates are
>> > linearized correctly w.r.t. data updates, but I actually haven't mentioned
>> > this thought to anyone until now ;)
>> >
>> > Finally, regarding your prior email:
>> > Yes, I was thinking that the "wrong" output values might be part of
>> > rolled-back transactions and therefore enabling read-committed mode on the
>> > consumer might tell a different story than what you've seen to date.
>> >
>> > I'm honestly still baffled about those intermediate results that are
>> > sneaking out. I wonder if it's something specific to your data stream,
>> > like maybe there is an edge case when two records have exactly the same
>> > timestamp? I'll have to stare at the code some more...
>> >
>> > Regardless, in order to reap the benefits of running the app with EOS, you
>> > really have to also set your consumers to read_committed. Otherwise, you'll
>> > be seeing output data from aborted (aka rolled-back) transactions, and you
>> > miss the intended "exactly once" guarantee.
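
For reference, a minimal sketch of the consumer setting John describes here.
It only builds a java.util.Properties object, so it runs without the Kafka
client on the classpath; the resulting properties are what one would pass to a
KafkaConsumer. The class name, broker address, group id, and the choice of
String deserializers are placeholders I picked for illustration.

```java
import java.util.Properties;

// Hypothetical helper; only the "isolation.level" line is the point here.
class ReadCommittedConfig {

    // Builds consumer properties that hide records from aborted transactions.
    // bootstrapServers and groupId are placeholders supplied by the caller.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumed String keys and values; adjust to the output topic's serdes.
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer default is read_uncommitted, which also returns
        // records written inside transactions that were later aborted.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "output-verifier");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same property can also be passed on the command line to the console
consumer when checking the output topic by hand.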
>> >
>> > Thanks,
>> > -John
>> >
>> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <peter.lev...@gmail.com>
>> > wrote:
>> >
>> >> Hi John,
>> >>
>> >> Haven't been able to reinstate the demo yet, but I have been re-reading
>> >> the following scenario of yours....
>> >>
>> >> On 1/24/19 11:48 PM, Peter Levart wrote:
>> >> > Hi John,
>> >> >
>> >> > On 1/24/19 3:18 PM, John Roesler wrote:
>> >> >
>> >> >>
>> >> >> The reason is that, upon restart, the suppression buffer can only
>> >> >> "remember" what got sent & committed to its changelog topic before.
>> >> >>
>> >> >> The scenario I have in mind is:
>> >> >>
>> >> >> ...
>> >> >> * buffer state X
>> >> >> ...
>> >> >> * flush state X to buffer changelog
>> >> >> ...
>> >> >> * commit transaction T0; start new transaction T1
>> >> >> ...
>> >> >> * emit final result X (in uncommitted transaction T1)
>> >> >> ...
>> >> >> * crash before flushing to the changelog the fact that state X was
>> >> >> emitted.
>> >> >> Also, transaction T1 gets aborted, since we crash before committing.
>> >> >> ...
>> >> >> * restart, restoring state X again from the changelog (because the emit
>> >> >> didn't get committed)
>> >> >> * start transaction T2
>> >> >> * emit final result X again (in uncommitted transaction T2)
>> >> >> ...
>> >> >> * commit transaction T2
>> >> >> ...
>> >> >>
>> >> >> So, the result gets emitted twice, but the first time is in an aborted
>> >> >> transaction. This leads me to another clarifying question:
>> >> >>
>> >> >> Based on your first message, it seems like the duplicates you observe
>> >> >> are in the output topic. When you read the topic, do you configure your
>> >> >> consumer with "read committed" mode? If not, you'll see "results" from
>> >> >> uncommitted transactions, which could explain the duplicates.
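
To make the scenario above concrete, here is a toy model in plain Java of what
a reader of the output topic would see in each mode. It is only an
illustration of the reasoning: the class and method names are invented, and
the "log" is an in-memory list tagged with transaction names, not how brokers
or consumers actually track transaction state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the scenario above; not Kafka code.
class TxnVisibility {

    // A record in the output topic, tagged with the transaction that wrote it.
    static final class Rec {
        final String value;
        final String txn;
        Rec(String value, String txn) { this.value = value; this.txn = txn; }
    }

    // Returns the values a reader would see. With readCommitted=true only
    // records from committed transactions are returned; otherwise all are.
    static List<String> visible(List<Rec> log, Set<String> committed, boolean readCommitted) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            if (!readCommitted || committed.contains(r.txn)) {
                out.add(r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Final result X is emitted in T1 (aborted by the crash), then again in T2.
        List<Rec> log = Arrays.asList(new Rec("X", "T1"), new Rec("X", "T2"));
        Set<String> committed = new HashSet<>(Arrays.asList("T0", "T2"));
        System.out.println("read_uncommitted: " + visible(log, committed, false));
        System.out.println("read_committed:   " + visible(log, committed, true));
    }
}
```

With the data in main, the first line lists X twice and the second lists it
once, which is the difference John is asking about.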
>> >>
>> >> ...and I was thinking that perhaps the right solution to the suppression
>> >> problem would be to use transactional producers for the resulting output
>> >> topic AND the store change-log. Is this possible? Does the compaction of
>> >> the log on the brokers work for transactional producers as expected? In
>> >> that case, the sending of final result and the marking of that fact in
>> >> the store change log would together be an atomic operation.
>> >> That said, I think there's another problem with suppression which looks
>> >> like the suppression processor is already processing the input while the
>> >> state store has not been fully restored yet or something related... Is
>> >> this guaranteed not to happen?
>> >>
>> >> And now something unrelated I wanted to ask...
>> >>
>> >> I'm trying to create my own custom state store. From the API I can see
>> >> it is pretty straightforward. One thing that I don't quite understand is
>> >> how Kafka Streams knows whether to replay the whole change log after the
>> >> store registers itself or just a part of it, and which part (from which
>> >> offset per partition). There doesn't seem to be any API point through
>> >> which the store could communicate this information back to Kafka
>> >> Streams. Is such bookkeeping performed outside the store? Does Kafka
>> >> Streams first invoke flush() on the store and then note down the
>> >> offsets from the change log producer somewhere? So next time the store
>> >> is brought up, the log is only replayed from the last noted-down offset?
>> >> So it can happen that the store gets some log entries that have already
>> >> been incorporated in it (from the point of one flush before) but never
>> >> misses any... In any case there has to be an indication somewhere that
>> >> the store didn't survive and has to be rebuilt from scratch. How does
>> >> Kafka Streams detect that situation? By placing some marker file into
>> >> the directory reserved for the store's local storage?
>> >>
>> >> Regards, Peter
>> >>
>> >>
>>
>
>
> --
> Santilli Jonathan
>


-- 
Santilli Jonathan
