hlteoh37 commented on code in PR #22:
URL: https://github.com/apache/flink-connector-prometheus/pull/22#discussion_r2124415562


##########
flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java:
##########
@@ -98,6 +98,11 @@ private AwsCredentialsProvider getCredentialsProvider() {
         return credentialsProvider;
     }
 
+    @VisibleForTesting
+    String getAwsRegion() {
+        return awsRegion;
+    }
+

Review Comment:
   This is a bit of a smell: we are exposing internal object details just to test the factory method. Can we test the factory method directly instead?
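   A minimal sketch of what that could look like, assuming `getAwsRegionFromEndpointUrl` is made package-private on the factory (the endpoint URL and the JUnit 5/AssertJ usage are illustrative assumptions):

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

class AmazonManagedPrometheusWriteRequestSignerFactoryTest {

    @Test
    void extractsRegionFromRemoteWriteUrl() {
        AmazonManagedPrometheusWriteRequestSignerFactory factory =
                new AmazonManagedPrometheusWriteRequestSignerFactory();

        // Hypothetical AMP workspace remote-write endpoint used only for this test.
        String url =
                "https://aps-workspaces.eu-west-1.amazonaws.com/workspaces/ws-0000/api/v1/remote_write";

        // Tests the factory's parsing directly, without exposing the signer's internals.
        assertThat(factory.getAwsRegionFromEndpointUrl(url)).isEqualTo("eu-west-1");
    }
}
```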



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -18,36 +18,13 @@
 package org.apache.flink.connector.prometheus.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
-import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
-import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
-import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Collection;
 
 /** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
 @PublicEvolving
-public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
-    private final String prometheusRemoteWriteUrl;
-    private final PrometheusAsyncHttpClientBuilder clientBuilder;
-    private final PrometheusRequestSigner requestSigner;
-    private final int maxBatchSizeInSamples;
-    private final String httpUserAgent;
-    private final PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-            errorHandlingBehaviorConfig;
-    private final String metricGroupName;
-
-    @SuppressWarnings("checkstyle:RegexpSingleline")
+public class PrometheusSink extends PrometheusSinkBase<PrometheusTimeSeries> {

Review Comment:
   Why don't we take the hit and propose a major version bump to make `PrometheusSink` itself generic? That would remove the need to maintain two coupled public interfaces. The coupling is visible in the public `castToPrometheusSink()` method on `PrometheusSinkBase`, which is a big smell: the parent class creates an instance of its own child class.
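   For reference, a rough shape sketch of the single generic class this would imply (only the names already in the PR are real; the rest is an assumption pending the major-version proposal):

```java
// Shape sketch only, not a working implementation: one generic public sink,
// no separate base class and no castToPrometheusSink().
@PublicEvolving
public class PrometheusSink<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {
    // Same constructor parameters and createWriter()/restoreWriter() logic as
    // PrometheusSinkBase today. The DataStream API builds a
    // PrometheusSink<PrometheusTimeSeries>, the Table connector a
    // PrometheusSink<RowData>; neither needs to know about the other.
}
```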



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/RowDataToPrometheusTimeSeriesConverter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Converts from Flink Table API internal type of {@link RowData} to {@link PrometheusTimeSeries}.
+ */
+@Internal
+public class RowDataToPrometheusTimeSeriesConverter {
+
+    private final DataType physicalDataType;
+    private final PrometheusConfig prometheusConfig;
+
+    public RowDataToPrometheusTimeSeriesConverter(
+            DataType physicalDataType, PrometheusConfig prometheusConfig) {
+        this.physicalDataType = physicalDataType;
+        this.prometheusConfig = prometheusConfig;
+    }
+
+    public PrometheusTimeSeries convertRowData(RowData row) {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+
+        PrometheusTimeSeries.Builder builder = PrometheusTimeSeries.builder();
+        Double sampleValue = null;
+        Long sampleTimestamp = null;
+
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(fields.get(i).getDataType().getLogicalType(), i);
+            FieldValue fieldValue = new FieldValue(fieldGetter.getFieldOrNull(row));
+            String fieldName = field.getName();
+
+            if (fieldName.equals(prometheusConfig.getMetricName())) {
+                builder.withMetricName(fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleKey())) {
+                sampleValue = fieldValue.getDoubleValue();
+            } else if (prometheusConfig.getLabelKeys().contains(fieldName)) {
+                builder.addLabel(fieldName, fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleTimestamp())) {
+                sampleTimestamp = fieldValue.getLongValue();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "%s is not a supported field, valid fields 
are: %s",

Review Comment:
   Is there a reason we don't just ignore the field instead, e.g. when the user defines a column in the table that is not used in the sink?
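   Sketched out, the final branch would then become something like this (the logger and log level are assumptions):

```java
} else {
    // Sketch: skip unmapped fields instead of failing the whole row
    // (LOG is an assumed slf4j logger on the converter class).
    LOG.debug("Ignoring field '{}': not mapped to metric name, sample, timestamp, or a label", fieldName);
}
```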



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/RowDataToPrometheusTimeSeriesConverter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Converts from Flink Table API internal type of {@link RowData} to {@link PrometheusTimeSeries}.
+ */
+@Internal
+public class RowDataToPrometheusTimeSeriesConverter {
+
+    private final DataType physicalDataType;
+    private final PrometheusConfig prometheusConfig;
+
+    public RowDataToPrometheusTimeSeriesConverter(
+            DataType physicalDataType, PrometheusConfig prometheusConfig) {
+        this.physicalDataType = physicalDataType;
+        this.prometheusConfig = prometheusConfig;
+    }
+
+    public PrometheusTimeSeries convertRowData(RowData row) {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+
+        PrometheusTimeSeries.Builder builder = PrometheusTimeSeries.builder();
+        Double sampleValue = null;
+        Long sampleTimestamp = null;
+
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(fields.get(i).getDataType().getLogicalType(), i);
+            FieldValue fieldValue = new FieldValue(fieldGetter.getFieldOrNull(row));
+            String fieldName = field.getName();
+
+            if (fieldName.equals(prometheusConfig.getMetricName())) {
+                builder.withMetricName(fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleKey())) {
+                sampleValue = fieldValue.getDoubleValue();
+            } else if (prometheusConfig.getLabelKeys().contains(fieldName)) {
+                builder.addLabel(fieldName, fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleTimestamp())) {
+                sampleTimestamp = fieldValue.getLongValue();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "%s is not a supported field, valid fields 
are: %s",
+                                fieldName,
+                                Arrays.asList(
+                                        prometheusConfig.getMetricName(),
+                                        prometheusConfig.getLabelKeys(),
+                                        prometheusConfig.getMetricSampleKey(),
+                                        prometheusConfig.getMetricSampleTimestamp())));
+            }
+        }
+
+        if (sampleValue != null && sampleTimestamp != null) {
+            builder.addSample(sampleValue, sampleTimestamp);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Row is missing sampleValue field; %s or 
sampleTimestamp: %s",
+                            sampleValue, sampleTimestamp));
+        }
+
+        return builder.build();
+    }
+
+    private static class FieldValue {
+        private final Object value;
+
+        private FieldValue(Object value) {
+            this.value = value;
+        }
+
+        private String getStringValue() {
+            return value.toString();
+        }

Review Comment:
   Are there any unsupported types? What happens if the field provided is a `MAP` / `ARRAY`?
   
   https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#list-of-data-types
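   If non-scalar types should be rejected explicitly, a guard along these lines could run once per field (a sketch; the allow-list via `LogicalTypeRoot` is an assumption, the PR may choose differently):

```java
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

// Sketch: fail fast with a clear message for composite types instead of
// falling through to Object#toString() in FieldValue.
private static void checkScalarType(String fieldName, LogicalType type) {
    LogicalTypeRoot root = type.getTypeRoot();
    if (root == LogicalTypeRoot.ARRAY
            || root == LogicalTypeRoot.MAP
            || root == LogicalTypeRoot.MULTISET
            || root == LogicalTypeRoot.ROW) {
        throw new IllegalArgumentException(
                String.format(
                        "Field '%s' has unsupported type %s: only scalar types can map to Prometheus labels and samples",
                        fieldName, root));
    }
}
```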



##########
flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java:
##########
@@ -0,0 +1,36 @@
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.table.PrometheusConfig;
+import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class AmazonManagedPrometheusWriteRequestSignerFactory
+        implements PrometheusDynamicRequestSignerFactory {
+    @Override
+    public String requestSignerIdentifer() {
+        return "amazon-managed-prometheus";
+    }
+
+    @Override
+    public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) {
+        return new AmazonManagedPrometheusWriteRequestSigner(
+                config.getRemoteWriteEndpointUrl(),
+                getAwsRegionFromEndpointUrl(config.getRemoteWriteEndpointUrl()));
+    }
+
+    private String getAwsRegionFromEndpointUrl(String endpointUrl) {
+        String regex = "aps[a-zA-Z0-9-]*\\.([^.]+)\\.";
+        Pattern pattern = Pattern.compile(regex);

Review Comment:
   Let's make this a `static` constant instead of recompiling the pattern on each request.
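   Concretely, something like the following (`Pattern` and `Matcher` are already imported in this file; the error handling for a non-matching URL is an assumption, since the rest of the method is not shown in the hunk):

```java
// Compile the pattern once per class load instead of on every call.
private static final Pattern AWS_REGION_PATTERN =
        Pattern.compile("aps[a-zA-Z0-9-]*\\.([^.]+)\\.");

private static String getAwsRegionFromEndpointUrl(String endpointUrl) {
    Matcher matcher = AWS_REGION_PATTERN.matcher(endpointUrl);
    if (matcher.find()) {
        return matcher.group(1);
    }
    // Assumed behaviour for URLs that don't match; the PR may differ.
    throw new IllegalArgumentException(
            "Cannot extract AWS region from endpoint URL: " + endpointUrl);
}
```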



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBase.java:
##########
@@ -0,0 +1,189 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+
+public class PrometheusSinkBase<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {

Review Comment:
   This is effectively `@PublicEvolving`: we would be doubling the public interface surface the sink repository has to maintain.
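   If the base class is not meant to be user-facing, one option (a sketch, not a decision) is to mark it `@Internal` so the public surface does not grow:

```java
import org.apache.flink.annotation.Internal;

// Sketch: keep the shared logic but exclude it from the public API surface.
@Internal
public class PrometheusSinkBase<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {
    // ...
}
```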



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/PrometheusDynamicSink.java:
##########
@@ -0,0 +1,185 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.sink.PrometheusSinkBase;
+import org.apache.flink.connector.prometheus.sink.PrometheusSinkBaseBuilder;
+import org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.stream.Stream;
+
+public class PrometheusDynamicSink extends AsyncDynamicTableSink<Types.TimeSeries>

Review Comment:
   This is `@PublicEvolving`.



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBase.java:
##########
@@ -0,0 +1,189 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+
+public class PrometheusSinkBase<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {
+    private final String prometheusRemoteWriteUrl;
+    private final PrometheusAsyncHttpClientBuilder clientBuilder;
+    private final PrometheusRequestSigner requestSigner;
+    private final int maxBatchSizeInSamples;
+    private final int maxRecordSizeInSamples;
+    private final String httpUserAgent;
+    private final PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+            errorHandlingBehaviorConfig;
+    private final String metricGroupName;
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    protected PrometheusSinkBase(
+            ElementConverter<IN, Types.TimeSeries> elementConverter,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            int maxRecordSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            PrometheusAsyncHttpClientBuilder clientBuilder,
+            PrometheusRequestSigner requestSigner,
+            String httpUserAgent,
+            PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+                    errorHandlingBehaviorConfig,
+            String metricGroupName) {
+        // This sink batches in terms of "samples", because writes to Prometheus are better
+        // optimized in terms of samples. AsyncSinkBase handles batching and does not make any
+        // assumptions about the actual unit of "size", but parameters are named assuming this unit
+        // is "bytes".
+        super(
+                elementConverter,
+                maxBatchSizeInSamples, // maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples, // maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInSamples // maxRecordSizeInBytes
+                );
+
+        Preconditions.checkArgument(
+                maxBatchSizeInSamples > 1, "Max batch size (in samples) must be positive");
+        Preconditions.checkArgument(
+                maxRecordSizeInSamples <= maxBatchSizeInSamples,
+                "Max record size (in samples) must be <= Max batch size");
+        Preconditions.checkArgument(maxInFlightRequests == 1, "Max in-flight requests must be 1");
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+                "Missing or blank Prometheus Remote-Write URL");
+        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(httpUserAgent), "Missing HTTP User Agent string");
+        Preconditions.checkNotNull(
+                errorHandlingBehaviorConfig, "Missing error handling configuration");
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(metricGroupName), "Missing metric group name");
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
+        this.requestSigner = requestSigner;
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        this.clientBuilder = clientBuilder;
+        this.httpUserAgent = httpUserAgent;
+        this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+        this.metricGroupName = metricGroupName;
+    }
+
+    @Override
+    public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> createWriter(
+            InitContext initContext) {
+        SinkMetricsCallback metricsCallback =
+                new SinkMetricsCallback(
+                        SinkMetrics.registerSinkMetrics(
+                                initContext.metricGroup().addGroup(metricGroupName)));
+        CloseableHttpAsyncClient asyncHttpClient =
+                clientBuilder.buildAndStartClient(metricsCallback);
+
+        return new PrometheusSinkWriter<>(
+                getElementConverter(),
+                initContext,
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                maxBatchSizeInSamples,
+                maxRecordSizeInSamples,
+                getMaxTimeInBufferMS(),
+                prometheusRemoteWriteUrl,
+                asyncHttpClient,
+                metricsCallback,
+                requestSigner,
+                httpUserAgent,
+                errorHandlingBehaviorConfig);
+    }
+
+    @Override
+    public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> restoreWriter(
+            InitContext initContext,
+            Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
+        SinkMetricsCallback metricsCallback =
+                new SinkMetricsCallback(
+                        SinkMetrics.registerSinkMetrics(
+                                initContext.metricGroup().addGroup(metricGroupName)));
+        CloseableHttpAsyncClient asyncHttpClient =
+                clientBuilder.buildAndStartClient(metricsCallback);
+        return new PrometheusSinkWriter<>(
+                getElementConverter(),
+                initContext,
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                maxBatchSizeInSamples,
+                maxRecordSizeInSamples,
+                getMaxTimeInBufferMS(),
+                prometheusRemoteWriteUrl,
+                asyncHttpClient,
+                metricsCallback,
+                requestSigner,
+                httpUserAgent,
+                errorHandlingBehaviorConfig,
+                recoveredState);
+    }
+
+    public PrometheusSink castToPrometheusSink() {
+        return new PrometheusSink(
+                new PrometheusTimeSeriesConverter(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                maxBatchSizeInSamples,
+                maxRecordSizeInSamples,
+                getMaxTimeInBufferMS(),
+                prometheusRemoteWriteUrl,
+                clientBuilder,
+                requestSigner,
+                httpUserAgent,
+                errorHandlingBehaviorConfig,
+                metricGroupName);
+    }

Review Comment:
   The fact that we need this is a big code smell: the higher-level abstraction creates an instance of the lower-level one. Given we have only just released the connector, let's consider instead "fixing" the interface and bumping the major version to remove this problem.
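   To illustrate the end state (assuming the generic `PrometheusSink<IN>` sketched in the earlier comment; the builder type and setter name below are hypothetical, not the PR's actual API):

```java
// Sketch: the Table connector builds its sink directly, so the
// parent-creates-child round trip via castToPrometheusSink() disappears.
PrometheusSink<RowData> tableSink =
        new PrometheusSinkBuilder<RowData>()
                .setElementConverter(rowDataElementConverter) // hypothetical setter
                .build();
```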



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
