[ https://issues.apache.org/jira/browse/FLINK-4502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler updated FLINK-4502:
------------------------------------
    Component/s: Cassandra Connector

> Cassandra connector documentation has misleading consistency guarantees
> -----------------------------------------------------------------------
>
>                 Key: FLINK-4502
>                 URL: https://issues.apache.org/jira/browse/FLINK-4502
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector, Documentation
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>
> The Cassandra connector documentation states that "enableWriteAheadLog() is
> an optional method, that allows exactly-once processing for non-deterministic
> algorithms." This claim appears to be false.
>
> From what I gather, the write-ahead log feature of the connector works as
> follows:
> - The sink is replaced with a stateful operator that writes incoming messages
>   to the state backend, grouped by the checkpoint they belong to.
> - When the operator is notified that a Flink checkpoint has been completed,
>   then for each pending checkpoint up to and including the completed one, it:
>   * reads its messages from the state backend,
>   * writes them to Cassandra,
>   * records that it has committed them to Cassandra for the specific
>     checkpoint and operator instance,
>   * and erases them from the state backend.
>
> This process attempts to avoid resubmitting queries to Cassandra that would
> otherwise be resubmitted when a job is recovered from a checkpoint and
> messages are replayed.
>
> Alas, this does not guarantee exactly-once semantics at the sink. The writes
> to Cassandra that occur when the operator is notified that a checkpoint has
> completed are not atomic, and they are potentially non-idempotent. If the job
> dies while writing to Cassandra, or before committing the checkpoint via the
> committer, the queries will be replayed when the job recovers. Thus the
> documentation appears to be incorrect in stating that this provides
> exactly-once semantics.
>
> There also seems to be an issue in GenericWriteAheadSink's
> notifyOfCompletedCheckpoint which may result in incorrect output. If
> sendValues returns false because a write failed, instead of bailing out, it
> simply moves on to the next checkpoint to commit, if there is one, keeping
> the previous one around to try again later. But that can result in newer data
> being overwritten with older data when the previous checkpoint is retried.
> Moreover, given that CassandraCommitter implements isCheckpointCommitted as
> checkpointID <= this.lastCommittedCheckpointID, when the sink goes back to
> retry the uncommitted older checkpoint it will consider that checkpoint
> already committed, even though some of its data may not have been written
> out, and that data will be discarded.
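To make the failure mode easier to follow, below is a minimal, self-contained Java sketch of the commit flow as the report describes it. This is not the actual GenericWriteAheadSink or CassandraCommitter source; the class, field, and method names are illustrative stand-ins, and only the control flow (the move-on-when-sendValues-fails loop and the checkpointID <= lastCommittedCheckpointID check) mirrors the behavior described above.

{code:java}
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Illustrative sketch only: mirrors the commit flow described in the report,
// not the real Flink implementation.
class WriteAheadSinkSketch {

    static final class PendingCheckpoint {
        final long checkpointId;
        final List<String> records; // records buffered in the state backend
        PendingCheckpoint(long checkpointId, List<String> records) {
            this.checkpointId = checkpointId;
            this.records = records;
        }
    }

    private final Queue<PendingCheckpoint> pendingCheckpoints = new ArrayDeque<>();
    private long lastCommittedCheckpointId = -1L;

    // Called when the sink is notified that a checkpoint has completed.
    void notifyCheckpointComplete(long completedCheckpointId) {
        Iterator<PendingCheckpoint> it = pendingCheckpoints.iterator();
        while (it.hasNext()) {
            PendingCheckpoint pending = it.next();
            if (pending.checkpointId > completedCheckpointId) {
                break;
            }
            // Only the highest committed id is remembered, so an older checkpoint
            // whose write previously failed is later treated as already committed
            // and its records are dropped without ever reaching Cassandra.
            if (isCheckpointCommitted(pending.checkpointId)) {
                it.remove();
                continue;
            }
            if (sendValues(pending.records)) {
                lastCommittedCheckpointId = pending.checkpointId;
                it.remove();
            }
            // On a failed write the loop simply moves on to the next (newer)
            // checkpoint and leaves this one to be retried later, so newer data
            // can be overwritten by older data on the retry.
        }
    }

    private boolean isCheckpointCommitted(long checkpointId) {
        // The check described in the report.
        return checkpointId <= lastCommittedCheckpointId;
    }

    private boolean sendValues(List<String> records) {
        // Stand-in for the non-atomic, non-idempotent writes to Cassandra.
        return true;
    }
}
{code}

With this flow, a checkpoint whose sendValues call fails can later be skipped entirely once a newer checkpoint has been committed, which is the data-loss scenario the report points out.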