[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701474#comment-16701474 ]
ASF GitHub Bot commented on FLINK-9083:
---------------------------------------

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r236963630

 ##########
 File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##########
 @@ -45,73 +48,63 @@
   */
  public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
      protected final Logger log = LoggerFactory.getLogger(getClass());
 +
 +    // --------------------------- Cassandra Fields ---------------------------
 +
 +    private final ClusterBuilder builder;
 +
      protected transient Cluster cluster;
      protected transient Session session;
 -    protected transient volatile Throwable exception;
 -    protected transient FutureCallback<V> callback;
 +    // ------------------------ Synchronization Fields ------------------------
 -    private final ClusterBuilder builder;
 +    private AtomicReference<Throwable> throwable;
 +    private Semaphore semaphore;
 +    private Phaser phaser;

 Review comment:
 I used a Phaser, a Semaphore, backoffs, NACK-responsive retries, and Guava's RateLimiter in my custom CassandraSink.

 The Phaser is a dynamic, reusable barrier, so it is used to ensure that all requests are accounted for when checkpointing or closing. It is therefore not there to limit resources or improve stability; it is there for correctness.

 The Semaphore limits the maximum number of concurrent requests, while the RateLimiter limits the maximum number of requests per second. Although concurrency and rate are formally related, which prompts people to choose one over the other, there are downsides to choosing just one.

 A RateLimiter has problems with bursts. I believe the RateLimiter behaves like a leaky bucket, so if a burst of requests is issued, the quota is used up, and subsequent requests are smoothed out.
 As Cassandra requests generally have very low latency, such a burst suddenly hits the cluster hard, and then the cluster idles. Although the RateLimiter preserves the average rate, you could still thrash the cluster. For example, in a large-scale, multi-tenant cluster, I use the RateLimiter to enforce a QPS quota for resourcing purposes, but if every tenant bursts at the same time, the cluster can be badly hurt.

 The Semaphore helps mitigate bursts by controlling concurrency, and it aligns with the staged event-driven architecture of Cassandra, where you can set the maximum number of concurrent requests as a fraction of the cluster's queue size.

 Using all the techniques I mentioned at the beginning, I was able to control resources well enough to maximize throughput without jeopardizing availability.

 Aside: I think I made a ticket for the other features apart from this one. I've been meaning to get to them all together, but my year has been hectic personally and professionally, so I apologize about that. :")

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add async backpressure support to Cassandra Connector
> -----------------------------------------------------
>
>                 Key: FLINK-9083
>                 URL: https://issues.apache.org/jira/browse/FLINK-9083
>             Project: Flink
>          Issue Type: Improvement
>          Components: Cassandra Connector
>            Reporter: Jacob Park
>            Assignee: Jacob Park
>            Priority: Minor
>              Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra
> Connector functions:
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra
> Sink Connector implementation on Apache Flink in production. I would like to
> contribute this feature back upstream.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
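
For illustration only, the semaphore-based backpressure from the issue description and the Phaser-based accounting from the review comment could be combined roughly as below. This is a minimal sketch using only java.util.concurrent, not the code from pull request #6782. The class name AsyncWriteGate and its methods submit, flush, and inFlightCount are hypothetical, and a CompletableFuture stands in for the driver's asynchronous result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not Flink's CassandraSinkBase. */
final class AsyncWriteGate {
    // Caps the number of in-flight requests; a blocked acquire is the backpressure.
    private final Semaphore permits;
    // One permanently registered party (the gate) plus one party per in-flight request.
    private final Phaser inFlight = new Phaser(1);
    // Remembers the first asynchronous failure so the caller can surface it later.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    AsyncWriteGate(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Blocks until a permit is free, then starts the asynchronous write. */
    <T> void submit(Supplier<CompletableFuture<T>> write) {
        rethrowIfFailed();
        // Uninterruptible acquire keeps the sketch free of checked exceptions;
        // real code would probably prefer the interruptible acquire().
        permits.acquireUninterruptibly();
        inFlight.register();
        CompletableFuture<T> future;
        try {
            future = write.get();
        } catch (RuntimeException e) {
            // The write never started, so undo the bookkeeping before rethrowing.
            inFlight.arriveAndDeregister();
            permits.release();
            throw e;
        }
        future.whenComplete((result, error) -> {
            if (error != null) {
                failure.compareAndSet(null, error);
            }
            inFlight.arriveAndDeregister();
            permits.release();
        });
    }

    /** Waits for every submitted write to finish, e.g. before a checkpoint or close. */
    void flush() {
        inFlight.arriveAndAwaitAdvance();
        rethrowIfFailed();
    }

    /** Number of writes that have been submitted but not yet completed. */
    int inFlightCount() {
        return inFlight.getRegisteredParties() - 1;
    }

    private void rethrowIfFailed() {
        Throwable t = failure.get();
        if (t != null) {
            throw new IllegalStateException("asynchronous write failed", t);
        }
    }
}
```

In this sketch a sink would call submit from its invoke method and flush from its snapshotState and close methods. The semaphore only bounds concurrency; a requests-per-second quota such as the one the review comment attributes to Guava's RateLimiter would be a separate layer on top.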