Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139660015 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java --- @@ -37,29 +41,49 @@ * * @param <IN> Type of the elements emitted by this sink */ -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); protected transient Cluster cluster; protected transient Session session; - protected transient volatile Throwable exception; + protected transient volatile Throwable asyncError; protected transient FutureCallback<V> callback; - private final ClusterBuilder builder; + protected final ClusterBuilder builder; private final AtomicInteger updatesPending = new AtomicInteger(); + /** + * If true, the producer will wait until all outstanding action requests have been sent to C*. + */ + private boolean flushOnCheckpoint = true; --- End diff -- Ideally all sinks should behave the same way, but that would be out of scope for PR in particular.
---