nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597456357
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED; + +/** + * Callback handling the outcome of the http async request. + * + * <p>This class implements the error handling behaviour, based on the configuration in {@link + * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an + * exception and cause the job to fail, or log the condition to WARN, increment the counters and + * continue with the next request. + * + * <p>In any case, every write-request either entirely succeed or fail. Partial failures are not + * handled. + * + * <p>In no condition a write-request is re-queued for the AsyncSink to reprocess: this would cause + * out of order writes that would be rejected by Prometheus. + * + * <p>Note that the http async client retries, based on the configured retry policy. The callback is + * called with an outcome of *completed* either when the request has succeeded or the max retry + * limit has been exceeded. It is responsibility of the callback distinguishing between these + * conditions. + */ +class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> { + private static final Logger LOG = LoggerFactory.getLogger(HttpResponseCallback.class); + + private final int timeSeriesCount; + private final long sampleCount; + private final Consumer<List<Types.TimeSeries>> reQueuedResult; + private final SinkMetrics metrics; + private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + + public HttpResponseCallback( + int timeSeriesCount, + long sampleCount, + SinkMetrics metrics, Review Comment: Not exactly a minor change, because it has impact in many places. But definitely a good idea. Implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org