[ https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171519#comment-16171519 ]
ASF GitHub Bot commented on FLINK-4500:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139660015

    --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
    @@ -37,29 +41,49 @@
      *
      * @param <IN> Type of the elements emitted by this sink
      */
    -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
     	protected final Logger log = LoggerFactory.getLogger(getClass());
     	protected transient Cluster cluster;
     	protected transient Session session;
    -	protected transient volatile Throwable exception;
    +	protected transient volatile Throwable asyncError;
     	protected transient FutureCallback<V> callback;
    -	private final ClusterBuilder builder;
    +	protected final ClusterBuilder builder;
     	private final AtomicInteger updatesPending = new AtomicInteger();
    +	/**
    +	 * If true, the producer will wait until all outstanding action requests have been sent to C*.
    +	 */
    +	private boolean flushOnCheckpoint = true;
    --- End diff --

    Ideally all sinks should behave the same way, but that would be out of scope for this PR in particular.

> Cassandra sink can lose messages
> --------------------------------
>
>                 Key: FLINK-4500
>                 URL: https://issues.apache.org/jira/browse/FLINK-4500
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>            Assignee: Michael Fong
>
> The problem is the same as I pointed out with the Kafka producer sink
> (FLINK-4027). The CassandraTupleSink's send() and CassandraPojoSink's send()
> both send data asynchronously to Cassandra and record whether an error occurs
> via a future callback. But CassandraSinkBase does not implement
> Checkpointed, so it can't stop a checkpoint from happening even though there
> are Cassandra queries in flight from the checkpoint that may fail. If they do
> fail, they would subsequently not be replayed when the job recovered, and
> would thus be lost.
>
> In addition, CassandraSinkBase's close should check whether there is a
> pending exception and throw it, rather than silently close. It should also
> wait for any pending async queries to complete and check their status before
> closing.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
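
For illustration only, here is a minimal sketch of the "wait for pending async queries, then surface any recorded error" behavior that the issue asks for. It uses only the JDK: CompletableFuture stands in for the driver's result future, and the class name PendingWriteTracker and its methods track(), flush() and pending() are hypothetical and not part of Flink or the Cassandra driver. The field names updatesPending and asyncError are borrowed from the diff above. The assumption is that a sink would call flush() from both its checkpoint hook and close().

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: counts in-flight writes and remembers the first failure. */
class PendingWriteTracker {
    private final AtomicInteger updatesPending = new AtomicInteger();
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    /** Registers an in-flight write; assumed to be called from the sink's invoke(). */
    void track(CompletableFuture<?> write) {
        updatesPending.incrementAndGet();
        write.whenComplete((result, error) -> {
            if (error != null) {
                // Keep only the first failure so the root cause is not overwritten.
                asyncError.compareAndSet(null, error);
            }
            synchronized (updatesPending) {
                updatesPending.decrementAndGet();
                updatesPending.notifyAll();
            }
        });
    }

    /**
     * Blocks until no write is in flight, then rethrows a recorded failure.
     * Assumes the caller does not invoke track() concurrently with flush().
     */
    void flush() throws IOException, InterruptedException {
        synchronized (updatesPending) {
            while (updatesPending.get() > 0) {
                updatesPending.wait();
            }
        }
        Throwable error = asyncError.get();
        if (error != null) {
            throw new IOException("Error while sending value.", error);
        }
    }

    /** Number of writes that have not completed yet. */
    int pending() {
        return updatesPending.get();
    }
}
```

With something along these lines, a checkpoint cannot complete while writes from before the barrier are still outstanding, and a failed write fails the checkpoint (or close()) instead of being dropped silently.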