[ 
https://issues.apache.org/jira/browse/KAFKA-12475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302200#comment-17302200
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12475:
------------------------------------------------

One option to fix this would be to add a deleteAll() method to the StateStore 
interface(s), For compatibility, and because not all stores support a deleteAll 
operation, this should have a default implementation that invokes the existing 
delete(key) method on everything in the store. This will obviously suffer 
performance-wise, but additional latency in state store restoration is 
preferred over silently producing incorrect results with EOS. Also, we expect 
that the vast majority of custom state stores will either be local & able to 
just wipe out the state directory as usual, or else are backed by a storage 
engine that supports some form of a deleteAll() operation.

For the rocksdb-based state stores that Streams provides, we can just override 
deleteAll() to wipe out the state directory as we do today. Note that this 
approach would require a KIP, and there may be some compatibility concerns for 
users of local custom state stores today: they should not be required to change 
their code to implement deleteAll() and wipe out the state store. So we may 
need to actually wipe out the state directory in the default implementation, 
and leave it up to EOS users with remote storage to implement things for 
semantic correctness.

> Kafka Streams breaks EOS with remote state stores
> -------------------------------------------------
>
>                 Key: KAFKA-12475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12475
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: needs-kip
>
> Currently in Kafka Streams, exactly-once semantics (EOS) require that the 
> state stores be completely erased and restored from the changelog from 
> scratch in case of an error. This erasure is implemented by closing the state 
> store and then simply wiping out the local state directory. This works fine 
> for the two store implementations provided OOTB, in-memory and rocksdb, but 
> fails when the application includes a custom StateStore based on remote 
> storage, such as Redis. In this case Streams will fail to erase any of the 
> data before reinserting data from the changelog, resulting in possible 
> duplicates and breaking the guarantee of EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to