zentol closed pull request #6782: [FLINK-9083][Cassandra Connector] Add async 
backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index 2a2acb3bc38..292314dafd2 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -72,13 +72,16 @@ The following configuration methods can be used:
 4. _setMapperOptions(MapperOptions options)_
     * Sets the mapper options that are used to configure the DataStax 
ObjectMapper.
     * Only applies when processing __POJO__ data types.
-5. _enableWriteAheadLog([CheckpointCommitter committer])_
+5. _setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout)_
+    * Sets the maximum allowed number of concurrent requests with a timeout 
for acquiring permits to execute.
+    * Only applies when __enableWriteAheadLog()__ is not configured.
+6. _enableWriteAheadLog([CheckpointCommitter committer])_
     * An __optional__ setting
     * Allows exactly-once processing for non-deterministic algorithms.
-6. _setFailureHandler([CassandraFailureHandler failureHandler])_
+7. _setFailureHandler([CassandraFailureHandler failureHandler])_
     * An __optional__ setting
     * Sets the custom failure handler.
-7. _build()_
+8. _build()_
     * Finalizes the configuration and constructs the CassandraSink instance.
 
 ### Write-ahead Log
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index 41826f58203..5e1fcca05e7 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -32,8 +32,12 @@
        private final String insertQuery;
        private transient PreparedStatement ps;
 
-       public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder 
builder, CassandraFailureHandler failureHandler) {
-               super(builder, failureHandler);
+       public AbstractCassandraTupleSink(
+                       String insertQuery,
+                       ClusterBuilder builder,
+                       CassandraSinkBaseConfig config,
+                       CassandraFailureHandler failureHandler) {
+               super(builder, config, failureHandler);
                this.insertQuery = insertQuery;
        }
 
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index cf4432d4b20..7a3eba091b8 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -51,24 +51,51 @@
         *
         * @param clazz Class instance
         */
-       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
+       public CassandraPojoSink(
+                       Class<IN> clazz,
+                       ClusterBuilder builder) {
                this(clazz, builder, null, null);
        }
 
-       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, 
@Nullable MapperOptions options) {
+       public CassandraPojoSink(
+                       Class<IN> clazz,
+                       ClusterBuilder builder,
+                       @Nullable MapperOptions options) {
                this(clazz, builder, options, null);
        }
 
-       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, 
String keyspace) {
+       public CassandraPojoSink(
+                       Class<IN> clazz,
+                       ClusterBuilder builder,
+                       String keyspace) {
                this(clazz, builder, null, keyspace);
        }
 
-       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, 
@Nullable MapperOptions options, String keyspace) {
-               this(clazz, builder, options, keyspace, new 
NoOpCassandraFailureHandler());
+       public CassandraPojoSink(
+                       Class<IN> clazz,
+                       ClusterBuilder builder,
+                       @Nullable MapperOptions options,
+                       String keyspace) {
+               this(clazz, builder, options, keyspace, 
CassandraSinkBaseConfig.newBuilder().build());
        }
 
-       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, 
@Nullable MapperOptions options, String keyspace, CassandraFailureHandler 
failureHandler) {
-               super(builder, failureHandler);
+       CassandraPojoSink(
+                       Class<IN> clazz,
+                       ClusterBuilder builder,
+                       @Nullable MapperOptions options,
+                       String keyspace,
+                       CassandraSinkBaseConfig config) {
+               this(clazz, builder, options, keyspace, config, new 
NoOpCassandraFailureHandler());
+       }
+
+       CassandraPojoSink(
+                       Class<IN> clazz,
+                       ClusterBuilder builder,
+                       @Nullable MapperOptions options,
+                       String keyspace,
+                       CassandraSinkBaseConfig config,
+                       CassandraFailureHandler failureHandler) {
+               super(builder, config, failureHandler);
                this.clazz = clazz;
                this.options = options;
                this.keyspace = keyspace;
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
index f51506b802c..a60aebae9df 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -26,12 +26,28 @@
 
        private final int rowArity;
 
-       public CassandraRowSink(int rowArity, String insertQuery, 
ClusterBuilder builder) {
-               this(rowArity, insertQuery, builder, new 
NoOpCassandraFailureHandler());
+       public CassandraRowSink(
+                       int rowArity,
+                       String insertQuery,
+                       ClusterBuilder builder) {
+               this(rowArity, insertQuery, builder, 
CassandraSinkBaseConfig.newBuilder().build());
        }
 
-       public CassandraRowSink(int rowArity, String insertQuery, 
ClusterBuilder builder, CassandraFailureHandler failureHandler) {
-               super(insertQuery, builder, failureHandler);
+       CassandraRowSink(
+                       int rowArity,
+                       String insertQuery,
+                       ClusterBuilder builder,
+                       CassandraSinkBaseConfig config) {
+               this(rowArity, insertQuery, builder, config, new 
NoOpCassandraFailureHandler());
+       }
+
+       CassandraRowSink(
+                       int rowArity,
+                       String insertQuery,
+                       ClusterBuilder builder,
+                       CassandraSinkBaseConfig config,
+                       CassandraFailureHandler failureHandler) {
+               super(insertQuery, builder, config, failureHandler);
                this.rowArity = rowArity;
        }
 
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
index 84af78d7a6a..13d1a8f253c 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -26,12 +26,25 @@
  * @param <IN> Type of the elements emitted by this sink, it must extend 
{@link Product}
  */
 public class CassandraScalaProductSink<IN extends Product> extends 
AbstractCassandraTupleSink<IN> {
-       public CassandraScalaProductSink(String insertQuery, ClusterBuilder 
builder) {
-               this(insertQuery, builder, new NoOpCassandraFailureHandler());
+       public CassandraScalaProductSink(
+                       String insertQuery,
+                       ClusterBuilder builder) {
+               this(insertQuery, builder, 
CassandraSinkBaseConfig.newBuilder().build());
        }
 
-       public CassandraScalaProductSink(String insertQuery, ClusterBuilder 
builder, CassandraFailureHandler failureHandler) {
-               super(insertQuery, builder, failureHandler);
+       CassandraScalaProductSink(
+                       String insertQuery,
+                       ClusterBuilder builder,
+                       CassandraSinkBaseConfig config) {
+               this(insertQuery, builder, config, new 
NoOpCassandraFailureHandler());
+       }
+
+       CassandraScalaProductSink(
+                       String insertQuery,
+                       ClusterBuilder builder,
+                       CassandraSinkBaseConfig config,
+                       CassandraFailureHandler failureHandler) {
+               super(insertQuery, builder, config, failureHandler);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index e774cd31893..fff8af72ea0 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -37,6 +37,8 @@
 
 import com.datastax.driver.core.Cluster;
 
+import java.time.Duration;
+
 import scala.Product;
 
 /**
@@ -235,6 +237,7 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
                protected final DataStream<IN> input;
                protected final TypeSerializer<IN> serializer;
                protected final TypeInformation<IN> typeInfo;
+               protected final CassandraSinkBaseConfig.Builder configBuilder;
                protected ClusterBuilder builder;
                protected String keyspace;
                protected MapperOptions mapperOptions;
@@ -247,6 +250,7 @@ public CassandraSinkBuilder(DataStream<IN> input, 
TypeInformation<IN> typeInfo,
                        this.input = input;
                        this.typeInfo = typeInfo;
                        this.serializer = serializer;
+                       this.configBuilder = 
CassandraSinkBaseConfig.newBuilder();
                }
 
                /**
@@ -367,6 +371,34 @@ protected Cluster buildCluster(Cluster.Builder builder) {
                        return this;
                }
 
+               /**
+                * Sets the maximum allowed number of concurrent requests for 
this sink.
+                *
+                * <p>This call has no effect if {@link 
CassandraSinkBuilder#enableWriteAheadLog()} is called.
+                *
+                * @param maxConcurrentRequests maximum number of concurrent 
requests allowed
+                * @param timeout timeout duration when acquiring a permit to 
execute
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int 
maxConcurrentRequests, Duration timeout) {
+                       
this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
+                       
this.configBuilder.setMaxConcurrentRequestsTimeout(timeout);
+                       return this;
+               }
+
+               /**
+                * Sets the maximum allowed number of concurrent requests for 
this sink.
+                *
+                * <p>This call has no effect if {@link 
CassandraSinkBuilder#enableWriteAheadLog()} is called.
+                *
+                * @param maxConcurrentRequests maximum number of concurrent 
requests allowed
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int 
maxConcurrentRequests) {
+                       
this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
+                       return this;
+               }
+
                /**
                 * Finalizes the configuration of this sink.
                 *
@@ -416,7 +448,12 @@ protected void sanityCheck() {
 
                @Override
                public CassandraSink<IN> createSink() throws Exception {
-                       return new CassandraSink<>(input.addSink(new 
CassandraTupleSink<IN>(query, builder, failureHandler)).name("Cassandra Sink"));
+                       final CassandraTupleSink<IN> sink = new 
CassandraTupleSink<>(
+                               query,
+                               builder,
+                               configBuilder.build(),
+                               failureHandler);
+                       return new 
CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
                }
 
                @Override
@@ -448,8 +485,13 @@ protected void sanityCheck() {
 
                @Override
                protected CassandraSink<Row> createSink() throws Exception {
-                       return new CassandraSink<>(input.addSink(new 
CassandraRowSink(typeInfo.getArity(), query, builder, 
failureHandler)).name("Cassandra Sink"));
-
+                       final CassandraRowSink sink = new CassandraRowSink(
+                               typeInfo.getArity(),
+                               query,
+                               builder,
+                               configBuilder.build(),
+                               failureHandler);
+                       return new 
CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
                }
 
                @Override
@@ -479,7 +521,14 @@ protected void sanityCheck() {
 
                @Override
                public CassandraSink<IN> createSink() throws Exception {
-                       return new CassandraSink<>(input.addSink(new 
CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace, 
failureHandler)).name("Cassandra Sink"));
+                       final CassandraPojoSink<IN> sink = new 
CassandraPojoSink<>(
+                               typeInfo.getTypeClass(),
+                               builder,
+                               mapperOptions,
+                               keyspace,
+                               configBuilder.build(),
+                               failureHandler);
+                       return new 
CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
                }
 
                @Override
@@ -493,7 +542,6 @@ protected void sanityCheck() {
         * @param <IN>
         */
        public static class CassandraScalaProductSinkBuilder<IN extends 
Product> extends CassandraSinkBuilder<IN> {
-
                public CassandraScalaProductSinkBuilder(DataStream<IN> input, 
TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
                        super(input, typeInfo, serializer);
                }
@@ -511,7 +559,12 @@ protected void sanityCheck() {
 
                @Override
                public CassandraSink<IN> createSink() throws Exception {
-                       return new CassandraSink<>(input.addSink(new 
CassandraScalaProductSink<IN>(query, builder, failureHandler)).name("Cassandra 
Sink"));
+                       final CassandraScalaProductSink<IN> sink = new 
CassandraScalaProductSink<>(
+                               query,
+                               builder,
+                               configBuilder.build(),
+                               failureHandler);
+                       return new 
CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
                }
 
                @Override
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index d24347ec89d..f18e256ed56 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
@@ -33,9 +34,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} 
and {@link
@@ -45,21 +47,23 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
        protected final Logger log = LoggerFactory.getLogger(getClass());
+
        protected transient Cluster cluster;
        protected transient Session session;
 
-       protected transient volatile Throwable exception;
-       protected transient FutureCallback<V> callback;
+       private AtomicReference<Throwable> throwable;
+       private FutureCallback<V> callback;
+       private Semaphore semaphore;
 
        private final ClusterBuilder builder;
-
-       private final AtomicInteger updatesPending = new AtomicInteger();
+       private final CassandraSinkBaseConfig config;
 
        private final CassandraFailureHandler failureHandler;
 
-       CassandraSinkBase(ClusterBuilder builder, CassandraFailureHandler 
failureHandler) {
+       CassandraSinkBase(ClusterBuilder builder, CassandraSinkBaseConfig 
config, CassandraFailureHandler failureHandler) {
                this.builder = builder;
-               this.failureHandler = checkNotNull(failureHandler);
+               this.config = config;
+               this.failureHandler = 
Preconditions.checkNotNull(failureHandler);
                ClosureCleaner.clean(builder, true);
        }
 
@@ -68,50 +72,28 @@ public void open(Configuration configuration) {
                this.callback = new FutureCallback<V>() {
                        @Override
                        public void onSuccess(V ignored) {
-                               int pending = updatesPending.decrementAndGet();
-                               if (pending == 0) {
-                                       synchronized (updatesPending) {
-                                               updatesPending.notifyAll();
-                                       }
-                               }
+                               semaphore.release();
                        }
 
                        @Override
                        public void onFailure(Throwable t) {
-                               int pending = updatesPending.decrementAndGet();
-                               if (pending == 0) {
-                                       synchronized (updatesPending) {
-                                               updatesPending.notifyAll();
-                                       }
-                               }
-                               exception = t;
-
+                               throwable.compareAndSet(null, t);
                                log.error("Error while sending value.", t);
+                               semaphore.release();
                        }
                };
                this.cluster = builder.getCluster();
                this.session = createSession();
-       }
-
-       protected Session createSession() {
-               return cluster.connect();
-       }
 
-       @Override
-       public void invoke(IN value) throws Exception {
-               checkAsyncErrors();
-               ListenableFuture<V> result = send(value);
-               updatesPending.incrementAndGet();
-               Futures.addCallback(result, callback);
+               throwable = new AtomicReference<>();
+               semaphore = new Semaphore(config.getMaxConcurrentRequests());
        }
 
-       public abstract ListenableFuture<V> send(IN value);
-
        @Override
        public void close() throws Exception {
                try {
                        checkAsyncErrors();
-                       waitForPendingUpdates();
+                       flush();
                        checkAsyncErrors();
                } finally {
                        try {
@@ -138,29 +120,55 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception 
{
                checkAsyncErrors();
-               waitForPendingUpdates();
+               flush();
                checkAsyncErrors();
        }
 
-       private void waitForPendingUpdates() throws InterruptedException {
-               synchronized (updatesPending) {
-                       while (updatesPending.get() > 0) {
-                               updatesPending.wait();
-                       }
+       @Override
+       public void invoke(IN value) throws Exception {
+               checkAsyncErrors();
+               tryAcquire();
+               final ListenableFuture<V> result = send(value);
+               Futures.addCallback(result, callback);
+       }
+
+       protected Session createSession() {
+               return cluster.connect();
+       }
+
+       public abstract ListenableFuture<V> send(IN value);
+
+       private void tryAcquire() throws InterruptedException, TimeoutException 
{
+               if 
(!semaphore.tryAcquire(config.getMaxConcurrentRequestsTimeout().toMillis(), 
TimeUnit.MILLISECONDS)) {
+                       throw new TimeoutException(
+                               String.format(
+                                       "Failed to acquire 1 permit of %d to 
send value in %s.",
+                                       config.getMaxConcurrentRequests(),
+                                       config.getMaxConcurrentRequestsTimeout()
+                               )
+                       );
                }
        }
 
        private void checkAsyncErrors() throws Exception {
-               Throwable error = exception;
-               if (error != null) {
-                       // prevent throwing duplicated error
-                       exception = null;
-                       failureHandler.onFailure(error);
+               final Throwable currentError = throwable.getAndSet(null);
+               if (currentError != null) {
+                       failureHandler.onFailure(currentError);
                }
        }
 
+       private void flush() {
+               
semaphore.acquireUninterruptibly(config.getMaxConcurrentRequests());
+               semaphore.release(config.getMaxConcurrentRequests());
+       }
+
+       @VisibleForTesting
+       int getAvailablePermits() {
+               return semaphore.availablePermits();
+       }
+
        @VisibleForTesting
-       int getNumOfPendingRecords() {
-               return updatesPending.get();
+       int getAcquiredPermits() {
+               return config.getMaxConcurrentRequests() - 
semaphore.availablePermits();
        }
 }
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
new file mode 100644
index 00000000000..cb8d904fbf9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * Configuration for {@link CassandraSinkBase}.
+ */
+public final class CassandraSinkBaseConfig implements Serializable  {
+       // ------------------------ Default Configurations 
------------------------
+
+       /**
+        * The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+        */
+       public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+       /**
+        * The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+        */
+       public static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Duration.ofMillis(Long.MAX_VALUE);
+
+       // ------------------------- Configuration Fields 
-------------------------
+
+       /** Maximum number of concurrent requests allowed. */
+       private final int maxConcurrentRequests;
+
+       /** Timeout duration when acquiring a permit to execute. */
+       private final Duration maxConcurrentRequestsTimeout;
+
+       private CassandraSinkBaseConfig(
+                       int maxConcurrentRequests,
+                       Duration maxConcurrentRequestsTimeout) {
+               Preconditions.checkArgument(maxConcurrentRequests > 0,
+                       "Max concurrent requests is expected to be positive");
+               Preconditions.checkNotNull(maxConcurrentRequestsTimeout,
+                       "Max concurrent requests timeout cannot be null");
+               
Preconditions.checkArgument(!maxConcurrentRequestsTimeout.isNegative(),
+                       "Max concurrent requests timeout is expected to be 
positive");
+               this.maxConcurrentRequests = maxConcurrentRequests;
+               this.maxConcurrentRequestsTimeout = 
maxConcurrentRequestsTimeout;
+       }
+
+       public int getMaxConcurrentRequests() {
+               return maxConcurrentRequests;
+       }
+
+       public Duration getMaxConcurrentRequestsTimeout() {
+               return maxConcurrentRequestsTimeout;
+       }
+
+       @Override
+       public String toString() {
+               return "CassandraSinkBaseConfig{" +
+                       "maxConcurrentRequests=" + maxConcurrentRequests +
+                       ", maxConcurrentRequestsTimeout=" + 
maxConcurrentRequestsTimeout +
+                       '}';
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * Builder for the {@link CassandraSinkBaseConfig}.
+        */
+       public static class Builder {
+               private int maxConcurrentRequests = 
DEFAULT_MAX_CONCURRENT_REQUESTS;
+               private Duration maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+
+               Builder() { }
+
+               public Builder setMaxConcurrentRequests(int 
maxConcurrentRequests) {
+                       this.maxConcurrentRequests = maxConcurrentRequests;
+                       return this;
+               }
+
+               public Builder setMaxConcurrentRequestsTimeout(Duration 
timeout) {
+                       this.maxConcurrentRequestsTimeout = timeout;
+                       return this;
+               }
+
+               public CassandraSinkBaseConfig build() {
+                       return new CassandraSinkBaseConfig(
+                               maxConcurrentRequests,
+                               maxConcurrentRequestsTimeout);
+               }
+       }
+}
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
index 116acfd8569..4164bce6afd 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -25,12 +25,25 @@
  * @param <IN> Type of the elements emitted by this sink, it must extend 
{@link Tuple}
  */
 public class CassandraTupleSink<IN extends Tuple> extends 
AbstractCassandraTupleSink<IN> {
-       public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-               this(insertQuery, builder, new NoOpCassandraFailureHandler());
+       public CassandraTupleSink(
+                       String insertQuery,
+                       ClusterBuilder builder) {
+               this(insertQuery, builder, 
CassandraSinkBaseConfig.newBuilder().build());
        }
 
-       public CassandraTupleSink(String insertQuery, ClusterBuilder builder, 
CassandraFailureHandler failureHandler) {
-               super(insertQuery, builder, failureHandler);
+       CassandraTupleSink(
+                       String insertQuery,
+                       ClusterBuilder builder,
+                       CassandraSinkBaseConfig config) {
+               this(insertQuery, builder, config, new 
NoOpCassandraFailureHandler());
+       }
+
+       CassandraTupleSink(
+                       String insertQuery,
+                       ClusterBuilder builder,
+                       CassandraSinkBaseConfig config,
+                       CassandraFailureHandler failureHandler) {
+               super(insertQuery, builder, config, failureHandler);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 8c66882611a..2b705a56e7b 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -22,11 +22,11 @@
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -34,8 +34,14 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
@@ -44,6 +50,8 @@
  */
 public class CassandraSinkBaseTest {
 
+       private static final long DEFAULT_TEST_TIMEOUT = 5000;
+
        @Test(expected = NoHostAvailableException.class)
        public void testHostNotFoundErrorHandling() throws Exception {
                CassandraSinkBase base = new CassandraSinkBase(new 
ClusterBuilder() {
@@ -54,7 +62,7 @@ protected Cluster buildCluster(Cluster.Builder builder) {
                                        .withoutJMXReporting()
                                        .withoutMetrics().build();
                        }
-               }, new NoOpCassandraFailureHandler()) {
+               }, CassandraSinkBaseConfig.newBuilder().build(), new 
NoOpCassandraFailureHandler()) {
                        @Override
                        public ListenableFuture send(Object value) {
                                return null;
@@ -64,166 +72,266 @@ public ListenableFuture send(Object value) {
                base.open(new Configuration());
        }
 
-       @Test(timeout = 5000)
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
        public void testSuccessfulPath() throws Exception {
-               TestCassandraSink casSinkFunc = new TestCassandraSink();
-               casSinkFunc.open(new Configuration());
+               try (TestCassandraSink casSinkFunc = 
createOpenedTestCassandraSink()) {
+                       
casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
 
-               
casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
-               casSinkFunc.invoke("hello");
+                       final int originalPermits = 
casSinkFunc.getAvailablePermits();
+                       Assert.assertThat(originalPermits, greaterThan(0));
+                       Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
 
-               Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+                       casSinkFunc.invoke("hello");
 
-               casSinkFunc.close();
+                       Assert.assertEquals(originalPermits, 
casSinkFunc.getAvailablePermits());
+                       Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
+               }
        }
 
-       @Test(timeout = 5000)
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
        public void testThrowErrorOnClose() throws Exception {
                TestCassandraSink casSinkFunc = new TestCassandraSink();
 
                casSinkFunc.open(new Configuration());
 
                Exception cause = new RuntimeException();
-               
casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+               
casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
                casSinkFunc.invoke("hello");
                try {
                        casSinkFunc.close();
 
                        Assert.fail("Close should have thrown an exception.");
                } catch (IOException e) {
-                       Assert.assertEquals(cause, e.getCause());
-                       Assert.assertEquals(0, 
casSinkFunc.getNumOfPendingRecords());
+                       ExceptionUtils.findThrowable(e, candidate -> candidate 
== cause)
+                               .orElseThrow(() -> e);
                }
        }
 
-       @Test(timeout = 5000)
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
        public void testThrowErrorOnInvoke() throws Exception {
-               TestCassandraSink casSinkFunc = new TestCassandraSink();
-
-               casSinkFunc.open(new Configuration());
-
-               Exception cause = new RuntimeException();
-               
casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
-
-               casSinkFunc.invoke("hello");
-
-               try {
-                       casSinkFunc.invoke("world");
-                       Assert.fail("Sending of second value should have 
failed.");
-               } catch (IOException e) {
-                       Assert.assertEquals(cause, e.getCause());
-                       Assert.assertEquals(0, 
casSinkFunc.getNumOfPendingRecords());
+               try (TestCassandraSink casSinkFunc = 
createOpenedTestCassandraSink()) {
+                       Exception cause = new RuntimeException();
+                       
casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
+
+                       casSinkFunc.invoke("hello");
+
+                       try {
+                               casSinkFunc.invoke("world");
+                               Assert.fail("Sending of second value should 
have failed.");
+                       } catch (IOException e) {
+                               Assert.assertEquals(cause, e.getCause());
+                               Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
+                       }
                }
        }
 
-       @Test(timeout = 5000)
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
        public void testIgnoreError() throws Exception {
                Exception cause = new RuntimeException();
                CassandraFailureHandler failureHandler = failure -> 
Assert.assertEquals(cause, failure);
-               TestCassandraSink casSinkFunc = new 
TestCassandraSink(failureHandler);
 
-               casSinkFunc.open(new Configuration());
+               try (TestCassandraSink casSinkFunc = 
createOpenedTestCassandraSink(failureHandler)) {
 
-               
casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+                       
casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
+                       
casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
 
-               casSinkFunc.invoke("hello");
-               casSinkFunc.invoke("world");
+                       casSinkFunc.invoke("hello");
+                       casSinkFunc.invoke("world");
+               }
        }
 
-       @Test(timeout = 5000)
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
        public void testThrowErrorOnSnapshot() throws Exception {
                TestCassandraSink casSinkFunc = new TestCassandraSink();
 
-               OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                       new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(casSinkFunc));
+               try (OneInputStreamOperatorTestHarness<String, Object> 
testHarness = createOpenedTestHarness(casSinkFunc)) {
+                       Exception cause = new RuntimeException();
+                       
casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
 
-               testHarness.open();
+                       casSinkFunc.invoke("hello");
 
-               Exception cause = new RuntimeException();
-               
casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+                       try {
+                               testHarness.snapshot(123L, 123L);
 
-               casSinkFunc.invoke("hello");
+                               Assert.fail();
+                       } catch (Exception e) {
+                               Assert.assertTrue(e.getCause() instanceof 
IOException);
+                       }
+               }
+       }
 
-               try {
-                       testHarness.snapshot(123L, 123L);
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
+       public void testWaitForPendingUpdatesOnSnapshot() throws Exception {
+               final TestCassandraSink casSinkFunc = new TestCassandraSink();
 
-                       Assert.fail();
-               } catch (Exception e) {
-                       Assert.assertTrue(e.getCause() instanceof IOException);
-                       Assert.assertEquals(cause, e.getCause().getCause());
-                       Assert.assertEquals(0, 
casSinkFunc.getNumOfPendingRecords());
-               }
+               try (OneInputStreamOperatorTestHarness<String, Object> 
testHarness = createOpenedTestHarness(casSinkFunc)) {
+                       CompletableFuture<ResultSet> completableFuture = new 
CompletableFuture<>();
+                       casSinkFunc.enqueueCompletableFuture(completableFuture);
+
+                       casSinkFunc.invoke("hello");
+                       Assert.assertEquals(1, 
casSinkFunc.getAcquiredPermits());
+
+                       final CountDownLatch latch = new CountDownLatch(1);
+                       Thread t = new 
CheckedThread("Flink-CassandraSinkBaseTest") {
+                               @Override
+                               public void go() throws Exception {
+                                       testHarness.snapshot(123L, 123L);
+                                       latch.countDown();
+                               }
+                       };
+                       t.start();
+                       while (t.getState() != Thread.State.WAITING) {
+                               Thread.sleep(5);
+                       }
 
-               testHarness.close();
+                       Assert.assertEquals(1, 
casSinkFunc.getAcquiredPermits());
+                       completableFuture.complete(null);
+                       latch.await();
+                       Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
+               }
        }
 
-       @Test(timeout = 5000)
-       public void testWaitForPendingUpdatesOnSnapshot() throws Exception {
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
+       public void testWaitForPendingUpdatesOnClose() throws Exception {
                TestCassandraSink casSinkFunc = new TestCassandraSink();
 
-               OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                       new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(casSinkFunc));
+               try (OneInputStreamOperatorTestHarness<String, Object> 
testHarness = createOpenedTestHarness(casSinkFunc)) {
 
-               testHarness.open();
-
-               CompletableFuture<ResultSet> completableFuture = new 
CompletableFuture<>();
-               ResultSetFuture resultSetFuture = 
ResultSetFutures.fromCompletableFuture(completableFuture);
-               casSinkFunc.setResultFuture(resultSetFuture);
+                       CompletableFuture<ResultSet> completableFuture = new 
CompletableFuture<>();
+                       casSinkFunc.enqueueCompletableFuture(completableFuture);
 
-               casSinkFunc.invoke("hello");
-               Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+                       casSinkFunc.invoke("hello");
+                       Assert.assertEquals(1, 
casSinkFunc.getAcquiredPermits());
 
-               Thread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
-                       @Override
-                       public void go() throws Exception {
-                               testHarness.snapshot(123L, 123L);
+                       final CountDownLatch latch = new CountDownLatch(1);
+                       Thread t = new 
CheckedThread("Flink-CassandraSinkBaseTest") {
+                               @Override
+                               public void go() throws Exception {
+                                       testHarness.close();
+                                       latch.countDown();
+                               }
+                       };
+                       t.start();
+                       while (t.getState() != Thread.State.WAITING) {
+                               Thread.sleep(5);
                        }
-               };
-               t.start();
-               while (t.getState() != Thread.State.WAITING) {
-                       Thread.sleep(5);
+
+                       Assert.assertEquals(1, 
casSinkFunc.getAcquiredPermits());
+                       completableFuture.complete(null);
+                       latch.await();
+                       Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
                }
+       }
+
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
+       public void testReleaseOnSuccess() throws Exception {
+               final CassandraSinkBaseConfig config = 
CassandraSinkBaseConfig.newBuilder()
+                       .setMaxConcurrentRequests(1)
+                       .build();
+
+               try (TestCassandraSink testCassandraSink = 
createOpenedTestCassandraSink(config)) {
+                       Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
+                       Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
+
+                       CompletableFuture<ResultSet> completableFuture = new 
CompletableFuture<>();
+                       
testCassandraSink.enqueueCompletableFuture(completableFuture);
+                       testCassandraSink.invoke("N/A");
+
+                       Assert.assertEquals(0, 
testCassandraSink.getAvailablePermits());
+                       Assert.assertEquals(1, 
testCassandraSink.getAcquiredPermits());
 
-               Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
-               completableFuture.complete(null);
-               Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+                       completableFuture.complete(null);
 
-               testHarness.close();
+                       Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
+                       Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
+               }
        }
 
-       @Test(timeout = 5000)
-       public void testWaitForPendingUpdatesOnClose() throws Exception {
-               TestCassandraSink casSinkFunc = new TestCassandraSink();
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
+       public void testReleaseOnFailure() throws Exception {
+               final CassandraSinkBaseConfig config = 
CassandraSinkBaseConfig.newBuilder()
+                       .setMaxConcurrentRequests(1)
+                       .build();
+               final CassandraFailureHandler failureHandler = ignored -> {};
 
-               OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                       new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(casSinkFunc));
+               try (TestCassandraSink testCassandraSink = 
createOpenedTestCassandraSink(config, failureHandler)) {
+                       Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
+                       Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
 
-               testHarness.open();
+                       CompletableFuture<ResultSet> completableFuture = new 
CompletableFuture<>();
+                       
testCassandraSink.enqueueCompletableFuture(completableFuture);
+                       testCassandraSink.invoke("N/A");
 
-               CompletableFuture<ResultSet> completableFuture = new 
CompletableFuture<>();
-               ResultSetFuture resultSetFuture = 
ResultSetFutures.fromCompletableFuture(completableFuture);
-               casSinkFunc.setResultFuture(resultSetFuture);
+                       Assert.assertEquals(0, 
testCassandraSink.getAvailablePermits());
+                       Assert.assertEquals(1, 
testCassandraSink.getAcquiredPermits());
 
-               casSinkFunc.invoke("hello");
-               Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+                       completableFuture.completeExceptionally(new 
RuntimeException());
 
-               Thread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
-                       @Override
-                       public void go() throws Exception {
-                               testHarness.close();
+                       Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
+                       Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
+               }
+       }
+
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
+       public void testTimeoutExceptionOnInvoke() throws Exception {
+               final CassandraSinkBaseConfig config = 
CassandraSinkBaseConfig.newBuilder()
+                       .setMaxConcurrentRequests(1)
+                       .setMaxConcurrentRequestsTimeout(Duration.ofMillis(1))
+                       .build();
+
+               try (TestCassandraSink testCassandraSink = 
createOpenedTestCassandraSink(config)) {
+                       CompletableFuture<ResultSet> completableFuture = new 
CompletableFuture<>();
+                       
testCassandraSink.enqueueCompletableFuture(completableFuture);
+                       
testCassandraSink.enqueueCompletableFuture(completableFuture);
+                       testCassandraSink.invoke("Invoke #1");
+
+                       try {
+                               testCassandraSink.invoke("Invoke #2");
+                               Assert.fail("Sending value should have 
experienced a TimeoutException");
+                       } catch (Exception e) {
+                               Assert.assertTrue(e instanceof 
TimeoutException);
+                       } finally {
+                               completableFuture.complete(null);
                        }
-               };
-               t.start();
-               while (t.getState() != Thread.State.WAITING) {
-                       Thread.sleep(5);
                }
+       }
 
-               Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
-               completableFuture.complete(null);
-               Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+       private TestCassandraSink createOpenedTestCassandraSink() {
+               final TestCassandraSink testCassandraSink = new 
TestCassandraSink();
+               testCassandraSink.open(new Configuration());
+               return testCassandraSink;
        }
 
-       private static class TestCassandraSink extends 
CassandraSinkBase<String, ResultSet> {
+       private TestCassandraSink 
createOpenedTestCassandraSink(CassandraFailureHandler failureHandler) {
+               final TestCassandraSink testCassandraSink = new 
TestCassandraSink(failureHandler);
+               testCassandraSink.open(new Configuration());
+               return testCassandraSink;
+       }
+
+       private TestCassandraSink 
createOpenedTestCassandraSink(CassandraSinkBaseConfig config) {
+               final TestCassandraSink testCassandraSink = new 
TestCassandraSink(config);
+               testCassandraSink.open(new Configuration());
+               return testCassandraSink;
+       }
+
+       private TestCassandraSink createOpenedTestCassandraSink(
+               CassandraSinkBaseConfig config,
+               CassandraFailureHandler failureHandler) {
+               final TestCassandraSink testCassandraSink = new 
TestCassandraSink(config, failureHandler);
+               testCassandraSink.open(new Configuration());
+               return testCassandraSink;
+       }
+
+       private OneInputStreamOperatorTestHarness<String, Object> 
createOpenedTestHarness(
+               TestCassandraSink testCassandraSink) throws Exception {
+               final StreamSink<String> testStreamSink = new 
StreamSink<>(testCassandraSink);
+               final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
+                       new OneInputStreamOperatorTestHarness<>(testStreamSink);
+               testHarness.open();
+               return testHarness;
+       }
+
+       private static class TestCassandraSink extends 
CassandraSinkBase<String, ResultSet> implements AutoCloseable {
 
                private static final ClusterBuilder builder;
                private static final Cluster cluster;
@@ -243,24 +351,32 @@ protected Cluster buildCluster(Cluster.Builder builder) {
                        };
                }
 
-               private ResultSetFuture result;
+               private final Queue<ListenableFuture<ResultSet>> 
resultSetFutures = new LinkedList<>();
 
                TestCassandraSink() {
-                       super(builder, new NoOpCassandraFailureHandler());
+                       this(CassandraSinkBaseConfig.newBuilder().build());
+               }
+
+               TestCassandraSink(CassandraSinkBaseConfig config) {
+                       this(config, new NoOpCassandraFailureHandler());
                }
 
                TestCassandraSink(CassandraFailureHandler failureHandler) {
-                       super(builder, failureHandler);
+                       this(CassandraSinkBaseConfig.newBuilder().build(), 
failureHandler);
                }
 
-               void setResultFuture(ResultSetFuture result) {
-                       Preconditions.checkNotNull(result);
-                       this.result = result;
+               TestCassandraSink(CassandraSinkBaseConfig config, 
CassandraFailureHandler failureHandler) {
+                       super(builder, config, failureHandler);
                }
 
                @Override
                public ListenableFuture<ResultSet> send(String value) {
-                       return result;
+                       return resultSetFutures.poll();
+               }
+
+               void enqueueCompletableFuture(CompletableFuture<ResultSet> 
completableFuture) {
+                       Preconditions.checkNotNull(completableFuture);
+                       
resultSetFutures.offer(ResultSetFutures.fromCompletableFuture(completableFuture));
                }
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to