alpreu commented on a change in pull request #17374:
URL: https://github.com/apache/flink/pull/17374#discussion_r724221489



##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
##########
@@ -20,20 +20,91 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.http.HttpHost;
 
+import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Elasticsearch 7 specific configuration. */
 @Internal
-final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+final class Elasticsearch7Configuration {
+    protected final ReadableConfig config;
+    private final ClassLoader classLoader;
+
     Elasticsearch7Configuration(ReadableConfig config, ClassLoader 
classLoader) {
-        super(config, classLoader);
+        this.config = checkNotNull(config);
+        this.classLoader = checkNotNull(classLoader);
+    }
+
+    public int getBulkFlushMaxActions() {
+        int maxActions = 
config.get(ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+        // convert 0 to -1, because Elasticsearch client use -1 to disable 
this configuration.
+        return maxActions == 0 ? -1 : maxActions;
+    }
+
+    public long getBulkFlushMaxByteSize() {
+        long maxSize =
+                
config.get(ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+        // convert 0 to -1, because Elasticsearch client use -1 to disable 
this configuration.
+        return maxSize == 0 ? -1 : maxSize;
+    }
+
+    public long getBulkFlushInterval() {
+        long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+        // convert 0 to -1, because Elasticsearch client use -1 to disable 
this configuration.
+        return interval == 0 ? -1 : interval;

Review comment:
       Good catch, they are also checked and documented in the 
ElasticsearchSinkBuilder.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to