zentol commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r235142071
########## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ########## @@ -127,35 +144,112 @@ public void close() throws Exception { } @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - } + public void initializeState(FunctionInitializationContext context) throws Exception {} @Override - public void snapshotState(FunctionSnapshotContext ctx) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { checkAsyncErrors(); - waitForPendingUpdates(); + flush(); checkAsyncErrors(); } - private void waitForPendingUpdates() throws InterruptedException { - synchronized (updatesPending) { - while (updatesPending.get() > 0) { - updatesPending.wait(); + @Override + public void invoke(IN value) throws Exception { + checkAsyncErrors(); + tryAcquire(); + final ListenableFuture<V> result = send(value); + Futures.addCallback(result, new FutureCallback<V>() { + @Override + public void onSuccess(V ignored) { + release(); } + + @Override + public void onFailure(Throwable currentError) { + setAsyncErrors(currentError); + release(); + } + }); + } + + // ----------------------- User-Defined Sink Methods ---------------------- + + public abstract ListenableFuture<V> send(IN value); + + // ------------------------- Configuration Methods ------------------------ + + /** + * Sets the maximum allowed number of concurrent requests for this sink. 
+ * + * @param maxConcurrentRequests maximum number of concurrent requests allowed + * @param timeout timeout duration when acquiring a permit to execute + * @param unit timeout unit when acquiring a permit to execute + */ + public void setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) { + Preconditions.checkArgument(maxConcurrentRequests >= 0, "maxConcurrentRequests cannot be negative."); + Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative."); + this.maxConcurrentRequests = maxConcurrentRequests; + this.maxConcurrentRequestsTimeout = timeout; + this.maxConcurrentRequestsTimeoutUnit = unit; + } + + // --------------------------- Cassandra Methods -------------------------- + + protected Cluster createCluster() { Review comment: This method seems unnecessary. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services