hlteoh37 commented on code in PR #1:
URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1754639229


##########
flink-connector-prometheus/README.md:
##########
@@ -0,0 +1,260 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based
+on [Remote-Write specifications version 
1.0](https://prometheus.io/docs/concepts/remote_write_spec/)
+
+### Guarantees and input restrictions
+
+Due to the strict 
[ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering)
+and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) 
requirements
+of Prometheus Remote-Write, the sink guarantees that input data are written to 
Prometheus only if input data are in
+order and well-formed.

Review Comment:
   The word "guarantee" here is a little confusing. Maybe we can say something 
like: "Prometheus Remote-Write has strict guarantees around ordering and 
format. This sink requires users to ensure that input data is in order and 
well-formed before adding it to the sink."
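
   To illustrate what "ensure that input data is in order" could mean on the 
application side, here is a minimal sketch of a pre-sink check. The class and 
method names are hypothetical and not part of the connector, and treating equal 
timestamps as out of order is an assumption of this sketch.

```java
import java.util.List;

/** Hypothetical pre-sink check; not part of the connector. */
final class SampleOrderCheck {
    private SampleOrderCheck() {}

    /**
     * Returns true if the timestamps are strictly increasing.
     * Treating equal timestamps as out of order is an assumption of this sketch.
     */
    static boolean isInTimestampOrder(List<Long> timestamps) {
        for (int i = 1; i < timestamps.size(); i++) {
            // Compare each timestamp with its predecessor.
            if (timestamps.get(i) <= timestamps.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

   An application could run a check like this on the samples of each time-series 
before building the sink input, and route or drop offending records itself.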



##########
flink-connector-prometheus/README.md:
##########
@@ -0,0 +1,260 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based
+on [Remote-Write specifications version 
1.0](https://prometheus.io/docs/concepts/remote_write_spec/)
+
+### Guarantees and input restrictions
+
+Due to the strict 
[ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering)
+and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) 
requirements
+of Prometheus Remote-Write, the sink guarantees that input data are written to 
Prometheus only if input data are in
+order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, **the write request is rejected by 
Prometheus**.
+If the write request is rejected, depending on the configured [error handling 
behaviour](#error-handling-behavior) for
+"Prometheus non-retriable errors", the sink will either throw an exception 
(`FAIL`, default behavior) or discard the
+offending
+request and continue (`DISCARD_AND_CONTINUE`). See [error handling 
behaviour](#error-handling-behavior), below, for
+further details.
+
+The sink receives as input time-series, each containing one or more samples.
+To optimise the write throughput, input time-series are batched, in the order 
they are received, and written with a
+single write-request.
+
+If a write-request contains any out-of-order or malformed data, **the entire 
request is rejected** and all time series
+are discarded.
+The reason is that the Remote-Write
+specification [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff)
+rejected write requests (4xx responses),
+and the Prometheus response does not contain enough information to efficiently retry the write partially, discarding only the
+offending data.
+
+### Responsibilities of the application
+
+It is the responsibility of the application to send data to the sink in the correct order and format.
+
+1. Input time-series must be well-formed, e.g. only valid and non-duplicated 
labels,
+   samples in timestamp order (see [Labels and 
Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels)
+   in Prometheus Remote-Write specs).
+2. Input time-series with identical labels are sent to the sink in timestamp 
order.
+3. If sink parallelism > 1 is used, the input stream must be partitioned so 
that all time-series with identical labels
+   go to the same sink subtask. A `KeySelector` is provided to partition input 
correctly (
+   see [Partitioning](#partitioning), below).
+
+#### Sink input objects
+
+To help send well-formed data to the sink, the connector
+expects [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java)
+POJOs as input.
+
+Each `PrometheusTimeSeries` instance maps 1-to-1 to
+a [remote-write 
`TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). 
Each object contains:
+
+* exactly one `metricName`, mapped to the special `__name__` label
+* optionally, any number of additional labels { k: String, v:String } - MUST 
BE IN ORDER BY KEY
+* one or more `Samples` { value: double, timestamp: long } - MUST BE IN 
TIMESTAMP ORDER
+
+`PrometheusTimeSeries` provides a builder interface.
+
+```java
+
+// List<Tuple2<Double, Long>> samples = ...
+
+PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder()
+        .withMetricName("CPU") // mapped to  `__name__` label
+        .addLabel("InstanceID", instanceId)
+        .addLabel("AccountID", accountId);
+    
+for (Tuple2<Double, Long> sample : samples) {
+    tsBuilder.addSample(sample.f0, sample.f1);
+}
+
+PrometheusTimeSeries ts = tsBuilder.build();
+```
+
+#### Input data validation
+
+Prometheus imposes strict constraints to the content sent to remote-write, 
including label format and ordering, sample
+time ordering ecc.

Review Comment:
   nit: etc.



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesConverter.java:
##########
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+/**
+ * Converts the sink input {@link PrometheusTimeSeries} into the Protobuf 
{@link Types.TimeSeries}
+ * that are sent to Prometheus.
+ */
+public class PrometheusTimeSeriesConverter

Review Comment:
   `@PublicEvolving`?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java:
##########
@@ -0,0 +1,264 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+/** Builder for Sink implementation. */
+public class PrometheusSinkBuilder
+        extends AsyncSinkBaseBuilder<
+                PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> 
{
+    private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusSinkBuilder.class);
+
+    // Max batch size, in number of samples
+    private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500;
+    // Max time a record is buffered
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    // Max nr of requestEntry that will be buffered
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 1000;
+    // Metric Group name added to the custom metrics
+    private static final String DEFAULT_METRIC_GROUP_NAME = "Prometheus";
+
+    // Max in-flight requests is always = 1, to retain ordering
+    private static final int MAX_IN_FLIGHT_REQUESTS = 1;
+
+    private String prometheusRemoteWriteUrl;
+    private RetryConfiguration retryConfiguration;
+    private Integer socketTimeoutMs;
+    private PrometheusRequestSigner requestSigner = null;
+    private Integer maxBatchSizeInSamples;
+    private Integer maxRecordSizeInSamples;
+    private String httpUserAgent = null;
+    private SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig = null;
+    private String metricGroupName = null;
+
+    @Override
+    public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
+
+        int actualMaxBatchSizeInSamples =
+                Optional.ofNullable(getMaxBatchSizeInSamples())
+                        .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES);
+        int actualMaxBufferedRequests =
+                
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS);
+        long actualMaxTimeInBufferMS =
+                
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
+
+        // Fall back to the resolved batch size: the raw getter may return null and fail on unboxing
+        int actualMaxRecordSizeInSamples =
+                Optional.ofNullable(getMaxRecordSizeInSamples())
+                        .orElse(actualMaxBatchSizeInSamples);
+
+        int actualSocketTimeoutMs =
+                Optional.ofNullable(getSocketTimeoutMs())
+                        
.orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS);
+
+        String actualHttpUserAgent =
+                Optional.ofNullable(getHttpUserAgent())
+                        
.orElse(PrometheusRemoteWriteHttpRequestBuilder.DEFAULT_USER_AGENT);
+
+        SinkWriterErrorHandlingBehaviorConfiguration 
actualErrorHandlingBehaviorConfig =
+                Optional.ofNullable(errorHandlingBehaviorConfig)
+                        
.orElse(SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS);
+
+        String actualMetricGroupName =
+                
Optional.ofNullable(metricGroupName).orElse(DEFAULT_METRIC_GROUP_NAME);
+
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+                "Missing or blank Prometheus Remote-Write URL");
+        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+        Preconditions.checkNotNull(retryConfiguration, "Missing retry 
configuration");
+        Preconditions.checkArgument(
+                actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) 
must be positive");
+        Preconditions.checkArgument(
+                actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples,
+                "Max record size (in samples) must be <= Max batch size");
+
+        LOG.info(
+                "Prometheus sink configuration:"
+                        + 
"\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}"
+                        + 
"\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
+                        + 
"\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}"
+                        + "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}"
+                        + "\n\t\tErrorHandlingBehaviour: 
onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}",
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                MAX_IN_FLIGHT_REQUESTS,
+                actualMaxBufferedRequests,
+                retryConfiguration.getInitialRetryDelayMS(),
+                retryConfiguration.getMaxRetryDelayMS(),
+                retryConfiguration.getMaxRetryCount(),
+                actualSocketTimeoutMs,
+                actualHttpUserAgent,
+                actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
+                actualErrorHandlingBehaviorConfig.getOnHttpClientIOFail(),
+                
actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetriableError());
+
+        return new PrometheusSink(
+                new PrometheusTimeSeriesConverter(),
+                MAX_IN_FLIGHT_REQUESTS,
+                actualMaxBufferedRequests,
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                prometheusRemoteWriteUrl,
+                new PrometheusAsyncHttpClientBuilder(retryConfiguration)
+                        .setSocketTimeout(actualSocketTimeoutMs),
+                requestSigner,
+                actualHttpUserAgent,
+                actualErrorHandlingBehaviorConfig,
+                actualMetricGroupName);
+    }
+
+    private static void checkValidRemoteWriteUrl(String url) {
+        try {
+            new URL(url);
+        } catch (MalformedURLException mue) {
+            throw new IllegalArgumentException("Invalid Remote-Write URL: " + 
url, mue);
+        }
+    }
+
+    public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String 
prometheusRemoteWriteUrl) {
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner 
requestSigner) {
+        this.requestSigner = requestSigner;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxBatchSizeInSamples(int 
maxBatchSizeInSamples) {
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxRecordSizeInSamples(int 
maxRecordSizeInSamples) {
+        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+        this.retryConfiguration = retryConfiguration;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) {
+        this.socketTimeoutMs = socketTimeoutMs;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) {
+        this.httpUserAgent = httpUserAgent;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setErrorHandlingBehaviourConfiguration(
+            SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig) {
+        this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) {
+        this.metricGroupName = metricGroupName;
+        return this;
+    }
+
+    private Integer getMaxBatchSizeInSamples() {
+        return maxBatchSizeInSamples;
+    }
+
+    private Integer getMaxRecordSizeInSamples() {
+        return maxRecordSizeInSamples;
+    }
+
+    public RetryConfiguration getRetryConfiguration() {
+        return retryConfiguration;
+    }
+
+    public Integer getSocketTimeoutMs() {
+        return socketTimeoutMs;
+    }
+
+    public String getHttpUserAgent() {
+        return httpUserAgent;
+    }
+
+    public SinkWriterErrorHandlingBehaviorConfiguration 
getErrorHandlingBehaviorConfig() {
+        return errorHandlingBehaviorConfig;
+    }
+
+    public String getMetricGroupName() {
+        return metricGroupName;
+    }
+
+    /// Disable accessing maxBatchSize, maxBatchSizeInBytes, and 
maxRecordSizeInBytes directly
+
+    /** Not supported. Use setMaxBatchSizeInSamples(int) instead */
+    @Override
+    public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) {
+        throw new UnsupportedOperationException("maxBatchSize is not supported 
by this sink");
+    }
+
+    /** Not supported. Use setMaxBatchSizeInSamples(int) instead */
+    @Override
+    public PrometheusSinkBuilder setMaxBatchSizeInBytes(long 
maxBatchSizeInBytes) {
+        throw new UnsupportedOperationException(
+                "maxBatchSizeInBytes is not supported by this sink");
+    }
+
+    /** Not supported. Use setMaxRecordSizeInSamples(int) instead */
+    @Override
+    public PrometheusSinkBuilder setMaxRecordSizeInBytes(long 
maxRecordSizeInBytes) {
+        throw new UnsupportedOperationException(
+                "maxRecordSizeInBytes is not supported by this sink");
+    }
+
+    /** Not supported. Use getMaxBatchSizeInSamples() instead */
+    @Override
+    protected Integer getMaxBatchSize() {
+        throw new UnsupportedOperationException("maxBatchSize is not supported 
by this sink");
+    }
+
+    /** Not supported. Use getMaxBatchSizeInSamples() instead */
+    @Override
+    protected Long getMaxBatchSizeInBytes() {
+        throw new UnsupportedOperationException(
+                "maxBatchSizeInBytes is not supported by this sink");
+    }
+
+    /** Not supported. Use getMaxRecordSizeInSamples() instead */
+    @Override
+    protected Long getMaxRecordSizeInBytes() {
+        throw new UnsupportedOperationException(
+                "maxRecordSizeInBytes is not supported by this sink");
+    }

Review Comment:
   We don't need to do this, as we don't expect this builder to be extended!



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/metrics/SinkMetrics.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+/** Wraps all metrics in a single class. */
+public class SinkMetrics {

Review Comment:
   `@Internal`?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -0,0 +1,148 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.util.Collection;
+
+/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
+@PublicEvolving
+public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, 
Types.TimeSeries> {
+    private final String prometheusRemoteWriteUrl;
+    private final PrometheusAsyncHttpClientBuilder clientBuilder;
+    private final PrometheusRequestSigner requestSigner;
+    private final int maxBatchSizeInSamples;
+    private final String httpUserAgent;
+    private final SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig;
+    private final String metricGroupName;
+
+    protected PrometheusSink(
+            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> 
elementConverter,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            int maxRecordSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            PrometheusAsyncHttpClientBuilder clientBuilder,
+            PrometheusRequestSigner requestSigner,
+            String httpUserAgent,
+            SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig,
+            String metricGroupName) {
+        super(
+                elementConverter,
+                maxBatchSizeInSamples, // maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples, // maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInSamples // maxRecordSizeInBytes
+                );
+
+        Preconditions.checkArgument(maxInFlightRequests == 1, 
"maxInFlightRequests must be 1");
+
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        this.requestSigner = requestSigner;
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        this.clientBuilder = clientBuilder;
+        this.httpUserAgent = httpUserAgent;
+        this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+        this.metricGroupName = metricGroupName;
+    }
+
+    public int getMaxBatchSizeInSamples() {
+        return maxBatchSizeInSamples;
+    }

Review Comment:
   Actually can we just delete this?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -0,0 +1,148 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.util.Collection;
+
+/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
+@PublicEvolving
+public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, 
Types.TimeSeries> {
+    private final String prometheusRemoteWriteUrl;
+    private final PrometheusAsyncHttpClientBuilder clientBuilder;
+    private final PrometheusRequestSigner requestSigner;
+    private final int maxBatchSizeInSamples;
+    private final String httpUserAgent;
+    private final SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig;
+    private final String metricGroupName;
+
+    protected PrometheusSink(
+            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> 
elementConverter,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            int maxRecordSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            PrometheusAsyncHttpClientBuilder clientBuilder,
+            PrometheusRequestSigner requestSigner,
+            String httpUserAgent,
+            SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig,
+            String metricGroupName) {
+        super(
+                elementConverter,
+                maxBatchSizeInSamples, // maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples, // maxBatchSizeInBytes,

Review Comment:
   Can we add a comment explaining that we don't need to check the size in 
bytes, so we simply use the number of samples in place of bytes?



##########
flink-connector-prometheus/README.md:
##########
@@ -0,0 +1,260 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based
+on [Remote-Write specifications version 
1.0](https://prometheus.io/docs/concepts/remote_write_spec/)
+
+### Guarantees and input restrictions
+
+Due to the strict 
[ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering)
+and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) 
requirements
+of Prometheus Remote-Write, the sink guarantees that input data are written to 
Prometheus only if input data are in
+order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, **the write request is rejected by 
Prometheus**.
+If the write request is rejected, depending on the configured [error handling 
behaviour](#error-handling-behavior) for
+"Prometheus non-retriable errors", the sink will either throw an exception 
(`FAIL`, default behavior) or discard the
+offending
+request and continue (`DISCARD_AND_CONTINUE`). See [error handling 
behaviour](#error-handling-behavior), below, for
+further details.
+
+The sink receives as input time-series, each containing one or more samples.
+To optimise the write throughput, input time-series are batched, in the order 
they are received, and written with a
+single write-request.
+
+If a write-request contains any out-of-order or malformed data, **the entire 
request is rejected** and all time series
+are discarded.
+The reason is that the Remote-Write
+specification [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff)
+rejected write requests (4xx responses),
+and the Prometheus response does not contain enough information to efficiently retry the write partially, discarding only the
+offending data.
+
+### Responsibilities of the application
+
+It is the responsibility of the application to send data to the sink in the correct order and format.
+
+1. Input time-series must be well-formed, e.g. only valid and non-duplicated 
labels,
+   samples in timestamp order (see [Labels and 
Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels)
+   in Prometheus Remote-Write specs).
+2. Input time-series with identical labels are sent to the sink in timestamp 
order.
+3. If sink parallelism > 1 is used, the input stream must be partitioned so 
that all time-series with identical labels
+   go to the same sink subtask. A `KeySelector` is provided to partition input 
correctly (
+   see [Partitioning](#partitioning), below).
+
+#### Sink input objects
+
+To help send well-formed data to the sink, the connector
+expects [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java)
+POJOs as input.
+
+Each `PrometheusTimeSeries` instance maps 1-to-1 to
+a [remote-write 
`TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). 
Each object contains:
+
+* exactly one `metricName`, mapped to the special `__name__` label
+* optionally, any number of additional labels { k: String, v:String } - MUST 
BE IN ORDER BY KEY
+* one or more `Samples` { value: double, timestamp: long } - MUST BE IN 
TIMESTAMP ORDER
+
+`PrometheusTimeSeries` provides a builder interface.
+
+```java
+
+// List<Tuple2<Double, Long>> samples = ...
+
+PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder()
+        .withMetricName("CPU") // mapped to  `__name__` label
+        .addLabel("InstanceID", instanceId)
+        .addLabel("AccountID", accountId);
+    
+for (Tuple2<Double, Long> sample : samples) {
+    tsBuilder.addSample(sample.f0, sample.f1);
+}
+
+PrometheusTimeSeries ts = tsBuilder.build();
+```
+
+#### Input data validation
+
+Prometheus imposes strict constraints to the content sent to remote-write, 
including label format and ordering, sample
+time ordering ecc.
+
+For efficiency, the sink **does not do any validation or reordering** of the input. It is the responsibility
+of the application to ensure that input is well-formed.
+
+Any malformed data will be rejected on write to Prometheus. Depending on
+the [error handling behaviours](#error-handling-behavior)
+configured, the sink will throw an exception stopping the job (default), or 
drop the entire write-request, log the fact,
+and continue.
+
+For complete details about these constraints, refer to
+the [remote-write 
specifications](https://prometheus.io/docs/concepts/remote_write_spec/).
+
+### Batching, blocking writes and retry
+
+The sink batches multiple time-series into a single write-request, retaining the order.
+
+Batching is based on the number of samples. Each write-request contains up to 
500 samples, with a max buffering time of
+5 seconds
+(both configurable). The number of time-series doesn't matter.
+
+As by [Prometheus Remote-Write 
specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff),
+the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample 
ordering, and uses and exponential
+backoff.
+
+The exponential backoff starts with an initial delay (default 30 ms) and 
increases it exponentially up to a max retry
+delay (default 5 sec). It continues retrying until the max number of retries 
is reached (default reties forever).
+
+On non-retriable error response (4xx, except 429, non retryable exceptions), 
or on reaching the retry limit, depending
+on
+the configured [error handling behaviour](#error-handling-behavior) for "Max 
retries exceeded", the sink will either
+throw
+an exception (`FAIL`, default behaviour), or **discard the entire 
write-request**, log a warning and continue. See
+[error handling behaviour](#error-handling-behavior), below, for further 
details.
+
+### Initializing the sink
+
+Example of sink initialisation (for documentation purposes, we are setting all 
parameters to their default values):
+Sink
+
+```java
+PrometheusSink sink = PrometheusSink.builder()
+        .setMaxBatchSizeInSamples(500)              // Batch size 
(write-request size), in samples (default: 500)
+        .setMaxRecordSizeInSamples(500)             // Max sink input record 
size, in samples (default: 500), must be <= maxBatchSizeInSamples

Review Comment:
   It might be good to clarify the behavior when this limit is exceeded: the sink will
   continuously restart and reject the record, so record size should be validated
   before data is sent to the sink. It may even be worth mentioning that this is not
   expected in normal use of the sink (unlike `maxBatchSize`, which can be tuned for
   performance reasons).
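
To illustrate the kind of upstream check this refers to, here is a minimal sketch, assuming the application holds the samples of one time series as a timestamp-ordered list before building the sink input. `RecordSizeGuard` and `splitSamples` are hypothetical names, not part of the connector; the only thing taken from the PR is the idea of a per-record limit expressed in samples.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical upstream helper; not part of the connector. */
final class RecordSizeGuard {

    private RecordSizeGuard() {}

    /**
     * Splits an already timestamp-ordered list of samples into consecutive chunks of at
     * most maxRecordSizeInSamples elements. Order within and across chunks is preserved.
     */
    static <T> List<List<T>> splitSamples(List<T> samples, int maxRecordSizeInSamples) {
        if (maxRecordSizeInSamples <= 0) {
            throw new IllegalArgumentException("maxRecordSizeInSamples must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < samples.size(); from += maxRecordSizeInSamples) {
            int to = Math.min(from + maxRecordSizeInSamples, samples.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(samples.subList(from, to)));
        }
        return chunks;
    }
}
```

Each chunk could then be turned into its own input record with the same metric name and labels, emitted in the same order, so that no single record exceeds the configured limit.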



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -0,0 +1,148 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.util.Collection;
+
+/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
+@PublicEvolving
+public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, 
Types.TimeSeries> {
+    private final String prometheusRemoteWriteUrl;
+    private final PrometheusAsyncHttpClientBuilder clientBuilder;
+    private final PrometheusRequestSigner requestSigner;
+    private final int maxBatchSizeInSamples;
+    private final String httpUserAgent;
+    private final SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig;
+    private final String metricGroupName;
+
+    protected PrometheusSink(
+            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> 
elementConverter,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            int maxRecordSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            PrometheusAsyncHttpClientBuilder clientBuilder,
+            PrometheusRequestSigner requestSigner,
+            String httpUserAgent,
+            SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig,
+            String metricGroupName) {
+        super(
+                elementConverter,
+                maxBatchSizeInSamples, // maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples, // maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInSamples // maxRecordSizeInBytes
+                );
+
+        Preconditions.checkArgument(maxInFlightRequests == 1, 
"maxInFlightRequests must be 1");
+
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        this.requestSigner = requestSigner;
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        this.clientBuilder = clientBuilder;
+        this.httpUserAgent = httpUserAgent;
+        this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+        this.metricGroupName = metricGroupName;
+    }
+
+    public int getMaxBatchSizeInSamples() {
+        return maxBatchSizeInSamples;
+    }

Review Comment:
   can this be `private`?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusRemoteWriteHttpRequestBuilder.java:
##########
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpHeaders;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Builds the POST request to the Remote-Write endpoint for a given binary 
payload. */
+public class PrometheusRemoteWriteHttpRequestBuilder {

Review Comment:
   Let's annotate this as `@Internal`



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/PrometheusSinkWriteException.java:
##########
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.errorhandling;
+
+/** Exception during writing to Prometheus Remote-Write endpoint. */
+public class PrometheusSinkWriteException extends RuntimeException {

Review Comment:
   `@Internal`?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java:
##########
@@ -0,0 +1,188 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Callback handling the outcome of the http async request.
+ *
+ * <p>This class implements the error handling behaviour, based on the 
configuration in {@link
+ * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, 
the sink may throw an
+ * exception and cause the job to fail, or log the condition to WARN, 
increment the counters and
+ * continue with the next request.
+ *
+ * <p>In any case, every write-request either entirely succeed or fail. 
Partial failures are not
+ * handled.
+ *
+ * <p>In no condition a write-request is re-queued for the AsyncSink to 
reprocess: this would cause
+ * out of order writes that would be rejected by Prometheus.
+ *
+ * <p>Note that the http async client retries, based on the configured retry 
policy. The callback is
+ * called with an outcome of *completed* either when the request has succeeded 
or the max retry
+ * limit has been exceeded. It is responsibility of the callback 
distinguishing between these
+ * conditions.
+ */
+class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {

Review Comment:
   Let's annotate this as `@Internal`



##########
flink-connector-prometheus/README.md:
##########
@@ -0,0 +1,260 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based
+on [Remote-Write specifications version 
1.0](https://prometheus.io/docs/concepts/remote_write_spec/)
+
+### Guarantees and input restrictions
+
+Due to the strict 
[ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering)
+and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) 
requirements
+of Prometheus Remote-Write, the sink guarantees that input data are written to 
Prometheus only if input data are in
+order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, **the write request is rejected by 
Prometheus**.
+If the write request is rejected, depending on the configured [error handling 
behaviour](#error-handling-behavior) for
+"Prometheus non-retriable errors", the sink will either throw an exception 
(`FAIL`, default behavior) or discard the
+offending
+request and continue (`DISCARD_AND_CONTINUE`). See [error handling 
behaviour](#error-handling-behavior), below, for
+further details.
+
+The sink receives as input time-series, each containing one or more samples.
+To optimise the write throughput, input time-series are batched, in the order 
they are received, and written with a
+single write-request.
+
+If a write-request contains any out-of-order or malformed data, **the entire 
request is rejected** and all time series
+are discarded.
+The reason is Remote-Write
+specifications [explicitly forbids 
retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff)
 of
+rejected write requests (4xx responses).
+and the Prometheus response does not contain enough information to efficiently 
partially retry the write, discarding the
+offending data.
+
+### Responsibilities of the application
+
+It is responsibility of the application sending the data to the sink in the 
correct order and format.
+
+1. Input time-series must be well-formed, e.g. only valid and non-duplicated 
labels,
+   samples in timestamp order (see [Labels and 
Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels)
+   in Prometheus Remote-Write specs).
+2. Input time-series with identical labels are sent to the sink in timestamp 
order.
+3. If sink parallelism > 1 is used, the input stream must be partitioned so 
that all time-series with identical labels
+   go to the same sink subtask. A `KeySelector` is provided to partition input 
correctly (
+   see [Partitioning](#partitioning), below).
+
+#### Sink input objects
+
+To help sending well-formed data to the sink, the connector
+expect 
[`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java)
+POJOs as input.
+
+Each `PrometheusTimeSeries` instance maps 1-to-1 to
+a [remote-write 
`TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). 
Each object contains:
+
+* exactly one `metericName`, mapped to the special  `__name__` label
+* optionally, any number of additional labels { k: String, v:String } - MUST 
BE IN ORDER BY KEY
+* one or more `Samples` { value: double, timestamp: long } - MUST BE IN 
TIMESTAMP ORDER
+
+`PrometheusTimeSeries` provides a builder interface.
+
+```java
+
+// List<Tuple2<Double, Long>> samples = ...
+
+PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder()
+        .withMetricName("CPU") // mapped to  `__name__` label
+        .addLabel("InstanceID", instanceId)
+        .addLabel("AccountID", accountId);
+    
+for(
+Tuple2<Double, Long> sample :samples){
+        tsBuilder.
+
+addSample(sample.f0, sample.f1);
+}
+
+PrometheusTimeSeries ts = tsBuilder.build();
+```
+
+#### Input data validation
+
+Prometheus imposes strict constraints to the content sent to remote-write, 
including label format and ordering, sample
+time ordering ecc.
+
+For efficiency, the sink **does not do any validation or reordering** of the 
input. It's responsibility
+of the application ensuring that input is well-formed.
+
+Any malformed data will be rejected on write to Prometheus. Depending on
+the [error handling behaviours](#error-handling-behavior)
+configured, the sink will throw an exception stopping the job (default), or 
drop the entire write-request, log the fact,
+and continue.
+
+For complete details about these constraints, refer to
+the [remote-write 
specifications](https://prometheus.io/docs/concepts/remote_write_spec/).
+
+### Batching, blocking writes and retry
+
+The sink batches multiple time-series into a single write-request, retaining 
the order..
+
+Batching is based on the number of samples. Each write-request contains up to 
500 samples, with a max buffering time of
+5 seconds
+(both configurable). The number of time-series doesn't matter.
+
+As by [Prometheus Remote-Write 
specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff),
+the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample 
ordering, and uses and exponential
+backoff.
+
+The exponential backoff starts with an initial delay (default 30 ms) and 
increases it exponentially up to a max retry
+delay (default 5 sec). It continues retrying until the max number of retries 
is reached (default reties forever).
+
+On non-retriable error response (4xx, except 429, non retryable exceptions), 
or on reaching the retry limit, depending
+on
+the configured [error handling behaviour](#error-handling-behavior) for "Max 
retries exceeded", the sink will either
+throw
+an exception (`FAIL`, default behaviour), or **discard the entire 
write-request**, log a warning and continue. See
+[error handling behaviour](#error-handling-behavior), below, for further 
details.
+
+### Initializing the sink
+
+Example of sink initialisation (for documentation purposes, we are setting all 
parameters to their default values):
+Sink
+
+```java
+PrometheusSink sink = PrometheusSink.builder()
+        .setMaxBatchSizeInSamples(500)              // Batch size 
(write-request size), in samples (default: 500)
+        .setMaxRecordSizeInSamples(500)             // Max sink input record 
size, in samples (default: 500), must be <= maxBatchSizeInSamples
+        .setMaxTimeInBufferMS(5000)                 // Max time a time-series 
is buffered for batching (default: 5000 ms)
+        .setRetryConfiguration(RetryConfiguration.builder()
+                .setInitialRetryDelayMS(30L)            // Initial retry delay 
(default: 30 ms)
+                .setMaxRetryDelayMS(5000L)              // Maximum retray 
delay, with exponential backoff (default: 5000 ms)

Review Comment:
   nit: typo, `retray` should be `retry` ("Maximum retry delay")



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java:
##########
@@ -0,0 +1,264 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+/** Builder for Sink implementation. */
+public class PrometheusSinkBuilder
+        extends AsyncSinkBaseBuilder<
+                PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> 
{
+    private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusSinkBuilder.class);
+
+    // Max batch size, in number of samples
+    private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500;
+    // Max time a record is buffered
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    // Max nr of requestEntry that will be buffered
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 1000;
+    // Metric Group name added to the custom metrics
+    private static final String DEFAULT_METRIC_GROUP_NAME = "Prometheus";
+
+    // Max in-flight requests is always = 1, to retain ordering
+    private static final int MAX_IN_FLIGHT_REQUESTS = 1;
+
+    private String prometheusRemoteWriteUrl;
+    private RetryConfiguration retryConfiguration;
+    private Integer socketTimeoutMs;
+    private PrometheusRequestSigner requestSigner = null;
+    private Integer maxBatchSizeInSamples;
+    private Integer maxRecordSizeInSamples;
+    private String httpUserAgent = null;
+    private SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig = null;
+    private String metricGroupName = null;
+
+    @Override
+    public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
+
+        int actualMaxBatchSizeInSamples =
+                Optional.ofNullable(getMaxBatchSizeInSamples())
+                        .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES);
+        int actualMaxBufferedRequests =
+                
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS);
+        long actualMaxTimeInBufferMS =
+                
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
+
+        int actualMaxRecordSizeInSamples =
+                
Optional.ofNullable(getMaxRecordSizeInSamples()).orElse(getMaxBatchSizeInSamples());
+
+        int actualSocketTimeoutMs =
+                Optional.ofNullable(getSocketTimeoutMs())
+                        
.orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS);
+
+        String actualHttpUserAgent =
+                Optional.ofNullable(getHttpUserAgent())
+                        
.orElse(PrometheusRemoteWriteHttpRequestBuilder.DEFAULT_USER_AGENT);
+
+        SinkWriterErrorHandlingBehaviorConfiguration 
actualErrorHandlingBehaviorConfig =
+                Optional.ofNullable(errorHandlingBehaviorConfig)
+                        
.orElse(SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS);
+
+        String actualMetricGroupName =
+                
Optional.ofNullable(metricGroupName).orElse(DEFAULT_METRIC_GROUP_NAME);
+
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+                "Missing or blank Prometheus Remote-Write URL");
+        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+        Preconditions.checkNotNull(retryConfiguration, "Missing retry 
configuration");
+        Preconditions.checkArgument(
+                actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) 
must be positive");
+        Preconditions.checkArgument(
+                actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples,
+                "Max record size (in samples) must be <= Max batch size");
+
+        LOG.info(
+                "Prometheus sink configuration:"
+                        + 
"\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}"
+                        + 
"\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
+                        + 
"\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}"
+                        + "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}"
+                        + "\n\t\tErrorHandlingBehaviour: 
onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}",
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                MAX_IN_FLIGHT_REQUESTS,
+                actualMaxBufferedRequests,
+                retryConfiguration.getInitialRetryDelayMS(),
+                retryConfiguration.getMaxRetryDelayMS(),
+                retryConfiguration.getMaxRetryCount(),
+                socketTimeoutMs,
+                actualHttpUserAgent,
+                actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
+                actualErrorHandlingBehaviorConfig.getOnHttpClientIOFail(),
+                
actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetriableError());
+
+        return new PrometheusSink(
+                new PrometheusTimeSeriesConverter(),
+                MAX_IN_FLIGHT_REQUESTS,
+                actualMaxBufferedRequests,
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                prometheusRemoteWriteUrl,
+                new PrometheusAsyncHttpClientBuilder(retryConfiguration)
+                        .setSocketTimeout(actualSocketTimeoutMs),
+                requestSigner,
+                actualHttpUserAgent,
+                actualErrorHandlingBehaviorConfig,
+                actualMetricGroupName);
+    }
+
+    private static void checkValidRemoteWriteUrl(String url) {
+        try {
+            new URL(url);
+        } catch (MalformedURLException mue) {
+            throw new IllegalArgumentException("Invalid Remote-Write URL: " + 
url, mue);
+        }
+    }
+
+    public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String 
prometheusRemoteWriteUrl) {
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner 
requestSigner) {
+        this.requestSigner = requestSigner;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxBatchSizeInSamples(int 
maxBatchSizeInSamples) {
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxRecordSizeInSamples(int 
maxRecordSizeInSamples) {
+        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+        this.retryConfiguration = retryConfiguration;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) {
+        this.socketTimeoutMs = socketTimeoutMs;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) {
+        this.httpUserAgent = httpUserAgent;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setErrorHandlingBehaviourConfiguration(
+            SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig) {
+        this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) {
+        this.metricGroupName = metricGroupName;
+        return this;
+    }
+
+    private Integer getMaxBatchSizeInSamples() {
+        return maxBatchSizeInSamples;
+    }
+
+    private Integer getMaxRecordSizeInSamples() {
+        return maxRecordSizeInSamples;
+    }
+
+    public RetryConfiguration getRetryConfiguration() {
+        return retryConfiguration;
+    }
+
+    public Integer getSocketTimeoutMs() {
+        return socketTimeoutMs;
+    }
+
+    public String getHttpUserAgent() {
+        return httpUserAgent;
+    }
+
+    public SinkWriterErrorHandlingBehaviorConfiguration 
getErrorHandlingBehaviorConfig() {
+        return errorHandlingBehaviorConfig;
+    }
+
+    public String getMetricGroupName() {
+        return metricGroupName;
+    }

Review Comment:
   Can we remove the `get` methods on the builder?
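
A rough sketch of what that could look like, using a simplified stand-in rather than the real classes: `SketchSink` and `SketchSinkBuilder` are hypothetical and only model two of the settings. The builder exposes setters and `build()` only, and `build()` reads the private fields directly.

```java
import java.util.Optional;

/** Hypothetical stand-in for the sink; only carries the resolved settings. */
final class SketchSink {
    final int maxBatchSizeInSamples;
    final int maxRecordSizeInSamples;

    SketchSink(int maxBatchSizeInSamples, int maxRecordSizeInSamples) {
        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
    }
}

/** Hypothetical builder with setters only; no getters are exposed. */
final class SketchSinkBuilder {
    private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500;

    private Integer maxBatchSizeInSamples;
    private Integer maxRecordSizeInSamples;

    SketchSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) {
        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
        return this;
    }

    SketchSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) {
        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
        return this;
    }

    SketchSink build() {
        // Fields are read directly; defaults are resolved here and nowhere else.
        int actualMaxBatchSize =
                Optional.ofNullable(maxBatchSizeInSamples)
                        .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES);
        // Fall back to the resolved batch size, which is never null.
        int actualMaxRecordSize =
                Optional.ofNullable(maxRecordSizeInSamples).orElse(actualMaxBatchSize);

        if (actualMaxBatchSize <= 0) {
            throw new IllegalArgumentException("Max batch size (in samples) must be positive");
        }
        if (actualMaxRecordSize > actualMaxBatchSize) {
            throw new IllegalArgumentException(
                    "Max record size (in samples) must be <= Max batch size");
        }
        return new SketchSink(actualMaxBatchSize, actualMaxRecordSize);
    }
}
```

In this sketch the record-size default is derived from the already resolved batch size, which avoids unboxing a null when neither value has been set.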



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/OnErrorBehavior.java:
##########
@@ -0,0 +1,27 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.errorhandling;
+
+/**
+ * Defines the behaviour when an error is encountered: discard the offending 
request and continue,
+ * or fail, throwing an exception.
+ */
+public enum OnErrorBehavior {

Review Comment:
   `@PublicEvolving`. Actually let's put this under a `ConfigOption` class



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java:
##########
@@ -0,0 +1,93 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hc.client5.http.config.TlsConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import 
org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Builder for async http client that will retry, based on the {@link 
RemoteWriteRetryStrategy}
+ * specified.
+ */
+public class PrometheusAsyncHttpClientBuilder implements Serializable {

Review Comment:
   `@PublicEvolving`?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesLabelsAndMetricNameKeySelector.java:
##########
@@ -0,0 +1,44 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Implementation of KeySelector ensuring the {@link PrometheusTimeSeries} with the same set of
+ * labels end up in the same partition.
+ *
+ * <p>For using the sink with parallelism > 1, the input of the sink must be a keyedStream using
+ * this KeySelector to extract the key. This guarantees TimeSeries with the same set of labels are
+ * written to Prometheus in the same order they are sent to the sink.
+ *
+ * <p>The partition key is the hash of all labels AND the metricName. The metricName is added as
+ * additional label by the sink, before writing to Prometheus.
+ */
+public class PrometheusTimeSeriesLabelsAndMetricNameKeySelector

Review Comment:
   `@PublicEvolving`?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/SinkWriterErrorHandlingBehaviorConfiguration.java:
##########
@@ -0,0 +1,105 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.errorhandling;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import static org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior.FAIL;
+
+/**
+ * Configure the error-handling behavior of the writer, for different types of error. Also defines
+ * default behaviors.
+ */
+public class SinkWriterErrorHandlingBehaviorConfiguration implements Serializable {

Review Comment:
   `@PublicEvolving`?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RetryConfiguration.java:
##########
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import java.io.Serializable;
+
+/** Defines the retry strategy configuration. */
+public class RetryConfiguration implements Serializable {

Review Comment:
   Can we just make this a set of `ConfigOptions`?



##########
pom.xml:
##########
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-parent</artifactId>
+        <version>1.0.0</version>
+    </parent>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-prometheus-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <name>Flink Prometheus</name>
+    <modules>
+        <module>flink-connector-prometheus</module>
+        <module>flink-connector-prometheus-request-signer-amp</module>
+    </modules>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>

Review Comment:
   Let's make this `1.19.1`



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/PrometheusSinkWriteException.java:
##########
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.errorhandling;
+
+/** Exception during writing to Prometheus Remote-Write endpoint. */
+public class PrometheusSinkWriteException extends RuntimeException {
+
+    private final String reason;
+    private final int httpStatusCode;
+    private final String httpReasonPhrase;
+    private final int timeSeriesCount;
+    private final long sampleCount;
+    private final String httpResponseBody;
+
+    public PrometheusSinkWriteException(String reason, int timeSeriesCount, long sampleCount) {
+        super("Reason: " + reason);
+        this.reason = reason;
+        this.timeSeriesCount = timeSeriesCount;
+        this.sampleCount = sampleCount;
+        this.httpStatusCode = -1;
+        this.httpReasonPhrase = "";
+        this.httpResponseBody = "";
+    }
+
+    public PrometheusSinkWriteException(
+            String reason, int timeSeriesCount, long sampleCount, Exception cause) {
+        super("Reason: " + reason, cause);
+        this.reason = reason;
+        this.timeSeriesCount = timeSeriesCount;
+        this.sampleCount = sampleCount;
+        this.httpStatusCode = -1;
+        this.httpReasonPhrase = "";
+        this.httpResponseBody = "";
+    }
+
+    public PrometheusSinkWriteException(
+            String reason,
+            int httpStatusCode,
+            String httpReasonPhrase,
+            int timeSeriesCount,
+            long sampleCount,
+            String httpResponseBody) {
+        super(
+                String.format(
+                        "Reason: %s. Http response: %d,%s (%s) .The offending write-request contains %d time-series and %d samples",
+                        reason,
+                        httpStatusCode,
+                        httpReasonPhrase,
+                        httpResponseBody,
+                        timeSeriesCount,
+                        sampleCount));
+        this.reason = reason;
+        this.httpStatusCode = httpStatusCode;
+        this.httpReasonPhrase = httpReasonPhrase;
+        this.timeSeriesCount = timeSeriesCount;
+        this.sampleCount = sampleCount;
+        this.httpResponseBody = httpResponseBody;
+    }
+
+    public String getReason() {
+        return reason;
+    }
+
+    public int getHttpStatusCode() {
+        return httpStatusCode;
+    }
+
+    public String getHttpReasonPhrase() {
+        return httpReasonPhrase;
+    }
+
+    public int getTimeSeriesCount() {
+        return timeSeriesCount;
+    }
+
+    public long getSampleCount() {
+        return sampleCount;
+    }
+
+    public String getHttpResponseBody() {
+        return httpResponseBody;
+    }

Review Comment:
   Can we remove these?
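
For illustration only, and assuming "these" refers to the getters above: a minimal sketch of an exception that keeps the same details in its message without exposing them as fields. The class name `SlimWriteExceptionSketch` is hypothetical and is not part of the PR.

```java
/** Hypothetical slimmed-down variant; not the connector's actual class. */
class SlimWriteExceptionSketch extends RuntimeException {

    SlimWriteExceptionSketch(
            String reason,
            int httpStatusCode,
            String httpReasonPhrase,
            int timeSeriesCount,
            long sampleCount,
            String httpResponseBody) {
        // All details go into the message; no fields or getters are kept.
        super(
                String.format(
                        "Reason: %s. Http response: %d,%s (%s). The offending write-request contains %d time-series and %d samples",
                        reason,
                        httpStatusCode,
                        httpReasonPhrase,
                        httpResponseBody,
                        timeSeriesCount,
                        sampleCount));
    }
}
```

Callers that only log or rethrow the exception would lose nothing with this shape; callers that branch on the HTTP status code would still need an accessor.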



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesLabelsAndMetricNameKeySelector.java:
##########
@@ -0,0 +1,44 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Implementation of KeySelector ensuring the {@link PrometheusTimeSeries} with the same set of
+ * labels end up in the same partition.
+ *
+ * <p>For using the sink with parallelism > 1, the input of the sink must be a keyedStream using
+ * this KeySelector to extract the key. This guarantees TimeSeries with the same set of labels are
+ * written to Prometheus in the same order they are sent to the sink.
+ *
+ * <p>The partition key is the hash of all labels AND the metricName. The metricName is added as
+ * additional label by the sink, before writing to Prometheus.
+ */
+public class PrometheusTimeSeriesLabelsAndMetricNameKeySelector

Review Comment:
   Can we also name this something smaller? Like `PrometheusKeySelector`?
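
As a rough sketch of the keying idea the Javadoc describes (the key is a hash of the metric name and all labels), under a shorter name. Everything here is an assumption for illustration: `TimeSeriesKeyInput` is a hypothetical stand-in for `PrometheusTimeSeries`, and `java.util.function.Function` stands in for Flink's `KeySelector` so that the example has no Flink dependency.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical stand-in for PrometheusTimeSeries: a metric name plus labels. */
final class TimeSeriesKeyInput {
    final String metricName;
    final Map<String, String> labels;

    TimeSeriesKeyInput(String metricName, Map<String, String> labels) {
        this.metricName = metricName;
        // Defensive copy; Map.hashCode() does not depend on entry order.
        this.labels = new TreeMap<>(labels);
    }
}

/** Sketch of a shorter-named key selector; not the connector's actual class. */
final class PrometheusKeySelectorSketch implements Function<TimeSeriesKeyInput, Integer> {
    @Override
    public Integer apply(TimeSeriesKeyInput ts) {
        // Hash of the metric name AND all labels, as the Javadoc describes.
        return Objects.hash(ts.metricName, ts.labels);
    }
}
```

Two inputs with the same metric name and the same label set produce the same key regardless of the order in which the labels were added, which is what keeps a given time series on one partition.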



##########
pom.xml:
##########
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-parent</artifactId>
+        <version>1.0.0</version>
+    </parent>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-prometheus-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <name>Flink Prometheus</name>
+    <modules>
+        <module>flink-connector-prometheus</module>
+        <module>flink-connector-prometheus-request-signer-amp</module>
+    </modules>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>
+        <protobuf.version>3.22.2</protobuf.version>
+        <apache.httpclient.version>5.2.1</apache.httpclient.version>
+        <aws.sdkv1.version>1.12.570</aws.sdkv1.version>
+        <log4j.version>2.17.1</log4j.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>

Review Comment:
   Could we copy over the other plugins here too?
   
   https://github.com/apache/flink-connector-aws/blob/main/pom.xml#L421-L477



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/metrics/SinkMetricsCallback.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.metrics;
+
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/** Callback updating {@link SinkMetrics} on specific request outcomes. */
+public class SinkMetricsCallback {

Review Comment:
   `@Internal`?



