[ https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139448#comment-16139448 ]
Michael Fong edited comment on FLINK-4500 at 8/24/17 1:36 AM:
--------------------------------------------------------------

I plan to implement the first part of the requirement on top of the existing implementation by:
1. Implementing CheckpointedFunction.
2. Leveraging the AtomicInteger updatesPending to make sure all in-flight records are flushed to the sink when a checkpoint is taken.
3. As with other data connectors, providing a flag that turns this flush-on-checkpoint guarantee on or off.

For reference, the second part was addressed by FLINK-5101, as noted in the earlier comments.


was (Author: mcfongtw):
I plan to implement the first part of the requirement on top of the existing implementation by:
1. Implementing CheckpointedFunction.
2. Leveraging the AtomicInteger updatesPending to make sure all in-flight records are flushed to the sink when a checkpoint is taken.
3. As with other data connectors, providing a flag that turns this flush-on-checkpoint guarantee on or off.

For reference, the second part was already addressed by FLINK-5101.

> Cassandra sink can lose messages
> --------------------------------
>
>                 Key: FLINK-4500
>                 URL: https://issues.apache.org/jira/browse/FLINK-4500
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>            Assignee: Michael Fong
>
> The problem is the same as I pointed out with the Kafka producer sink
> (FLINK-4027). The CassandraTupleSink's send() and CassandraPojoSink's send()
> both send data asynchronously to Cassandra and record whether an error occurs
> via a future callback. But CassandraSinkBase does not implement
> Checkpointed, so it can't stop a checkpoint from happening even though there
> are Cassandra queries in flight from the checkpoint that may fail. If they do
> fail, they would subsequently not be replayed when the job recovered, and
> would thus be lost.
>
> In addition, CassandraSinkBase's close() should check whether there is a
> pending exception and throw it, rather than closing silently. It should also
> wait for any pending async queries to complete and check their status before
> closing.
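
To make the plan above concrete, here is a minimal, dependency-free sketch of the pending-counter idea. It is not the actual Flink patch: the class name FlushingSinkSketch, the methods pendingUpdates(), waitForPendingUpdates() and checkAsyncError(), and the flushOnCheckpoint constructor argument are hypothetical, and CompletableFuture stands in for whatever future the Cassandra driver returns. Only the updatesPending counter and the on/off flag come from the comment. The same wait-then-check step is reused in close(), as the issue description asks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a sink base class; not Flink's CassandraSinkBase. */
class FlushingSinkSketch {
    // Number of asynchronous writes that have been sent but not yet acknowledged.
    private final AtomicInteger updatesPending = new AtomicInteger(0);
    // First failure reported by any asynchronous write, if there was one.
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();
    // Assumed flag: when false, a checkpoint does not wait for in-flight writes.
    private final boolean flushOnCheckpoint;

    FlushingSinkSketch(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    /** Registers one asynchronous write; the future represents its outcome. */
    void invoke(CompletableFuture<?> write) {
        checkAsyncError();
        updatesPending.incrementAndGet();
        write.whenComplete((result, error) -> {
            if (error != null) {
                asyncError.compareAndSet(null, error);
            }
            synchronized (updatesPending) {
                updatesPending.decrementAndGet();
                updatesPending.notifyAll();
            }
        });
    }

    /** Called when a checkpoint is taken: flush first, then surface failures. */
    void snapshotState() {
        checkAsyncError();
        if (flushOnCheckpoint) {
            waitForPendingUpdates();
            checkAsyncError();
        }
    }

    /** Waits for in-flight writes and throws instead of closing silently. */
    void close() {
        waitForPendingUpdates();
        checkAsyncError();
    }

    int pendingUpdates() {
        return updatesPending.get();
    }

    private void waitForPendingUpdates() {
        synchronized (updatesPending) {
            while (updatesPending.get() > 0) {
                try {
                    updatesPending.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while flushing", e);
                }
            }
        }
    }

    private void checkAsyncError() {
        Throwable error = asyncError.get();
        if (error != null) {
            throw new IllegalStateException("Asynchronous write failed", error);
        }
    }
}
```

With the flag enabled, snapshotState() returns only after every registered write has completed, and it throws if any of them failed, so the checkpoint fails and the affected records are replayed on recovery rather than lost. With the flag disabled, the sketch only reports failures that have already been recorded, which roughly corresponds to the behaviour the issue describes.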