hlteoh37 commented on code in PR #22: URL: https://github.com/apache/flink-connector-prometheus/pull/22#discussion_r2124415562
########## flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java:
##########
@@ -98,6 +98,11 @@ private AwsCredentialsProvider getCredentialsProvider() {
         return credentialsProvider;
     }
+    @VisibleForTesting
+    String getAwsRegion() {
+        return awsRegion;
+    }
+

Review Comment:
   This is a bit of a smell, since we are exposing internal object details to test the factory method. Can we instead test the factory method directly?

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -18,36 +18,13 @@ package org.apache.flink.connector.prometheus.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
-import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
-import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
-import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Collection;
 
 /** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
 @PublicEvolving
-public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
-    private final String prometheusRemoteWriteUrl;
-    private final PrometheusAsyncHttpClientBuilder clientBuilder;
-    private final PrometheusRequestSigner requestSigner;
-    private final int maxBatchSizeInSamples;
-    private final String httpUserAgent;
-    private final PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-            errorHandlingBehaviorConfig;
-    private final String metricGroupName;
-
-    @SuppressWarnings("checkstyle:RegexpSingleline")
+public class PrometheusSink extends PrometheusSinkBase<PrometheusTimeSeries> {

Review Comment:
   Why don't we take the hit and simply propose a major version bump to implement a generic `PrometheusSink`? That way we remove the need to maintain two coupled public interfaces. We can see this coupling in the `castToPrometheusSink()` public method of `PrometheusSinkBase`, which is a big smell, since the parent class creates an instance of the child class.

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/RowDataToPrometheusTimeSeriesConverter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Converts from Flink Table API internal type of {@link RowData} to {@link PrometheusTimeSeries}.
+ */
+@Internal
+public class RowDataToPrometheusTimeSeriesConverter {
+
+    private final DataType physicalDataType;
+    private final PrometheusConfig prometheusConfig;
+
+    public RowDataToPrometheusTimeSeriesConverter(
+            DataType physicalDataType, PrometheusConfig prometheusConfig) {
+        this.physicalDataType = physicalDataType;
+        this.prometheusConfig = prometheusConfig;
+    }
+
+    public PrometheusTimeSeries convertRowData(RowData row) {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+
+        PrometheusTimeSeries.Builder builder = PrometheusTimeSeries.builder();
+        Double sampleValue = null;
+        Long sampleTimestamp = null;
+
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(fields.get(i).getDataType().getLogicalType(), i);
+            FieldValue fieldValue = new FieldValue(fieldGetter.getFieldOrNull(row));
+            String fieldName = field.getName();
+
+            if (fieldName.equals(prometheusConfig.getMetricName())) {
+                builder.withMetricName(fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleKey())) {
+                sampleValue = fieldValue.getDoubleValue();
+            } else if (prometheusConfig.getLabelKeys().contains(fieldName)) {
+                builder.addLabel(fieldName, fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleTimestamp())) {
+                sampleTimestamp = fieldValue.getLongValue();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "%s is not a supported field, valid fields are: %s",

Review Comment:
   Is there a reason we don't just ignore the field in the `RowData` instead? e.g. the user declares a column in the table that is not used in the sink.

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/RowDataToPrometheusTimeSeriesConverter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Converts from Flink Table API internal type of {@link RowData} to {@link PrometheusTimeSeries}.
+ */
+@Internal
+public class RowDataToPrometheusTimeSeriesConverter {
+
+    private final DataType physicalDataType;
+    private final PrometheusConfig prometheusConfig;
+
+    public RowDataToPrometheusTimeSeriesConverter(
+            DataType physicalDataType, PrometheusConfig prometheusConfig) {
+        this.physicalDataType = physicalDataType;
+        this.prometheusConfig = prometheusConfig;
+    }
+
+    public PrometheusTimeSeries convertRowData(RowData row) {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+
+        PrometheusTimeSeries.Builder builder = PrometheusTimeSeries.builder();
+        Double sampleValue = null;
+        Long sampleTimestamp = null;
+
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(fields.get(i).getDataType().getLogicalType(), i);
+            FieldValue fieldValue = new FieldValue(fieldGetter.getFieldOrNull(row));
+            String fieldName = field.getName();
+
+            if (fieldName.equals(prometheusConfig.getMetricName())) {
+                builder.withMetricName(fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleKey())) {
+                sampleValue = fieldValue.getDoubleValue();
+            } else if (prometheusConfig.getLabelKeys().contains(fieldName)) {
+                builder.addLabel(fieldName, fieldValue.getStringValue());
+            } else if (fieldName.equals(prometheusConfig.getMetricSampleTimestamp())) {
+                sampleTimestamp = fieldValue.getLongValue();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "%s is not a supported field, valid fields are: %s",
+                                fieldName,
+                                Arrays.asList(
+                                        prometheusConfig.getMetricName(),
+                                        prometheusConfig.getLabelKeys(),
+                                        prometheusConfig.getMetricSampleKey(),
+                                        prometheusConfig.getMetricSampleTimestamp())));
+            }
+        }
+
+        if (sampleValue != null && sampleTimestamp != null) {
+            builder.addSample(sampleValue, sampleTimestamp);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Row is missing sampleValue field; %s or sampleTimestamp: %s",
+                            sampleValue, sampleTimestamp));
+        }
+
+        return builder.build();
+    }
+
+    private static class FieldValue {
+        private final Object value;
+
+        private FieldValue(Object value) {
+            this.value = value;
+        }
+
+        private String getStringValue() {
+            return value.toString();
+        }

Review Comment:
   Are there any unsupported types? What happens if the field provided is a `MAP` / `ARRAY`?
   https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#list-of-data-types
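   To illustrate the question, a hypothetical probe (not part of the PR; the class name is made up). The objects returned by `RowData.FieldGetter` are Flink-internal classes such as `StringData`, `ArrayData`, and `MapData`, and if I read the converter right, `FieldValue.getStringValue()` just calls `toString()` on them, which is only meaningful for scalar types:

```java
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;

import java.util.HashMap;
import java.util.Map;

/** Hypothetical probe: what value.toString() yields for internal Table types. */
public class FieldValueToStringProbe {
    public static void main(String[] args) {
        // VARCHAR fields arrive as StringData; toString() returns the text.
        Object varcharField = StringData.fromString("my-label-value");
        System.out.println(varcharField); // my-label-value

        // ARRAY fields arrive as ArrayData; as far as I can tell, toString() is
        // not overridden to render the elements, so the "label value" would
        // likely be an object identity string, e.g. GenericArrayData@1b6d3586.
        Object arrayField = new GenericArrayData(new int[] {1, 2, 3});
        System.out.println(arrayField);

        // MAP fields arrive as MapData, with the same problem.
        Map<Object, Object> entries = new HashMap<>();
        entries.put(StringData.fromString("k"), 1);
        Object mapField = new GenericMapData(entries);
        System.out.println(mapField);
    }
}
```

   If that reading is right, it may be worth rejecting non-scalar logical types up front rather than silently writing unusable label values.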
########## flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerFactory.java:
##########
@@ -0,0 +1,36 @@
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.table.PrometheusConfig;
+import org.apache.flink.connector.prometheus.table.PrometheusDynamicRequestSignerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class AmazonManagedPrometheusWriteRequestSignerFactory
+        implements PrometheusDynamicRequestSignerFactory {
+    @Override
+    public String requestSignerIdentifer() {
+        return "amazon-managed-prometheus";
+    }
+
+    @Override
+    public PrometheusRequestSigner getRequestSigner(PrometheusConfig config) {
+        return new AmazonManagedPrometheusWriteRequestSigner(
+                config.getRemoteWriteEndpointUrl(),
+                getAwsRegionFromEndpointUrl(config.getRemoteWriteEndpointUrl()));
+    }
+
+    private String getAwsRegionFromEndpointUrl(String endpointUrl) {
+        String regex = "aps[a-zA-Z0-9-]*\\.([^.]+)\\.";
+        Pattern pattern = Pattern.compile(regex);

Review Comment:
   Let's make this static, instead of reinitialising it on each request (see the sketch after the next comment).

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBase.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+
+public class PrometheusSinkBase<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {

Review Comment:
   This is `@PublicEvolving`. We would be doubling the public interface that the sink repository has to maintain.
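   Going back to the static `Pattern` suggestion two comments up, a minimal standalone sketch. The class name is hypothetical, and the `find()`/`group(1)` handling is my assumption, since the diff excerpt ends right after `Pattern.compile(...)`:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the suggested refactor: compile the regex once per class load
// instead of on every getRequestSigner() call.
public class AwsRegionExtractor {
    private static final Pattern AWS_REGION_PATTERN =
            Pattern.compile("aps[a-zA-Z0-9-]*\\.([^.]+)\\.");

    static String getAwsRegionFromEndpointUrl(String endpointUrl) {
        Matcher matcher = AWS_REGION_PATTERN.matcher(endpointUrl);
        if (matcher.find()) {
            return matcher.group(1);
        }
        throw new IllegalArgumentException(
                "Could not extract AWS region from endpoint URL: " + endpointUrl);
    }

    public static void main(String[] args) {
        // Prints "us-east-1" for an AMP remote-write endpoint of this shape.
        System.out.println(
                getAwsRegionFromEndpointUrl(
                        "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-0000/api/v1/remote_write"));
    }
}
```

   `Pattern` instances are immutable and thread-safe, so a `static final` field is safe to share across calls.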
########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/PrometheusDynamicSink.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.sink.PrometheusSinkBase;
+import org.apache.flink.connector.prometheus.sink.PrometheusSinkBaseBuilder;
+import org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.stream.Stream;
+
+public class PrometheusDynamicSink extends AsyncDynamicTableSink<Types.TimeSeries>

Review Comment:
   This is `@PublicEvolving`.

########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBase.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+
+public class PrometheusSinkBase<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {
+    private final String prometheusRemoteWriteUrl;
+    private final PrometheusAsyncHttpClientBuilder clientBuilder;
+    private final PrometheusRequestSigner requestSigner;
+    private final int maxBatchSizeInSamples;
+    private final int maxRecordSizeInSamples;
+    private final String httpUserAgent;
+    private final PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+            errorHandlingBehaviorConfig;
+    private final String metricGroupName;
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    protected PrometheusSinkBase(
+            ElementConverter<IN, Types.TimeSeries> elementConverter,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            int maxRecordSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            PrometheusAsyncHttpClientBuilder clientBuilder,
+            PrometheusRequestSigner requestSigner,
+            String httpUserAgent,
+            PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+                    errorHandlingBehaviorConfig,
+            String metricGroupName) {
+        // This sink batches in terms of "samples", because writes to Prometheus are better
+        // optimized in terms of samples. AsyncSinkBase handles batching and does not make any
+        // assumptions about the actual unit of "size", but parameters are named assuming this unit
+        // is "bytes".
+        super(
+                elementConverter,
+                maxBatchSizeInSamples, // maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples, // maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInSamples // maxRecordSizeInBytes
+                );
+
+        Preconditions.checkArgument(
+                maxBatchSizeInSamples > 1, "Max batch size (in samples) must be positive");
+        Preconditions.checkArgument(
+                maxRecordSizeInSamples <= maxBatchSizeInSamples,
+                "Max record size (in samples) must be <= Max batch size");
+        Preconditions.checkArgument(maxInFlightRequests == 1, "Max in-flight requests must be 1");
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+                "Missing or blank Prometheus Remote-Write URL");
+        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(httpUserAgent), "Missing HTTP User Agent string");
+        Preconditions.checkNotNull(
+                errorHandlingBehaviorConfig, "Missing error handling configuration");
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(metricGroupName), "Missing metric group name");
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
+        this.requestSigner = requestSigner;
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        this.clientBuilder = clientBuilder;
+        this.httpUserAgent = httpUserAgent;
+        this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+        this.metricGroupName = metricGroupName;
+    }
+
+    @Override
+    public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> createWriter(
+            InitContext initContext) {
+        SinkMetricsCallback metricsCallback =
+                new SinkMetricsCallback(
+                        SinkMetrics.registerSinkMetrics(
+                                initContext.metricGroup().addGroup(metricGroupName)));
+        CloseableHttpAsyncClient asyncHttpClient =
+                clientBuilder.buildAndStartClient(metricsCallback);
+
+        return new PrometheusSinkWriter<>(
+                getElementConverter(),
+                initContext,
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                maxBatchSizeInSamples,
+                maxRecordSizeInSamples,
+                getMaxTimeInBufferMS(),
+                prometheusRemoteWriteUrl,
+                asyncHttpClient,
+                metricsCallback,
+                requestSigner,
+                httpUserAgent,
+                errorHandlingBehaviorConfig);
+    }
+
+    @Override
+    public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>> restoreWriter(
+            InitContext initContext,
+            Collection<BufferedRequestState<Types.TimeSeries>> recoveredState) {
+        SinkMetricsCallback metricsCallback =
+                new SinkMetricsCallback(
+                        SinkMetrics.registerSinkMetrics(
+                                initContext.metricGroup().addGroup(metricGroupName)));
+        CloseableHttpAsyncClient asyncHttpClient =
+                clientBuilder.buildAndStartClient(metricsCallback);
+        return new PrometheusSinkWriter<>(
+                getElementConverter(),
+                initContext,
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                maxBatchSizeInSamples,
+                maxRecordSizeInSamples,
+                getMaxTimeInBufferMS(),
+                prometheusRemoteWriteUrl,
+                asyncHttpClient,
+                metricsCallback,
+                requestSigner,
+                httpUserAgent,
+                errorHandlingBehaviorConfig,
+                recoveredState);
+    }
+
+    public PrometheusSink castToPrometheusSink() {
+        return new PrometheusSink(
+                new PrometheusTimeSeriesConverter(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                maxBatchSizeInSamples,
+                maxRecordSizeInSamples,
+                getMaxTimeInBufferMS(),
+                prometheusRemoteWriteUrl,
+                clientBuilder,
+                requestSigner,
+                httpUserAgent,
+                errorHandlingBehaviorConfig,
+                metricGroupName);
+    }

Review Comment:
   The fact that we need this is a big code smell, since the higher-level abstraction generates the lower-level abstraction. Given that we have just released the connector, let's instead consider fixing the interface and bumping the major version to remove this problem.
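   To make that concrete, a rough sketch of the shape a major version bump could allow (this is an assumed direction, not code from the PR): `PrometheusSink` itself becomes generic, so there is no separate `PrometheusSinkBase` and no `castToPrometheusSink()` bridge.

```java
// Hypothetical post-bump shape (assumed, not from the PR): one generic public
// sink class instead of a PrometheusSink/PrometheusSinkBase pair.
// (Imports as in today's PrometheusSinkBase.)
@PublicEvolving
public class PrometheusSink<IN> extends AsyncSinkBase<IN, Types.TimeSeries> {

    protected PrometheusSink(
            ElementConverter<IN, Types.TimeSeries> elementConverter,
            int maxInFlightRequests,
            int maxBufferedRequests,
            int maxBatchSizeInSamples,
            int maxRecordSizeInSamples,
            long maxTimeInBufferMS,
            String prometheusRemoteWriteUrl,
            PrometheusAsyncHttpClientBuilder clientBuilder,
            PrometheusRequestSigner requestSigner,
            String httpUserAgent,
            PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
                    errorHandlingBehaviorConfig,
            String metricGroupName) {
        super(
                elementConverter,
                maxBatchSizeInSamples,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInSamples,
                maxTimeInBufferMS,
                maxRecordSizeInSamples);
        // ... same validation and field assignments as today's PrometheusSinkBase ...
    }
}
```

   DataStream users would bind `IN = PrometheusTimeSeries` via the existing builder, while `PrometheusDynamicSink` would bind `IN = RowData` with `RowDataToPrometheusTimeSeriesConverter`, so the parent class never has to construct the child.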