zentol commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r235143376
########## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * Configuration for {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseConfig implements Serializable { + // ------------------------ Default Configurations ------------------------ + + /** + * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. + */ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** + * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. + */ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** + * The default timeout unit when acquiring a permit to execute. By default, milliseconds. 
+ */ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // ------------------------- Configuration Fields ------------------------- + + /** Maximum number of concurrent requests allowed. */ + private final int maxConcurrentRequests; + + /** Timeout duration when acquiring a permit to execute. */ + private final long maxConcurrentRequestsTimeout; + + /** Timeout unit when acquiring a permit to execute. */ + private final TimeUnit maxConcurrentRequestsTimeoutUnit; + + public CassandraSinkBaseConfig( + int maxConcurrentRequests, + long maxConcurrentRequestsTimeout, + TimeUnit maxConcurrentRequestsTimeoutUnit) { + Preconditions.checkArgument(maxConcurrentRequests >= 0, + "Max concurrent requests is expected to be positive"); + Preconditions.checkArgument(maxConcurrentRequestsTimeout >= 0, + "Max concurrent requests timeout is expected to be positive"); + Preconditions.checkNotNull(maxConcurrentRequestsTimeoutUnit, + "Max concurrent requests timeout unit cannot be null"); + this.maxConcurrentRequests = maxConcurrentRequests; + this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout; + this.maxConcurrentRequestsTimeoutUnit = maxConcurrentRequestsTimeoutUnit; + } + + public int getMaxConcurrentRequests() { + return maxConcurrentRequests; + } + + public long getMaxConcurrentRequestsTimeout() { + return maxConcurrentRequestsTimeout; + } + + public TimeUnit getMaxConcurrentRequestsTimeoutUnit() { + return maxConcurrentRequestsTimeoutUnit; + } + + @Override + public String toString() { + return "CassandraSinkBaseConfig{" + + "maxConcurrentRequests=" + maxConcurrentRequests + + ", maxConcurrentRequestsTimeout=" + maxConcurrentRequestsTimeout + + ", maxConcurrentRequestsTimeoutUnit=" + maxConcurrentRequestsTimeoutUnit + + '}'; + } + + public static Builder newBuilder() { Review comment: either the constructor should be private or this method removed; there's no value in having 2 ways of creating the builder. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services