hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1754639229
########## flink-connector-prometheus/README.md: ########## @@ -0,0 +1,260 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based +on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) +and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in +order and well-formed. Review Comment: The word "guarantee" here is a little confusing. Maybe we can say something like the "Prometheus Remote-Write has strict guarantees around ordering and format. This sink requires that users ensure that input data is in order and well-formed before adding it to the sink." ########## flink-connector-prometheus/README.md: ########## @@ -0,0 +1,260 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based +on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) +and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in +order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, **the write request is rejected by Prometheus**. +If the write request is rejected, depending on the configured [error handling behaviour](#error-handling-behavior) for +"Prometheus non-retriable errors", the sink will either throw an exception (`FAIL`, default behavior) or discard the +offending +request and continue (`DISCARD_AND_CONTINUE`). See [error handling behaviour](#error-handling-behavior), below, for +further details. + +The sink receives as input time-series, each containing one or more samples. +To optimise the write throughput, input time-series are batched, in the order they are received, and written with a +single write-request. + +If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all time series +are discarded. +The reason is Remote-Write +specifications [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of +rejected write requests (4xx responses). +and the Prometheus response does not contain enough information to efficiently partially retry the write, discarding the +offending data. + +### Responsibilities of the application + +It is responsibility of the application sending the data to the sink in the correct order and format. + +1. Input time-series must be well-formed, e.g. only valid and non-duplicated labels, + samples in timestamp order (see [Labels and Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels) + in Prometheus Remote-Write specs). +2. Input time-series with identical labels are sent to the sink in timestamp order. +3. 
If sink parallelism > 1 is used, the input stream must be partitioned so that all time-series with identical labels + go to the same sink subtask. A `KeySelector` is provided to partition input correctly ( + see [Partitioning](#partitioning), below). + +#### Sink input objects + +To help sending well-formed data to the sink, the connector +expect [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java) +POJOs as input. + +Each `PrometheusTimeSeries` instance maps 1-to-1 to +a [remote-write `TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). Each object contains: + +* exactly one `metericName`, mapped to the special `__name__` label +* optionally, any number of additional labels { k: String, v:String } - MUST BE IN ORDER BY KEY +* one or more `Samples` { value: double, timestamp: long } - MUST BE IN TIMESTAMP ORDER + +`PrometheusTimeSeries` provides a builder interface. + +```java + +// List<Tuple2<Double, Long>> samples = ... + +PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder() + .withMetricName("CPU") // mapped to `__name__` label + .addLabel("InstanceID", instanceId) + .addLabel("AccountID", accountId); + +for( +Tuple2<Double, Long> sample :samples){ + tsBuilder. + +addSample(sample.f0, sample.f1); +} + +PrometheusTimeSeries ts = tsBuilder.build(); +``` + +#### Input data validation + +Prometheus imposes strict constraints to the content sent to remote-write, including label format and ordering, sample +time ordering ecc. Review Comment: nit: etc. ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesConverter.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +/** + * Converts the sink input {@link PrometheusTimeSeries} into the Protobuf {@link Types.TimeSeries} + * that are sent to Prometheus. + */ +public class PrometheusTimeSeriesConverter Review Comment: `@PublicEvolving`? ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java: ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Optional; + +/** Builder for Sink implementation. */ +public class PrometheusSinkBuilder + extends AsyncSinkBaseBuilder< + PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class); + + // Max batch size, in number of samples + private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500; + // Max time a record is buffered + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; + // Max nr of requestEntry that will be buffered + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 1000; + // Metric Group name added to the custom metrics + private static final String DEFAULT_METRIC_GROUP_NAME = "Prometheus"; + + // Max in-flight requests is always = 1, to retain ordering + private static final int MAX_IN_FLIGHT_REQUESTS = 1; + + private String prometheusRemoteWriteUrl; + private RetryConfiguration retryConfiguration; + private Integer socketTimeoutMs; + private PrometheusRequestSigner requestSigner = null; + private Integer maxBatchSizeInSamples; + private Integer maxRecordSizeInSamples; + private String httpUserAgent = null; + private SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig = null; + private String metricGroupName = null; + + @Override + public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() { + + int actualMaxBatchSizeInSamples = + Optional.ofNullable(getMaxBatchSizeInSamples()) + .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES); + int actualMaxBufferedRequests = + Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS); + long actualMaxTimeInBufferMS = + Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS); + + int actualMaxRecordSizeInSamples = + Optional.ofNullable(getMaxRecordSizeInSamples()).orElse(getMaxBatchSizeInSamples()); + + int actualSocketTimeoutMs = + Optional.ofNullable(getSocketTimeoutMs()) + .orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS); + + String actualHttpUserAgent = + Optional.ofNullable(getHttpUserAgent()) + .orElse(PrometheusRemoteWriteHttpRequestBuilder.DEFAULT_USER_AGENT); + + 
SinkWriterErrorHandlingBehaviorConfiguration actualErrorHandlingBehaviorConfig = + Optional.ofNullable(errorHandlingBehaviorConfig) + .orElse(SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS); + + String actualMetricGroupName = + Optional.ofNullable(metricGroupName).orElse(DEFAULT_METRIC_GROUP_NAME); + + Preconditions.checkArgument( + StringUtils.isNotBlank(prometheusRemoteWriteUrl), + "Missing or blank Prometheus Remote-Write URL"); + checkValidRemoteWriteUrl(prometheusRemoteWriteUrl); + Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration"); + Preconditions.checkArgument( + actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) must be positive"); + Preconditions.checkArgument( + actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples, + "Max record size (in samples) must be <= Max batch size"); + + LOG.info( + "Prometheus sink configuration:" + + "\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}" + + "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}" + + "\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}" + + "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}" + + "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}", + actualMaxBatchSizeInSamples, + actualMaxRecordSizeInSamples, + actualMaxTimeInBufferMS, + MAX_IN_FLIGHT_REQUESTS, + actualMaxBufferedRequests, + retryConfiguration.getInitialRetryDelayMS(), + retryConfiguration.getMaxRetryDelayMS(), + retryConfiguration.getMaxRetryCount(), + socketTimeoutMs, + actualHttpUserAgent, + actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(), + actualErrorHandlingBehaviorConfig.getOnHttpClientIOFail(), + actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()); + + return new PrometheusSink( + new PrometheusTimeSeriesConverter(), + MAX_IN_FLIGHT_REQUESTS, + actualMaxBufferedRequests, + actualMaxBatchSizeInSamples, + actualMaxRecordSizeInSamples, + actualMaxTimeInBufferMS, + prometheusRemoteWriteUrl, + new PrometheusAsyncHttpClientBuilder(retryConfiguration) + .setSocketTimeout(actualSocketTimeoutMs), + requestSigner, + actualHttpUserAgent, + actualErrorHandlingBehaviorConfig, + actualMetricGroupName); + } + + private static void checkValidRemoteWriteUrl(String url) { + try { + new URL(url); + } catch (MalformedURLException mue) { + throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue); + } + } + + public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) { + this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl; + return this; + } + + public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) { + this.requestSigner = requestSigner; + return this; + } + + public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) { + this.maxBatchSizeInSamples = maxBatchSizeInSamples; + return this; + } + + public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) { + this.maxRecordSizeInSamples = maxRecordSizeInSamples; + return this; + } + + public PrometheusSinkBuilder setRetryConfiguration(RetryConfiguration retryConfiguration) { + this.retryConfiguration = retryConfiguration; + return this; + } + + public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) { + this.socketTimeoutMs = socketTimeoutMs; + return this; + } + + public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) { + this.httpUserAgent = httpUserAgent; + return 
this; + } + + public PrometheusSinkBuilder setErrorHandlingBehaviourConfiguration( + SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig) { + this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig; + return this; + } + + public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) { + this.metricGroupName = metricGroupName; + return this; + } + + private Integer getMaxBatchSizeInSamples() { + return maxBatchSizeInSamples; + } + + private Integer getMaxRecordSizeInSamples() { + return maxRecordSizeInSamples; + } + + public RetryConfiguration getRetryConfiguration() { + return retryConfiguration; + } + + public Integer getSocketTimeoutMs() { + return socketTimeoutMs; + } + + public String getHttpUserAgent() { + return httpUserAgent; + } + + public SinkWriterErrorHandlingBehaviorConfiguration getErrorHandlingBehaviorConfig() { + return errorHandlingBehaviorConfig; + } + + public String getMetricGroupName() { + return metricGroupName; + } + + /// Disable accessing maxBatchSize, maxBatchSizeInBytes, and maxRecordSizeInBytes directly + + /** Not supported. Use setMaxBatchSizeInSamples(int) instead */ + @Override + public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) { + throw new UnsupportedOperationException("maxBatchSize is not supported by this sink"); + } + + /** Not supported. Use setMaxBatchSizeInSamples(int) instead */ + @Override + public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) { + throw new UnsupportedOperationException( + "maxBatchSizeInBytes is not supported by this sink"); + } + + /** Not supported. Use setMaxRecordSizeInSamples(int) instead */ + @Override + public PrometheusSinkBuilder setMaxRecordSizeInBytes(long maxRecordSizeInBytes) { + throw new UnsupportedOperationException( + "maxRecordSizeInBytes is not supported by this sink"); + } + + /** Not supported. Use getMaxBatchSizeInSamples() instead */ + @Override + protected Integer getMaxBatchSize() { + throw new UnsupportedOperationException("maxBatchSize is not supported by this sink"); + } + + /** Not supported. Use getMaxBatchSizeInSamples() instead */ + @Override + protected Long getMaxBatchSizeInBytes() { + throw new UnsupportedOperationException( + "maxRecordSizeInBytes is not supported by this sink"); + } + + /** Not supported. Use getMaxRecordSizeInSamples() instead */ + @Override + protected Long getMaxRecordSizeInBytes() { + throw new UnsupportedOperationException( + "maxRecordSizeInBytes is not supported by this sink"); + } Review Comment: We don't need to do this, as we don't expect this builder to be extended! ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/metrics/SinkMetrics.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +/** Wraps all metrics in a single class. */ +public class SinkMetrics { Review Comment: `@Internal`? ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics; +import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +import java.util.Collection; + +/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. 
*/ +@PublicEvolving +public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> { + private final String prometheusRemoteWriteUrl; + private final PrometheusAsyncHttpClientBuilder clientBuilder; + private final PrometheusRequestSigner requestSigner; + private final int maxBatchSizeInSamples; + private final String httpUserAgent; + private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + private final String metricGroupName; + + protected PrometheusSink( + ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter, + int maxInFlightRequests, + int maxBufferedRequests, + int maxBatchSizeInSamples, + int maxRecordSizeInSamples, + long maxTimeInBufferMS, + String prometheusRemoteWriteUrl, + PrometheusAsyncHttpClientBuilder clientBuilder, + PrometheusRequestSigner requestSigner, + String httpUserAgent, + SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig, + String metricGroupName) { + super( + elementConverter, + maxBatchSizeInSamples, // maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInSamples, // maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInSamples // maxRecordSizeInBytes + ); + + Preconditions.checkArgument(maxInFlightRequests == 1, "maxInFlightRequests must be 1"); + + this.maxBatchSizeInSamples = maxBatchSizeInSamples; + this.requestSigner = requestSigner; + this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl; + this.clientBuilder = clientBuilder; + this.httpUserAgent = httpUserAgent; + this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig; + this.metricGroupName = metricGroupName; + } + + public int getMaxBatchSizeInSamples() { + return maxBatchSizeInSamples; + } Review Comment: Actually can we just delete this? ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics; +import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +import java.util.Collection; + +/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */ +@PublicEvolving +public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> { + private final String prometheusRemoteWriteUrl; + private final PrometheusAsyncHttpClientBuilder clientBuilder; + private final PrometheusRequestSigner requestSigner; + private final int maxBatchSizeInSamples; + private final String httpUserAgent; + private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + private final String metricGroupName; + + protected PrometheusSink( + ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter, + int maxInFlightRequests, + int maxBufferedRequests, + int maxBatchSizeInSamples, + int maxRecordSizeInSamples, + long maxTimeInBufferMS, + String prometheusRemoteWriteUrl, + PrometheusAsyncHttpClientBuilder clientBuilder, + PrometheusRequestSigner requestSigner, + String httpUserAgent, + SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig, + String metricGroupName) { + super( + elementConverter, + maxBatchSizeInSamples, // maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInSamples, // maxBatchSizeInBytes, Review Comment: Can we add a comment to explain that we don't need to check the bytes, so we simply use samples as a replacement for bytes?
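For example, something along these lines — a sketch only, the exact wording is up to you:

```java
super(
        elementConverter,
        maxBatchSizeInSamples, // maxBatchSize: batching is measured in samples, not records
        maxInFlightRequests,
        maxBufferedRequests,
        // The sink never limits requests by their size in bytes, so the samples-based
        // limit is passed again here as a stand-in for maxBatchSizeInBytes.
        maxBatchSizeInSamples,
        maxTimeInBufferMS,
        // Same substitution: maxRecordSizeInBytes is actually a limit in samples.
        maxRecordSizeInSamples);
```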
########## flink-connector-prometheus/README.md: ########## @@ -0,0 +1,260 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based +on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) +and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in +order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, **the write request is rejected by Prometheus**. +If the write request is rejected, depending on the configured [error handling behaviour](#error-handling-behavior) for +"Prometheus non-retriable errors", the sink will either throw an exception (`FAIL`, default behavior) or discard the +offending +request and continue (`DISCARD_AND_CONTINUE`). See [error handling behaviour](#error-handling-behavior), below, for +further details. + +The sink receives as input time-series, each containing one or more samples. +To optimise the write throughput, input time-series are batched, in the order they are received, and written with a +single write-request. + +If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all time series +are discarded. +The reason is Remote-Write +specifications [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of +rejected write requests (4xx responses). +and the Prometheus response does not contain enough information to efficiently partially retry the write, discarding the +offending data. + +### Responsibilities of the application + +It is responsibility of the application sending the data to the sink in the correct order and format. + +1. Input time-series must be well-formed, e.g. only valid and non-duplicated labels, + samples in timestamp order (see [Labels and Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels) + in Prometheus Remote-Write specs). +2. Input time-series with identical labels are sent to the sink in timestamp order. +3. If sink parallelism > 1 is used, the input stream must be partitioned so that all time-series with identical labels + go to the same sink subtask. A `KeySelector` is provided to partition input correctly ( + see [Partitioning](#partitioning), below). + +#### Sink input objects + +To help sending well-formed data to the sink, the connector +expect [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java) +POJOs as input. + +Each `PrometheusTimeSeries` instance maps 1-to-1 to +a [remote-write `TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). Each object contains: + +* exactly one `metericName`, mapped to the special `__name__` label +* optionally, any number of additional labels { k: String, v:String } - MUST BE IN ORDER BY KEY +* one or more `Samples` { value: double, timestamp: long } - MUST BE IN TIMESTAMP ORDER + +`PrometheusTimeSeries` provides a builder interface. + +```java + +// List<Tuple2<Double, Long>> samples = ... + +PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder() + .withMetricName("CPU") // mapped to `__name__` label + .addLabel("InstanceID", instanceId) + .addLabel("AccountID", accountId); + +for( +Tuple2<Double, Long> sample :samples){ + tsBuilder. + +addSample(sample.f0, sample.f1); +} + +PrometheusTimeSeries ts = tsBuilder.build(); +``` + +#### Input data validation + +Prometheus imposes strict constraints to the content sent to remote-write, including label format and ordering, sample +time ordering ecc. + +For efficiency, the sink **does not do any validation or reordering** of the input. It's responsibility +of the application ensuring that input is well-formed. + +Any malformed data will be rejected on write to Prometheus. Depending on +the [error handling behaviours](#error-handling-behavior) +configured, the sink will throw an exception stopping the job (default), or drop the entire write-request, log the fact, +and continue. + +For complete details about these constraints, refer to +the [remote-write specifications](https://prometheus.io/docs/concepts/remote_write_spec/). + +### Batching, blocking writes and retry + +The sink batches multiple time-series into a single write-request, retaining the order..
+ +Batching is based on the number of samples. Each write-request contains up to 500 samples, with a max buffering time of +5 seconds +(both configurable). The number of time-series doesn't matter. + +As by [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff), +the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses and exponential +backoff. + +The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry +delay (default 5 sec). It continues retrying until the max number of retries is reached (default reties forever). + +On non-retriable error response (4xx, except 429, non retryable exceptions), or on reaching the retry limit, depending +on +the configured [error handling behaviour](#error-handling-behavior) for "Max retries exceeded", the sink will either +throw +an exception (`FAIL`, default behaviour), or **discard the entire write-request**, log a warning and continue. See +[error handling behaviour](#error-handling-behavior), below, for further details. + +### Initializing the sink + +Example of sink initialisation (for documentation purposes, we are setting all parameters to their default values): +Sink + +```java +PrometheusSink sink = PrometheusSink.builder() + .setMaxBatchSizeInSamples(500) // Batch size (write-request size), in samples (default: 500) + .setMaxRecordSizeInSamples(500) // Max sink input record size, in samples (default: 500), must be <= maxBatchSizeInSamples Review Comment: Might be good to clarify the behavior when this is exceeded: the sink rejects the record and the job continuously restarts, so this limit should be used for data validation before records reach the sink. It might even be worth mentioning that exceeding it is not expected in the normal use case of the sink (as opposed to maxBatchSize, which can be tuned for performance reasons).
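Something along these lines, for instance (wording is only a suggestion):

```java
.setMaxRecordSizeInSamples(500) // Max sink input record size, in samples (default: 500), must be <= maxBatchSizeInSamples.
                                // A record exceeding this limit makes the write fail and the job restart: treat it as a
                                // data-validation guard, not (unlike maxBatchSizeInSamples) a performance tuning knob.
```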
########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics; +import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +import java.util.Collection; + +/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */ +@PublicEvolving +public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> { + private final String prometheusRemoteWriteUrl; + private final PrometheusAsyncHttpClientBuilder clientBuilder; + private final PrometheusRequestSigner requestSigner; + private final int maxBatchSizeInSamples; + private final String httpUserAgent; + private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + private final String metricGroupName; + + protected PrometheusSink( + ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter, + int maxInFlightRequests, + int maxBufferedRequests, + int maxBatchSizeInSamples, + int maxRecordSizeInSamples, + long maxTimeInBufferMS, + String prometheusRemoteWriteUrl, + PrometheusAsyncHttpClientBuilder clientBuilder, + PrometheusRequestSigner requestSigner, + String httpUserAgent, + SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig, + String metricGroupName) { + super( + elementConverter, + maxBatchSizeInSamples, // maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInSamples, // maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInSamples // maxRecordSizeInBytes + ); + + Preconditions.checkArgument(maxInFlightRequests == 1, "maxInFlightRequests must be 1"); + + this.maxBatchSizeInSamples = maxBatchSizeInSamples; + this.requestSigner = requestSigner; + this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl; + this.clientBuilder = clientBuilder; + this.httpUserAgent = httpUserAgent; + this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig; + this.metricGroupName = metricGroupName; + } + + public int getMaxBatchSizeInSamples() { + return maxBatchSizeInSamples; + } Review Comment: can this be `private`? ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusRemoteWriteHttpRequestBuilder.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.util.Preconditions; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; + +import java.util.HashMap; +import java.util.Map; + +/** Builds the POST request to the Remote-Write endpoint for a given binary payload. */ +public class PrometheusRemoteWriteHttpRequestBuilder { Review Comment: Let's annotate this as `@Internal` ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/PrometheusSinkWriteException.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.errorhandling; + +/** Exception during writing to Prometheus Remote-Write endpoint. */ +public class PrometheusSinkWriteException extends RuntimeException { Review Comment: `@Internal`? ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier; +import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +/** + * Callback handling the outcome of the http async request. + * + * <p>This class implements the error handling behaviour, based on the configuration in {@link + * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an + * exception and cause the job to fail, or log the condition to WARN, increment the counters and + * continue with the next request. + * + * <p>In any case, every write-request either entirely succeed or fail. Partial failures are not + * handled. + * + * <p>In no condition a write-request is re-queued for the AsyncSink to reprocess: this would cause + * out of order writes that would be rejected by Prometheus. + * + * <p>Note that the http async client retries, based on the configured retry policy. The callback is + * called with an outcome of *completed* either when the request has succeeded or the max retry + * limit has been exceeded. It is responsibility of the callback distinguishing between these + * conditions. + */ +class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> { Review Comment: Let's annotate this as `@Internal` ########## flink-connector-prometheus/README.md: ########## @@ -0,0 +1,260 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based +on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) +and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in +order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, **the write request is rejected by Prometheus**. +If the write request is rejected, depending on the configured [error handling behaviour](#error-handling-behavior) for +"Prometheus non-retriable errors", the sink will either throw an exception (`FAIL`, default behavior) or discard the +offending +request and continue (`DISCARD_AND_CONTINUE`). See [error handling behaviour](#error-handling-behavior), below, for +further details. + +The sink receives as input time-series, each containing one or more samples. +To optimise the write throughput, input time-series are batched, in the order they are received, and written with a +single write-request. 
+ +If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all time series +are discarded. +The reason is Remote-Write +specifications [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of +rejected write requests (4xx responses). +and the Prometheus response does not contain enough information to efficiently partially retry the write, discarding the +offending data. + +### Responsibilities of the application + +It is responsibility of the application sending the data to the sink in the correct order and format. + +1. Input time-series must be well-formed, e.g. only valid and non-duplicated labels, + samples in timestamp order (see [Labels and Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels) + in Prometheus Remote-Write specs). +2. Input time-series with identical labels are sent to the sink in timestamp order. +3. If sink parallelism > 1 is used, the input stream must be partitioned so that all time-series with identical labels + go to the same sink subtask. A `KeySelector` is provided to partition input correctly ( + see [Partitioning](#partitioning), below). + +#### Sink input objects + +To help sending well-formed data to the sink, the connector +expect [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java) +POJOs as input. + +Each `PrometheusTimeSeries` instance maps 1-to-1 to +a [remote-write `TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). Each object contains: + +* exactly one `metericName`, mapped to the special `__name__` label +* optionally, any number of additional labels { k: String, v:String } - MUST BE IN ORDER BY KEY +* one or more `Samples` { value: double, timestamp: long } - MUST BE IN TIMESTAMP ORDER + +`PrometheusTimeSeries` provides a builder interface. + +```java + +// List<Tuple2<Double, Long>> samples = ... + +PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder() + .withMetricName("CPU") // mapped to `__name__` label + .addLabel("InstanceID", instanceId) + .addLabel("AccountID", accountId); + +for( +Tuple2<Double, Long> sample :samples){ + tsBuilder. + +addSample(sample.f0, sample.f1); +} + +PrometheusTimeSeries ts = tsBuilder.build(); +``` + +#### Input data validation + +Prometheus imposes strict constraints to the content sent to remote-write, including label format and ordering, sample +time ordering ecc. + +For efficiency, the sink **does not do any validation or reordering** of the input. It's responsibility +of the application ensuring that input is well-formed. + +Any malformed data will be rejected on write to Prometheus. Depending on +the [error handling behaviours](#error-handling-behavior) +configured, the sink will throw an exception stopping the job (default), or drop the entire write-request, log the fact, +and continue. + +For complete details about these constraints, refer to +the [remote-write specifications](https://prometheus.io/docs/concepts/remote_write_spec/). + +### Batching, blocking writes and retry + +The sink batches multiple time-series into a single write-request, retaining the order.. + +Batching is based on the number of samples. Each write-request contains up to 500 samples, with a max buffering time of +5 seconds +(both configurable). The number of time-series doesn't matter. 
+ +As by [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff), +the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses and exponential +backoff. + +The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry +delay (default 5 sec). It continues retrying until the max number of retries is reached (default reties forever). + +On non-retriable error response (4xx, except 429, non retryable exceptions), or on reaching the retry limit, depending +on +the configured [error handling behaviour](#error-handling-behavior) for "Max retries exceeded", the sink will either +throw +an exception (`FAIL`, default behaviour), or **discard the entire write-request**, log a warning and continue. See +[error handling behaviour](#error-handling-behavior), below, for further details. + +### Initializing the sink + +Example of sink initialisation (for documentation purposes, we are setting all parameters to their default values): +Sink + +```java +PrometheusSink sink = PrometheusSink.builder() + .setMaxBatchSizeInSamples(500) // Batch size (write-request size), in samples (default: 500) + .setMaxRecordSizeInSamples(500) // Max sink input record size, in samples (default: 500), must be <= maxBatchSizeInSamples + .setMaxTimeInBufferMS(5000) // Max time a time-series is buffered for batching (default: 5000 ms) + .setRetryConfiguration(RetryConfiguration.builder() + .setInitialRetryDelayMS(30L) // Initial retry delay (default: 30 ms) + .setMaxRetryDelayMS(5000L) // Maximum retray delay, with exponential backoff (default: 5000 ms) Review Comment: nit: retry delay ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java: ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Optional; + +/** Builder for Sink implementation. 
*/ +public class PrometheusSinkBuilder + extends AsyncSinkBaseBuilder< + PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class); + + // Max batch size, in number of samples + private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500; + // Max time a record is buffered + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; + // Max nr of requestEntry that will be buffered + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 1000; + // Metric Group name added to the custom metrics + private static final String DEFAULT_METRIC_GROUP_NAME = "Prometheus"; + + // Max in-flight requests is always = 1, to retain ordering + private static final int MAX_IN_FLIGHT_REQUESTS = 1; + + private String prometheusRemoteWriteUrl; + private RetryConfiguration retryConfiguration; + private Integer socketTimeoutMs; + private PrometheusRequestSigner requestSigner = null; + private Integer maxBatchSizeInSamples; + private Integer maxRecordSizeInSamples; + private String httpUserAgent = null; + private SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig = null; + private String metricGroupName = null; + + @Override + public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() { + + int actualMaxBatchSizeInSamples = + Optional.ofNullable(getMaxBatchSizeInSamples()) + .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES); + int actualMaxBufferedRequests = + Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS); + long actualMaxTimeInBufferMS = + Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS); + + int actualMaxRecordSizeInSamples = + Optional.ofNullable(getMaxRecordSizeInSamples()).orElse(getMaxBatchSizeInSamples()); + + int actualSocketTimeoutMs = + Optional.ofNullable(getSocketTimeoutMs()) + .orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS); + + String actualHttpUserAgent = + Optional.ofNullable(getHttpUserAgent()) + .orElse(PrometheusRemoteWriteHttpRequestBuilder.DEFAULT_USER_AGENT); + + SinkWriterErrorHandlingBehaviorConfiguration actualErrorHandlingBehaviorConfig = + Optional.ofNullable(errorHandlingBehaviorConfig) + .orElse(SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS); + + String actualMetricGroupName = + Optional.ofNullable(metricGroupName).orElse(DEFAULT_METRIC_GROUP_NAME); + + Preconditions.checkArgument( + StringUtils.isNotBlank(prometheusRemoteWriteUrl), + "Missing or blank Prometheus Remote-Write URL"); + checkValidRemoteWriteUrl(prometheusRemoteWriteUrl); + Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration"); + Preconditions.checkArgument( + actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) must be positive"); + Preconditions.checkArgument( + actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples, + "Max record size (in samples) must be <= Max batch size"); + + LOG.info( + "Prometheus sink configuration:" + + "\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}" + + "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}" + + "\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}" + + "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}" + + "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}", + actualMaxBatchSizeInSamples, + actualMaxRecordSizeInSamples, + actualMaxTimeInBufferMS, + MAX_IN_FLIGHT_REQUESTS, + 
actualMaxBufferedRequests, + retryConfiguration.getInitialRetryDelayMS(), + retryConfiguration.getMaxRetryDelayMS(), + retryConfiguration.getMaxRetryCount(), + socketTimeoutMs, + actualHttpUserAgent, + actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(), + actualErrorHandlingBehaviorConfig.getOnHttpClientIOFail(), + actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()); + + return new PrometheusSink( + new PrometheusTimeSeriesConverter(), + MAX_IN_FLIGHT_REQUESTS, + actualMaxBufferedRequests, + actualMaxBatchSizeInSamples, + actualMaxRecordSizeInSamples, + actualMaxTimeInBufferMS, + prometheusRemoteWriteUrl, + new PrometheusAsyncHttpClientBuilder(retryConfiguration) + .setSocketTimeout(actualSocketTimeoutMs), + requestSigner, + actualHttpUserAgent, + actualErrorHandlingBehaviorConfig, + actualMetricGroupName); + } + + private static void checkValidRemoteWriteUrl(String url) { + try { + new URL(url); + } catch (MalformedURLException mue) { + throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue); + } + } + + public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) { + this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl; + return this; + } + + public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) { + this.requestSigner = requestSigner; + return this; + } + + public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) { + this.maxBatchSizeInSamples = maxBatchSizeInSamples; + return this; + } + + public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) { + this.maxRecordSizeInSamples = maxRecordSizeInSamples; + return this; + } + + public PrometheusSinkBuilder setRetryConfiguration(RetryConfiguration retryConfiguration) { + this.retryConfiguration = retryConfiguration; + return this; + } + + public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) { + this.socketTimeoutMs = socketTimeoutMs; + return this; + } + + public PrometheusSinkBuilder setHttpUserAgent(String httpUserAgent) { + this.httpUserAgent = httpUserAgent; + return this; + } + + public PrometheusSinkBuilder setErrorHandlingBehaviourConfiguration( + SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig) { + this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig; + return this; + } + + public PrometheusSinkBuilder setMetricGroupName(String metricGroupName) { + this.metricGroupName = metricGroupName; + return this; + } + + private Integer getMaxBatchSizeInSamples() { + return maxBatchSizeInSamples; + } + + private Integer getMaxRecordSizeInSamples() { + return maxRecordSizeInSamples; + } + + public RetryConfiguration getRetryConfiguration() { + return retryConfiguration; + } + + public Integer getSocketTimeoutMs() { + return socketTimeoutMs; + } + + public String getHttpUserAgent() { + return httpUserAgent; + } + + public SinkWriterErrorHandlingBehaviorConfiguration getErrorHandlingBehaviorConfig() { + return errorHandlingBehaviorConfig; + } + + public String getMetricGroupName() { + return metricGroupName; + } Review Comment: Can we remove the `get` methods on the builder? ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/OnErrorBehavior.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.errorhandling; + +/** + * Defines the behaviour when an error is encountered: discard the offending request and continue, + * or fail, throwing an exception. + */ +public enum OnErrorBehavior { Review Comment: `@PublicEvolving`. Actually let's put this under a `ConfigOption` class
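Rough sketch of what I have in mind — the holder class and option key are just placeholders:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Hypothetical holder class for the sink's error-handling options. */
public class PrometheusSinkConfigOptions {

    public static final ConfigOption<OnErrorBehavior> ON_MAX_RETRY_EXCEEDED =
            ConfigOptions.key("error-handling.on-max-retry-exceeded")
                    .enumType(OnErrorBehavior.class)
                    .defaultValue(OnErrorBehavior.FAIL)
                    .withDescription(
                            "Behavior when the max retries are exceeded: FAIL or DISCARD_AND_CONTINUE.");
}
```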
########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java: ##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hc.client5.http.config.TlsConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Builder for async http client that will retry, based on the {@link RemoteWriteRetryStrategy}
+ * specified.
+ */
+public class PrometheusAsyncHttpClientBuilder implements Serializable {

Review Comment:
   `@PublicEvolving`?
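   The same `@PublicEvolving` question recurs for several classes below; the change itself is a single annotation on each class, as in this sketch (the placement follows the usual Flink connector convention; the builder body is elided):

```java
import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

// Sketch: the annotation marks the builder as public API that may still evolve between releases.
@PublicEvolving
public class PrometheusAsyncHttpClientBuilder implements Serializable {
    private static final long serialVersionUID = 1L;
    // ... builder body as in the PR ...
}
```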
########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesLabelsAndMetricNameKeySelector.java: ##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Implementation of KeySelector ensuring the {@link PrometheusTimeSeries} with the same set of
+ * labels end up in the same partition.
+ *
+ * <p>For using the sink with parallelism > 1, the input of the sink must be a keyedStream using
+ * this KeySelector to extract the key. This guarantees TimeSeries with the same set of labels are
+ * written to Prometheus in the same order they are sent to the sink.
+ *
+ * <p>The partition key is the hash of all labels AND the metricName. The metricName is added as
+ * additional label by the sink, before writing to Prometheus.
+ */
+public class PrometheusTimeSeriesLabelsAndMetricNameKeySelector

Review Comment:
   `@PublicEvolving`?

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/SinkWriterErrorHandlingBehaviorConfiguration.java: ##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.errorhandling;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import static org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior.FAIL;
+
+/**
+ * Configure the error-handling behavior of the writer, for different types of error. Also defines
+ * default behaviors.
+ */
+public class SinkWriterErrorHandlingBehaviorConfiguration implements Serializable {

Review Comment:
   `@PublicEvolving`?

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RetryConfiguration.java: ##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import java.io.Serializable;
+
+/** Defines the retry strategy configuration. */
+public class RetryConfiguration implements Serializable {

Review Comment:
   Can we just make this a set of `ConfigOptions`?
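   A sketch of the same settings expressed as `ConfigOption`s, in the spirit of the suggestion. The option keys and default values are placeholders; only the three settings themselves are grounded in the getters visible in the builder above (`getInitialRetryDelayMS`, `getMaxRetryDelayMS`, `getMaxRetryCount`):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Hypothetical ConfigOption equivalents of RetryConfiguration; keys and defaults are placeholders. */
public class PrometheusSinkRetryOptions {

    public static final ConfigOption<Long> INITIAL_RETRY_DELAY_MS =
            ConfigOptions.key("retry.initial-delay-ms")
                    .longType()
                    .defaultValue(30L) // placeholder default
                    .withDescription("Initial retry delay, in milliseconds.");

    public static final ConfigOption<Long> MAX_RETRY_DELAY_MS =
            ConfigOptions.key("retry.max-delay-ms")
                    .longType()
                    .defaultValue(5000L) // placeholder default
                    .withDescription("Maximum retry delay, in milliseconds.");

    public static final ConfigOption<Integer> MAX_RETRY_COUNT =
            ConfigOptions.key("retry.max-count")
                    .intType()
                    .defaultValue(100) // placeholder default
                    .withDescription("Maximum number of retries before the request permanently fails.");
}
```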
########## pom.xml: ##########
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-parent</artifactId>
+        <version>1.0.0</version>
+    </parent>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-prometheus-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <name>Flink Prometheus</name>
+    <modules>
+        <module>flink-connector-prometheus</module>
+        <module>flink-connector-prometheus-request-signer-amp</module>
+    </modules>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>

Review Comment:
   Let's make this `1.19.1`.

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/errorhandling/PrometheusSinkWriteException.java: ##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.errorhandling;
+
+/** Exception during writing to Prometheus Remote-Write endpoint. */
+public class PrometheusSinkWriteException extends RuntimeException {
+
+    private final String reason;
+    private final int httpStatusCode;
+    private final String httpReasonPhrase;
+    private final int timeSeriesCount;
+    private final long sampleCount;
+    private final String httpResponseBody;
+
+    public PrometheusSinkWriteException(String reason, int timeSeriesCount, long sampleCount) {
+        super("Reason: " + reason);
+        this.reason = reason;
+        this.timeSeriesCount = timeSeriesCount;
+        this.sampleCount = sampleCount;
+        this.httpStatusCode = -1;
+        this.httpReasonPhrase = "";
+        this.httpResponseBody = "";
+    }
+
+    public PrometheusSinkWriteException(
+            String reason, int timeSeriesCount, long sampleCount, Exception cause) {
+        super("Reason: " + reason, cause);
+        this.reason = reason;
+        this.timeSeriesCount = timeSeriesCount;
+        this.sampleCount = sampleCount;
+        this.httpStatusCode = -1;
+        this.httpReasonPhrase = "";
+        this.httpResponseBody = "";
+    }
+
+    public PrometheusSinkWriteException(
+            String reason,
+            int httpStatusCode,
+            String httpReasonPhrase,
+            int timeSeriesCount,
+            long sampleCount,
+            String httpResponseBody) {
+        super(
+                String.format(
+                        "Reason: %s. Http response: %d,%s (%s) .The offending write-request contains %d time-series and %d samples",
+                        reason,
+                        httpStatusCode,
+                        httpReasonPhrase,
+                        httpResponseBody,
+                        timeSeriesCount,
+                        sampleCount));
+        this.reason = reason;
+        this.httpStatusCode = httpStatusCode;
+        this.httpReasonPhrase = httpReasonPhrase;
+        this.timeSeriesCount = timeSeriesCount;
+        this.sampleCount = sampleCount;
+        this.httpResponseBody = httpResponseBody;
+    }
+
+    public String getReason() {
+        return reason;
+    }
+
+    public int getHttpStatusCode() {
+        return httpStatusCode;
+    }
+
+    public String getHttpReasonPhrase() {
+        return httpReasonPhrase;
+    }
+
+    public int getTimeSeriesCount() {
+        return timeSeriesCount;
+    }
+
+    public long getSampleCount() {
+        return sampleCount;
+    }
+
+    public String getHttpResponseBody() {
+        return httpResponseBody;
+    }

Review Comment:
   Can we remove these?

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesLabelsAndMetricNameKeySelector.java: ##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Implementation of KeySelector ensuring the {@link PrometheusTimeSeries} with the same set of
+ * labels end up in the same partition.
+ *
+ * <p>For using the sink with parallelism > 1, the input of the sink must be a keyedStream using
+ * this KeySelector to extract the key. This guarantees TimeSeries with the same set of labels are
+ * written to Prometheus in the same order they are sent to the sink.
+ *
+ * <p>The partition key is the hash of all labels AND the metricName. The metricName is added as
+ * additional label by the sink, before writing to Prometheus.
+ */
+public class PrometheusTimeSeriesLabelsAndMetricNameKeySelector

Review Comment:
   Can we also name this something smaller? Like `PrometheusKeySelector`?
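   A sketch of the renamed class and how it would be wired in. The `HashCodeBuilder` approach and the labels-plus-metricName key are from the PR's javadoc; the `getLabels()`/`getMetricName()` accessor names on `PrometheusTimeSeries` are guesses for illustration:

```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.functions.KeySelector;

import org.apache.commons.lang3.builder.HashCodeBuilder;

/** Same behaviour as PrometheusTimeSeriesLabelsAndMetricNameKeySelector, with a shorter name. */
@PublicEvolving
public class PrometheusKeySelector implements KeySelector<PrometheusTimeSeries, Integer> {
    @Override
    public Integer getKey(PrometheusTimeSeries timeSeries) {
        // Hash all labels plus the metric name, so identical series land in the same partition.
        // Accessor names are assumed, not confirmed by the PR.
        return new HashCodeBuilder(17, 37)
                .append(timeSeries.getLabels())
                .append(timeSeries.getMetricName())
                .build();
    }
}
```

   Upstream, the application would then key the stream before the sink, e.g. `stream.keyBy(new PrometheusKeySelector()).sinkTo(sink)`.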
########## pom.xml: ##########
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-parent</artifactId>
+        <version>1.0.0</version>
+    </parent>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-prometheus-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <name>Flink Prometheus</name>
+    <modules>
+        <module>flink-connector-prometheus</module>
+        <module>flink-connector-prometheus-request-signer-amp</module>
+    </modules>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>
+        <protobuf.version>3.22.2</protobuf.version>
+        <apache.httpclient.version>5.2.1</apache.httpclient.version>
+        <aws.sdkv1.version>1.12.570</aws.sdkv1.version>
+        <log4j.version>2.17.1</log4j.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>

Review Comment:
   Could we copy over the other plugins here too? https://github.com/apache/flink-connector-aws/blob/main/pom.xml#L421-L477
########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/metrics/SinkMetricsCallback.java: ##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.metrics;
+
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/** Callback updating {@link SinkMetrics} on specific request outcomes. */
+public class SinkMetricsCallback {

Review Comment:
   `@Internal`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org