darenwkt commented on code in PR #22: URL: https://github.com/apache/flink-connector-prometheus/pull/22#discussion_r2072389078
##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/PrometheusConnectorOption.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+public class PrometheusConnectorOption {
+
+    // -----------------------------------------------------------------------------------------
+    // Prometheus connector specific options
+    // -----------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> METRIC_NAME =
+            ConfigOptions.key("metric.name")

Review Comment:
Thanks Lorenzo for the review, and apologies for the misleading example, which contained typos. I have updated the description and added an accurate example below.

In the example below, a table is created where the metricName and the labels are STRING columns, and 2 labels (String/String pairs) are used, as you correctly pointed out.
```
Flink SQL> CREATE TABLE PromTable (
>   `my_metric_name` STRING,
>   `my_label_1` STRING,
>   `my_label_2` STRING,
>   `sample_value` BIGINT,
>   `sample_ts` TIMESTAMP(3)
> )
> WITH (
>   'connector' = 'prometheus',
>   'metric.endpoint-url' = 'https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-bb306bbf-e744-42e9-98a4-d90d7b2ed073/api/v1/remote_write',
>   'metric.name' = 'my_metric_name',
>   'metric.label.keys' = '[my_label_1,my_label_2]',
>   'metric.sample.key' = 'sample_value',
>   'metric.sample.timestamp' = 'sample_ts'
> );
[INFO] Execute statement succeed.
```

Next, a row of sample data is inserted into the table, where metricName="sample_name" and the label pairs are ("my_label_1":"sample_label_1", "my_label_2":"sample_label_2"):

```
Flink SQL> INSERT INTO PromTable VALUES ('sample_name', 'sample_label_1', 'sample_label_2', 123, TIMESTAMP '2025-04-30 10:00:00.000');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 596c86fb5485ddd3c1a3594e3c837d02
```

##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti
     @SuppressWarnings("checkstyle:RegexpSingleline")
     protected PrometheusSink(
-            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            ElementConverter<IN, Types.TimeSeries> elementConverter,

Review Comment:
I have taken a look and found a way to support the Table API without a breaking change. In summary, the @PublicEvolving APIs here are:
- PrometheusSink
- PrometheusSinkBuilder
- PrometheusTimeSeriesConverter

The challenge is that these 3 classes need to support RowData as the input type for the Table API implementation.
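To make the constraint concrete, here is a minimal sketch using simplified stand-in types. `TimeSeriesInput`, `RowInput`, `WireSeries`, `Converter` and `FixedSink` are hypothetical names for illustration only, not the connector's classes. It shows a converter and a sink whose input type is fixed in their declarations, which is why a row-typed input cannot be passed through them as they stand.

```java
public class InputTypeSketch {

    /** Hypothetical stand-in for PrometheusTimeSeries (heavily simplified). */
    static final class TimeSeriesInput {
        final String metricName;
        final double value;

        TimeSeriesInput(String metricName, double value) {
            this.metricName = metricName;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a row-typed input such as RowData: positional fields only. */
    static final class RowInput {
        final Object[] fields;

        RowInput(Object... fields) {
            this.fields = fields;
        }
    }

    /** Hypothetical stand-in for the Protobuf Types.TimeSeries. */
    static final class WireSeries {
        final String name;
        final double value;

        WireSeries(String name, double value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for ElementConverter<IN, Types.TimeSeries>. */
    interface Converter<IN> {
        WireSeries apply(IN element);
    }

    /** A converter bound to a single input type in its declaration. */
    static final class TimeSeriesConverter implements Converter<TimeSeriesInput> {
        @Override
        public WireSeries apply(TimeSeriesInput element) {
            return new WireSeries(element.metricName, element.value);
        }
    }

    /** A sink whose input type is fixed, so only TimeSeriesInput can be written. */
    static final class FixedSink {
        private final Converter<TimeSeriesInput> converter;

        FixedSink(Converter<TimeSeriesInput> converter) {
            this.converter = converter;
        }

        WireSeries write(TimeSeriesInput element) {
            return converter.apply(element);
        }
    }

    public static void main(String[] args) {
        FixedSink sink = new FixedSink(new TimeSeriesConverter());
        WireSeries out = sink.write(new TimeSeriesInput("my_metric", 123.0));
        System.out.println(out.name + " " + out.value);

        // The next line would not compile: RowInput is not a TimeSeriesInput.
        // sink.write(new RowInput("my_metric", 123.0));
    }
}
```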
There are two ways to solve this:
- Option 1: Make all 3 classes take a generic input type (causes a breaking change).
- Option 2: Introduce a base class with a generic input type for all 3 classes, which can then inherit from it (no breaking change).

Following your comment, I agree that a breaking change is unnecessary, and I have updated the PR to go with Option 2.

##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti
     @SuppressWarnings("checkstyle:RegexpSingleline")
     protected PrometheusSink(
-            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            ElementConverter<IN, Types.TimeSeries> elementConverter,
             int maxInFlightRequests,
             int maxBufferedRequests,
             int maxBatchSizeInSamples,
-            int maxRecordSizeInSamples,
+            long maxRecordSizeInSamples,

Review Comment:
I changed it because I noticed that the `maxBatchSizeInBytes` attribute in the `AsyncSinkBase` class is a long, and we cast the value from int to long in the super constructor call of PrometheusSink. I thought we should match the super class, but I realised this is a breaking change that is not needed, so I have reverted it.

##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesConverter.java:
##########
@@ -22,42 +22,22 @@
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
 
+import java.util.UnknownFormatConversionException;
+
 /**
  * Converts the sink input {@link PrometheusTimeSeries} into the Protobuf {@link Types.TimeSeries}
  * that are sent to Prometheus.
  */
 @PublicEvolving
-public class PrometheusTimeSeriesConverter
-        implements ElementConverter<PrometheusTimeSeries, Types.TimeSeries> {
-
-    private static final String METRIC_NAME_LABEL_NAME = "__name__";
+public class PrometheusTimeSeriesConverter<IN> implements ElementConverter<IN, Types.TimeSeries> {
 
     @Override
-    public Types.TimeSeries apply(PrometheusTimeSeries element, SinkWriter.Context context) {
-        Types.TimeSeries.Builder builder =
-                Types.TimeSeries.newBuilder()
-                        .addLabels(
-                                Types.Label.newBuilder()
-                                        .setName(METRIC_NAME_LABEL_NAME)
-                                        .setValue(element.getMetricName())
-                                        .build());
-
-        for (PrometheusTimeSeries.Label label : element.getLabels()) {
-            builder.addLabels(
-                    Types.Label.newBuilder()
-                            .setName(label.getName())
-                            .setValue(label.getValue())
-                            .build());
-        }
-
-        for (PrometheusTimeSeries.Sample sample : element.getSamples()) {
-            builder.addSamples(
-                    Types.Sample.newBuilder()
-                            .setValue(sample.getValue())
-                            .setTimestamp(sample.getTimestamp())
-                            .build());
+    public Types.TimeSeries apply(IN element, SinkWriter.Context context) {

Review Comment:
Answered in the previous point: we will not introduce a breaking change.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org