[ 
https://issues.apache.org/jira/browse/FLINK-4502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-4502:
------------------------------------
    Component/s: Cassandra Connector

> Cassandra connector documentation has misleading consistency guarantees
> -----------------------------------------------------------------------
>
>                 Key: FLINK-4502
>                 URL: https://issues.apache.org/jira/browse/FLINK-4502
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector, Documentation
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>
> The Cassandra connector documentation states that "enableWriteAheadLog() is 
> an optional method, that allows exactly-once processing for non-deterministic 
> algorithms." This claim appears to be false.
> From what I gather, the write ahead log feature of the connector works as 
> follows:
> - The sink is replaced with a stateful operator that writes incoming messages 
> to the state backend, grouped by the checkpoint they belong to.
> - When the operator is notified that a Flink checkpoint has completed, then 
> for each pending checkpoint up to and including the completed one, it:
>   * reads its messages from the state backend
>   * writes them to Cassandra
>   * records that it has committed them to Cassandra for the specific 
> checkpoint and operator instance
>   * and erases them from the state backend.
> This process attempts to avoid resubmitting queries to Cassandra that would 
> otherwise occur when recovering a job from a checkpoint and having messages 
> replayed.
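
The flow described above can be modeled with a small in-memory sketch. This is
not the Flink implementation: the class WriteAheadSketch and all of its members
are hypothetical names, a TreeMap stands in for the state backend, a plain list
stands in for Cassandra, and a single long stands in for the committer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of the write-ahead flow; NOT the Flink implementation. */
class WriteAheadSketch {
    // Stand-in for the state backend: buffered records keyed by checkpoint id.
    final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Records received since the last checkpoint.
    private List<String> current = new ArrayList<>();
    // Stand-in for Cassandra: every record that reached the external store.
    final List<String> external = new ArrayList<>();
    // Stand-in for the committer: highest checkpoint id recorded as written.
    long lastCommitted = -1;

    /** Buffer a record instead of writing it out immediately. */
    void invoke(String record) {
        current.add(record);
    }

    /** On checkpoint, file the buffered records under that checkpoint id. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** On completion, flush every pending checkpoint up to and including this one. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            external.addAll(entry.getValue()); // write to the external store
            lastCommitted = entry.getKey();    // record the commit
            it.remove();                       // erase from the state backend
        }
    }
}
```

Note that the write, the commit record, and the erase are three separate steps
in this model, which is the property the rest of the report relies on.
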
> Alas, this does not guarantee exactly-once semantics at the sink. The writes 
> to Cassandra that occur when the operator is notified that a checkpoint has 
> completed are not atomic and are potentially non-idempotent. If the job 
> dies while writing to Cassandra, or before committing the checkpoint via the 
> committer, the queries will be replayed when the job recovers. Thus the 
> documentation appears to be incorrect in stating that this provides 
> exactly-once semantics.
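
A minimal sketch of that failure mode, assuming a non-idempotent write (here a
counter increment) and a failure after the writes but before the commit is
recorded. ReplaySketch, flush, and the crashBeforeCommit flag are hypothetical
names used only for illustration; none of them are Flink or Cassandra APIs.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of a replay after a failure between write and commit. */
class ReplaySketch {
    long counter = 0;        // non-idempotent external state (think: a counter column)
    long lastCommitted = -1; // what the committer has recorded

    /** Apply one checkpoint's writes, then record the commit. */
    void flush(long checkpointId, List<Integer> increments, boolean crashBeforeCommit) {
        if (checkpointId <= lastCommitted) {
            return; // already committed, nothing to replay
        }
        for (int inc : increments) {
            counter += inc; // the writes reach the external store...
        }
        if (crashBeforeCommit) {
            // ...but the job dies before the commit is recorded.
            throw new IllegalStateException("simulated failure before commit");
        }
        lastCommitted = checkpointId;
    }

    public static void main(String[] args) {
        ReplaySketch store = new ReplaySketch();
        List<Integer> batch = Arrays.asList(1, 2, 3);
        try {
            store.flush(1, batch, true);  // first attempt: writes applied, commit lost
        } catch (IllegalStateException expected) {
            // the job restarts from the last checkpoint
        }
        store.flush(1, batch, false);     // recovery: the same batch is written again
        System.out.println(store.counter); // prints 12, not the intended 6
    }
}
```

With idempotent writes (for example, plain upserts of the same values) the
replay in this model would be harmless; with increments it is not.
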
> There also seems to be an issue in GenericWriteAheadSink's 
> notifyOfCompletedCheckpoint which may result in incorrect output. If 
> sendValues returns false because a write failed, then instead of bailing out 
> it simply moves on to the next checkpoint to commit, if there is one, and 
> keeps the failed one around to try again later. That could result in newer 
> data being overwritten with older data when the previous checkpoint is 
> retried. In practice, though, CassandraCommitter implements 
> isCheckpointCommitted as checkpointID <= this.lastCommittedCheckpointID, so 
> when the sink goes back to retry the uncommitted older checkpoint it will 
> consider it committed, even though some of its data may not have been written 
> out, and the data will be discarded.
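
The interaction described in the last paragraph can be reproduced in a
simplified model. Only the comparison checkpointID <= lastCommittedCheckpointID
is taken from the report; SkippedCheckpointSketch, notifyCompleted, and the
failingIds parameter are hypothetical, and the loop is an approximation of the
behavior described above rather than the actual GenericWriteAheadSink code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model: a failed older checkpoint is later mistaken for a committed one. */
class SkippedCheckpointSketch {
    long lastCommittedCheckpointID = -1;
    final TreeMap<Long, List<String>> pending = new TreeMap<>();
    final List<String> written = new ArrayList<>(); // stand-in for Cassandra

    // The comparison quoted in the report.
    boolean isCheckpointCommitted(long checkpointID) {
        return checkpointID <= lastCommittedCheckpointID;
    }

    /** failingIds simulates checkpoints whose write attempt fails (assumption). */
    void notifyCompleted(long completedId, Set<Long> failingIds) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(completedId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            long id = entry.getKey();
            if (isCheckpointCommitted(id)) {
                it.remove(); // considered already written: data is discarded
            } else if (!failingIds.contains(id)) {
                written.addAll(entry.getValue());
                lastCommittedCheckpointID = id;
                it.remove();
            }
            // On a failed write the entry is kept for a later retry,
            // but the loop still moves on to the next, newer checkpoint.
        }
    }
}
```

In this model, if checkpoint 1 fails while checkpoint 2 succeeds, the next
notification finds 1 <= 2, treats checkpoint 1 as committed, and drops its
records without ever writing them.
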



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
