[ https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142979#comment-16142979 ]
ASF GitHub Bot commented on FLINK-4500:
---------------------------------------

GitHub user mcfongtw opened a pull request:

https://github.com/apache/flink/pull/4605

[FLINK-4500] [C* Connector] CassandraSinkBase implements CheckpointedFunction

## What is the purpose of the change

Have CassandraSinkBase implement CheckpointedFunction so that all in-flight mutations are sent to the C* sink before a checkpoint is taken. As a result, the checkpoint is complete.

## Brief change log

* Implement CheckpointedFunction to (optionally) wait until all pending records are flushed to the C* sink before a checkpoint is taken (or the connection is closed).
* Add debugging messages in CassandraSinkBase.
* Add unit tests for simple / multi-threaded message dispatching for successful / failed scenarios.
* Add unit tests for failure-handling logic on errors thrown at different stages.
* Add unit tests for flushing pending records when a checkpoint is taken.
* Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
* Add CassandraBaseTest to the suppression list so that it can use guava imports.
* In the log4j-test settings, change the root log level to INFO and enable the ALL level for some test classes.

## Verifying this change

This change is already covered by existing tests, such as *CassandraBaseTest*.

This change added tests and can be verified as follows:

* Add unit tests for simple / multi-threaded message dispatching for successful / failed scenarios.
* Add unit tests for failure-handling logic on errors thrown at different stages.
* Add unit tests for flushing pending records when a checkpoint is taken.
* Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
* Add CassandraBaseTest to the suppression list so that it can use guava imports.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no** (maybe) )
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mcfongtw/flink FLINK-4500

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4605.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4605

----
commit caefe390bf2aaa22d996cc24a31a3ba76241fb23
Author: Michael Fong <mcfong.o...@gmail.com>
Date:   2017-08-14T12:57:06Z

    [FLINK-4500] CassandraSinkBase implements CheckpointedFunction

    * Implement CheckpointedFunction to (optionally) wait until all pending records are flushed to the C* sink before a snapshot is taken (or the connection is closed).
    * Add debugging messages in CassandraSinkBase.
    * Add unit tests for simple / multi-threaded message dispatching for successful / failed scenarios.
    * Add unit tests for failure-handling logic on errors thrown at different stages.
    * Add unit tests for flushing pending records when a checkpoint is taken.
    * Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
    * Add CassandraBaseTest to the suppression list so that it can use guava imports.
    * In the log4j-test settings, change the root log level to INFO and enable the ALL level for some test classes.

----

> Cassandra sink can lose messages
> --------------------------------
>
>                 Key: FLINK-4500
>                 URL: https://issues.apache.org/jira/browse/FLINK-4500
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>            Assignee: Michael Fong
>
> The problem is the same as the one I pointed out with the Kafka producer sink
> (FLINK-4027). CassandraTupleSink's send() and CassandraPojoSink's send()
> both send data asynchronously to Cassandra and record whether an error occurs
> via a future callback. But CassandraSinkBase does not implement
> Checkpointed, so it can't stop a checkpoint from happening even though there
> are Cassandra queries in flight from before the checkpoint that may fail. If
> they do fail, they would not be replayed when the job recovers, and
> would thus be lost.
>
> In addition, CassandraSinkBase's close should check whether there is a
> pending exception and throw it, rather than closing silently. It should also
> wait for any pending async queries to complete and check their status before
> closing.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
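
For readers unfamiliar with the pattern discussed in the issue and the pull request, the following is a minimal sketch of a sink that counts in-flight asynchronous writes and blocks until they have all completed before a snapshot or close. It is an illustration only, not the code from the pull request: the class `PendingWriteSink` and its methods are hypothetical names, and `CompletableFuture` stands in for the Cassandra driver's `ResultSetFuture` so that the example runs without Flink or Cassandra on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for an asynchronous sink; NOT Flink's CassandraSinkBase. */
final class PendingWriteSink {
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private final Object lock = new Object();

    /** Registers an in-flight write; its callback records any error and wakes up flush(). */
    void send(CompletableFuture<Void> write) {
        pending.incrementAndGet();
        write.whenComplete((ignored, error) -> {
            if (error != null) {
                failure.compareAndSet(null, error); // keep the first error only
            }
            synchronized (lock) {
                pending.decrementAndGet();
                lock.notifyAll();
            }
        });
    }

    /**
     * Intended to be called before a snapshot and on close: waits until no
     * write is in flight, then rethrows a recorded error instead of hiding it.
     */
    void flush() {
        synchronized (lock) {
            while (pending.get() > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while flushing", e);
                }
            }
        }
        Throwable error = failure.get();
        if (error != null) {
            throw new IllegalStateException("Asynchronous write failed", error);
        }
    }

    int pendingCount() {
        return pending.get();
    }

    public static void main(String[] args) {
        PendingWriteSink sink = new PendingWriteSink();
        CompletableFuture<Void> write = new CompletableFuture<>();
        sink.send(write);
        // Simulate the driver acknowledging the write on another thread.
        new Thread(() -> write.complete(null)).start();
        sink.flush(); // returns only after the write has completed
        System.out.println("pending after flush: " + sink.pendingCount());
    }
}
```

In a real Flink sink, a method like `flush()` would presumably be invoked from `snapshotState` and from `close`, so that a failed write fails the checkpoint and the affected records are replayed on recovery rather than silently lost.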