BTW, after stopping the app gracefully (KafkaStreams#close()), this error
shows up repeatedly:
2019-03-01 17:18:07,819 WARN [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
internals.ProcessorStateManager (ProcessorStateManager.java:327) - task [0_0]
Failed to write offset checkpoint file to [/tmp/kafka-stream/XXX/0_0/.checkpoint]
java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
        at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
        at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
        at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
        at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
        at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]

However, I have checked, and the folders that were created start with *1_*:

ls -lha /tmp/kafka-stream/XXX/1_1
total 8
drwxr-xr-x   5 a  b  160B  1 Mar 17:18 .
drwxr-xr-x  34 a  b  1.1K  1 Mar 17:15 ..
-rw-r--r--   1 a  b  2.9K  1 Mar 17:18 .checkpoint
-rw-r--r--   1 a  b    0B  1 Mar 16:05 .lock
drwxr-xr-x   3 a  b   96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005

(A sketch of the write-then-rename pattern behind this failure follows the
quoted message below.)

Cheers!
--
Jonathan

On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <jonathansanti...@gmail.com>
wrote:

> Hello John, hope you are well.
> I have tested the version 2.2 release candidate (although I know it has
> been postponed).
> I have been following this email thread because I think I am experiencing
> the same issue. I have reported it in an email to this list, and all the
> details are also on SO (
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> ).
>
> After the test, the result is the same as before (at least in my case):
> already-processed records are passed again to the output topic, causing
> data duplication:
>
> ...
> 2019-03-01 16:55:23,808 INFO
> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
> checkpoint found for task 1_10 state store
> KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned on.
> *Reinitializing the task and restore its state from the beginning.*
> ...
>
> I was hoping for this to be fixed, but that is not the case, at least not
> for me.
>
> If you can, please take a look at the question on SO. I was in contact
> with Matthias about it; he also pointed me to the place where the
> potential bug could probably be found.
>
> Please, let me know any thoughts.
>
> Cheers!
> --
> Jonathan
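Here is a minimal sketch of the write-to-temp-file-then-atomic-rename
pattern the stack trace above points at (my own illustration, NOT the
actual OffsetCheckpoint code). It shows why the write fails with
FileNotFoundException when the task directory itself is already gone:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    public class CheckpointWriteSketch {
        // Write the contents to "<file>.tmp", then atomically rename it
        // over the real file, so readers never see a half-written checkpoint.
        static void writeCheckpoint(final File checkpointFile, final byte[] contents) throws IOException {
            final File tmp = new File(checkpointFile.getAbsolutePath() + ".tmp");
            // Opening the temp file is the step that throws FileNotFoundException
            // when the parent task directory (e.g. /tmp/kafka-stream/XXX/0_0)
            // has already been deleted:
            try (FileOutputStream out = new FileOutputStream(tmp)) {
                out.write(contents);
                out.flush();
                out.getFD().sync(); // make the contents durable before the rename
            }
            Files.move(tmp.toPath(), checkpointFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
        }
    }

So the warning is consistent with the 0_0 task directory having been
removed before shutdown, which matches the directory listing above.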
> On Tue, Feb 26, 2019 at 5:23 PM John Roesler <j...@confluent.io> wrote:
>
>> Hi again, Peter,
>>
>> Just to close the loop about the bug in Suppress, we did get the
>> (apparent) same report from a few other people:
>> https://issues.apache.org/jira/browse/KAFKA-7895
>>
>> I also managed to reproduce the duplicate-result behavior, which could
>> cause it to emit both intermediate results and duplicate final results.
>>
>> There's a patch for it in the 2.2 release candidate. Perhaps you can
>> try it out and see if it resolves the issue for you?
>>
>> I'm backporting the fix to 2.1 as well, but I unfortunately missed the
>> last 2.1 bugfix release.
>>
>> Thanks,
>> -John
>>
>> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <j...@confluent.io> wrote:
>>
>> > Hi Peter,
>> >
>> > Thanks for the replies.
>> >
>> > Regarding transactions:
>> > Yes, actually, with EOS enabled, the changelog and the output topics
>> > are all produced with the same transactional producer, within the
>> > same transactions. So it should already be atomic.
>> >
>> > Regarding restore:
>> > Streams doesn't put the store into service until the restore is
>> > completed, so it should be guaranteed not to happen. But there's of
>> > course no guarantee that I didn't mess something up. I'll take a hard
>> > look at it.
>> >
>> > Regarding restoration and offsets:
>> > Your guess is correct: Streams tracks the latest stored offset
>> > outside of the store implementation itself, specifically by writing
>> > a file (called a Checkpoint File) in the state directory. If the
>> > file is there, it reads that offset and restores from that point. If
>> > the file is missing, it restores from the beginning of the stream.
>> > So it should "just work" for you. Just for completeness: there have
>> > been several edge cases discovered where this mechanism isn't
>> > completely safe, so in the case of EOS, I believe we actually
>> > disregard that checkpoint file and the prior state and always
>> > rebuild from the earliest offset in the changelog.
>> >
>> > Personally, I would like to see us provide the ability to store the
>> > checkpoint inside the state store, so that checkpoint updates are
>> > linearized correctly w.r.t. data updates, but I actually haven't
>> > mentioned this thought to anyone until now ;)
>> >
>> > Finally, regarding your prior email:
>> > Yes, I was thinking that the "wrong" output values might be part of
>> > rolled-back transactions, and therefore enabling read-committed mode
>> > on the consumer might tell a different story than what you've seen
>> > to date.
>> >
>> > I'm honestly still baffled about those intermediate results that are
>> > sneaking out. I wonder if it's something specific to your data
>> > stream, like maybe an edge case when two records have exactly the
>> > same timestamp? I'll have to stare at the code some more...
>> >
>> > Regardless, in order to reap the benefits of running the app with
>> > EOS, you really have to also set your consumers to read_committed.
>> > Otherwise, you'll be seeing output data from aborted (aka
>> > rolled-back) transactions, and you miss the intended "exactly once"
>> > guarantee.
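>> > For example, a downstream consumer that verifies the output would be
>> > configured along these lines (just a sketch; the topic and group
>> > names are placeholders):
>> >
>> >     import java.util.Collections;
>> >     import java.util.Properties;
>> >     import org.apache.kafka.clients.consumer.ConsumerConfig;
>> >     import org.apache.kafka.clients.consumer.KafkaConsumer;
>> >     import org.apache.kafka.common.serialization.StringDeserializer;
>> >
>> >     public class ReadCommittedConsumerSketch {
>> >         public static void main(final String[] args) {
>> >             final Properties props = new Properties();
>> >             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>> >             props.put(ConsumerConfig.GROUP_ID_CONFIG, "output-verifier");
>> >             props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>> >             props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>> >             // The crucial setting: the default ("read_uncommitted") also
>> >             // returns records from aborted transactions, which then look
>> >             // like duplicates.
>> >             props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
>> >             try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>> >                 consumer.subscribe(Collections.singletonList("output-topic"));
>> >                 // ... poll and inspect records as usual ...
>> >             }
>> >         }
>> >     }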
>> >
>> > Thanks,
>> > -John
>> >
>> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <peter.lev...@gmail.com>
>> > wrote:
>> >
>> >> Hi John,
>> >>
>> >> Haven't been able to reinstate the demo yet, but I have been
>> >> re-reading the following scenario of yours....
>> >>
>> >> On 1/24/19 11:48 PM, Peter Levart wrote:
>> >> > Hi John,
>> >> >
>> >> > On 1/24/19 3:18 PM, John Roesler wrote:
>> >> >>
>> >> >> The reason is that, upon restart, the suppression buffer can only
>> >> >> "remember" what got sent & committed to its changelog topic
>> >> >> before.
>> >> >>
>> >> >> The scenario I have in mind is:
>> >> >>
>> >> >> ...
>> >> >> * buffer state X
>> >> >> ...
>> >> >> * flush state X to buffer changelog
>> >> >> ...
>> >> >> * commit transaction T0; start new transaction T1
>> >> >> ...
>> >> >> * emit final result X (in uncommitted transaction T1)
>> >> >> ...
>> >> >> * crash before flushing to the changelog the fact that state X
>> >> >>   was emitted. Also, transaction T1 gets aborted, since we crash
>> >> >>   before committing.
>> >> >> ...
>> >> >> * restart, restoring state X again from the changelog (because
>> >> >>   the emit didn't get committed)
>> >> >> * start transaction T2
>> >> >> * emit final result X again (in uncommitted transaction T2)
>> >> >> ...
>> >> >> * commit transaction T2
>> >> >> ...
>> >> >>
>> >> >> So, the result gets emitted twice, but the first time is in an
>> >> >> aborted transaction. This leads me to another clarifying question:
>> >> >>
>> >> >> Based on your first message, it seems like the duplicates you
>> >> >> observe are in the output topic. When you read the topic, do you
>> >> >> configure your consumer with "read committed" mode? If not,
>> >> >> you'll see "results" from uncommitted transactions, which could
>> >> >> explain the duplicates.
>> >>
>> >> ...and I was thinking that perhaps the right solution to the
>> >> suppression problem would be to use transactional producers for the
>> >> resulting output topic AND the store change-log. Is this possible?
>> >> Does compaction of the log on the brokers work as expected for
>> >> transactional producers? In that case, the sending of the final
>> >> result and the marking of that fact in the store change log would
>> >> together be an atomic operation.
>> >>
>> >> That said, I think there's another problem with suppression: it
>> >> looks like the suppression processor is already processing input
>> >> while the state store has not been fully restored yet, or something
>> >> related... Is this guaranteed not to happen?
>> >>
>> >> And now something unrelated I wanted to ask...
>> >>
>> >> I'm trying to create my own custom state store. From the API I can
>> >> see it is pretty straightforward. One thing that I don't quite
>> >> understand is how Kafka Streams knows whether to replay the whole
>> >> change log after the store registers itself, or just a part of it,
>> >> and which part (from which offset per partition). There doesn't
>> >> seem to be any API point through which the store could communicate
>> >> this information back to Kafka Streams. Is such bookkeeping
>> >> performed outside the store? Does Kafka Streams first invoke
>> >> flush() on the store and then note down the offsets from the change
>> >> log producer somewhere? So the next time the store is brought up,
>> >> the log is only replayed from the last noted-down offset? Then it
>> >> can happen that the store gets some log entries that have already
>> >> been incorporated in it (from the point of one flush before) but
>> >> never misses any... In any case, there has to be an indication
>> >> somewhere that the store didn't survive and has to be rebuilt from
>> >> scratch. How does Kafka Streams detect that situation? By placing
>> >> some marker file into the directory reserved for the store's local
>> >> storage?
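>> >> In case it helps, this is roughly the skeleton I'm experimenting
>> >> with (a simplified in-memory sketch; as far as I can tell, Streams
>> >> itself decides which changelog offsets to replay into the restore
>> >> callback):
>> >>
>> >>     import java.util.HashMap;
>> >>     import java.util.Map;
>> >>     import org.apache.kafka.common.utils.Bytes;
>> >>     import org.apache.kafka.streams.processor.ProcessorContext;
>> >>     import org.apache.kafka.streams.processor.StateRestoreCallback;
>> >>     import org.apache.kafka.streams.processor.StateStore;
>> >>
>> >>     public class MyCustomStore implements StateStore {
>> >>         private final String name;
>> >>         private final Map<Bytes, byte[]> data = new HashMap<>();
>> >>         private volatile boolean open;
>> >>
>> >>         public MyCustomStore(final String name) { this.name = name; }
>> >>
>> >>         @Override public String name() { return name; }
>> >>
>> >>         @Override
>> >>         public void init(final ProcessorContext context, final StateStore root) {
>> >>             // The store only hands Streams a callback that applies
>> >>             // replayed changelog records; it has no way to report
>> >>             // an offset of its own.
>> >>             context.register(root, new StateRestoreCallback() {
>> >>                 @Override
>> >>                 public void restore(final byte[] key, final byte[] value) {
>> >>                     if (value == null) {
>> >>                         data.remove(Bytes.wrap(key)); // tombstone
>> >>                     } else {
>> >>                         data.put(Bytes.wrap(key), value);
>> >>                     }
>> >>                 }
>> >>             });
>> >>             open = true;
>> >>         }
>> >>
>> >>         @Override public void flush() { /* nothing to persist in memory */ }
>> >>         @Override public void close() { open = false; }
>> >>         @Override public boolean persistent() { return false; }
>> >>         @Override public boolean isOpen() { return open; }
>> >>     }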
>> >>
>> >> Regards, Peter
>> >>
>
> --
> Santilli Jonathan

--
Santilli Jonathan