darenwkt commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/22#discussion_r2072389078


##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/PrometheusConnectorOption.java:
##########
@@ -0,0 +1,67 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+public class PrometheusConnectorOption {
+
+    // -----------------------------------------------------------------------------------------
+    // Prometheus connector specific options
+    // -----------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> METRIC_NAME =
+            ConfigOptions.key("metric.name")

Review Comment:
   Thanks, Lorenzo, for the review, and apologies for the misleading example, 
which contained typos. I have updated the description, and an accurate example 
follows.
   
   In the example below, a table is created in which the metric name and the 
labels are STRING columns. As you correctly pointed out, there are two labels, 
each a String/String pair.
   
   ```
   Flink SQL> CREATE TABLE PromTable (
   >   `my_metric_name` STRING,
   >   `my_label_1` STRING,
   >   `my_label_2` STRING,
   >   `sample_value` BIGINT,
   >   `sample_ts` TIMESTAMP(3)
   > )
   > WITH (
   >   'connector' = 'prometheus',
   >   'metric.endpoint-url' = 'https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-bb306bbf-e744-42e9-98a4-d90d7b2ed073/api/v1/remote_write',
   >   'metric.name' = 'my_metric_name',
   >   'metric.label.keys' = '[my_label_1,my_label_2]',
   >   'metric.sample.key' = 'sample_value',
   >   'metric.sample.timestamp' = 'sample_ts'
   > );
   [INFO] Execute statement succeed.
   ```
   
   Next, a row of sample data is inserted into the table, where 
metricName="sample_name" and the label pairs are 
("my_label_1":"sample_label_1", "my_label_2":"sample_label_2"):
   
   ```
   Flink SQL> INSERT INTO PromTable VALUES ('sample_name', 'sample_label_1', 'sample_label_2', 123, TIMESTAMP '2025-04-30 10:00:00.000');
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] SQL update statement has been successfully submitted to the cluster:
   Job ID: 596c86fb5485ddd3c1a3594e3c837d02
   ```
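   
   As a rough illustration of how such a row could map onto Prometheus labels, 
here is a minimal sketch. It assumes the table sink follows the same convention 
as the existing `PrometheusTimeSeriesConverter`, where the metric name is sent 
as the `__name__` label. The class `RowToLabelsSketch`, the method `toLabels`, 
and the use of a plain `Map` in place of `RowData` are hypothetical and only 
for illustration; this is not the connector's actual code.
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch: a Map stands in for a table row (not the real RowData).
   class RowToLabelsSketch {

       // Same reserved label name used by the existing converter in this PR's diff.
       static final String METRIC_NAME_LABEL_NAME = "__name__";

       // Builds (name, value) label pairs: first the metric name, then each label column.
       static List<String[]> toLabels(
               Map<String, Object> row, String metricNameColumn, List<String> labelColumns) {
           List<String[]> labels = new ArrayList<>();
           labels.add(
                   new String[] {
                       METRIC_NAME_LABEL_NAME, String.valueOf(row.get(metricNameColumn))
                   });
           for (String column : labelColumns) {
               labels.add(new String[] {column, String.valueOf(row.get(column))});
           }
           return labels;
       }

       public static void main(String[] args) {
           // The row inserted in the example above, keyed by column name.
           Map<String, Object> row = new LinkedHashMap<>();
           row.put("my_metric_name", "sample_name");
           row.put("my_label_1", "sample_label_1");
           row.put("my_label_2", "sample_label_2");
           row.put("sample_value", 123L);

           for (String[] label : toLabels(row, "my_metric_name", List.of("my_label_1", "my_label_2"))) {
               System.out.println(label[0] + "=" + label[1]);
           }
       }
   }
   ```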
   



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     protected PrometheusSink(
-            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            ElementConverter<IN, Types.TimeSeries> elementConverter,

Review Comment:
   I have taken a look and found a way to support the Table API without a 
breaking change.
   
   In summary, the @PublicEvolving APIs here are:
   - PrometheusSink
   - PrometheusSinkBuilder
   - PrometheusTimeSeriesConverter
   
   The challenge is that all three classes need to accept RowData as the 
input type for the Table API implementation.
   
   There are two ways to solve this:
   - Option 1 - Make all three classes take a generic input type (causes a 
breaking change).
   - Option 2 - Introduce a base class with a generic input type for each of 
the three classes, which the existing classes then extend (no breaking change).
   
   Following your comment, I agree that a breaking change is unnecessary, and 
I have updated the PR to go with Option 2.
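   
   For illustration only, the shape of Option 2 can be sketched as below. All 
names ending in `Sketch` are hypothetical stand-ins, an `Object[]` stands in 
for RowData, and the actual class names and signatures in the PR may differ. 
The point is that the existing class keeps its non-generic public signature 
while the type parameter lives in a new base class.
   
   ```java
   import java.util.function.Function;

   // Hypothetical stand-in for the Protobuf Types.TimeSeries output type.
   final class TimeSeriesSketch {
       final String metricName;

       TimeSeriesSketch(String metricName) {
           this.metricName = metricName;
       }
   }

   // Hypothetical stand-in for the existing PrometheusTimeSeries input type.
   final class PrometheusTimeSeriesSketch {
       final String metricName;

       PrometheusTimeSeriesSketch(String metricName) {
           this.metricName = metricName;
       }
   }

   // New generic base class: the only place where the input type is a parameter.
   abstract class SinkBaseSketch<IN> {
       private final Function<IN, TimeSeriesSketch> elementConverter;

       protected SinkBaseSketch(Function<IN, TimeSeriesSketch> elementConverter) {
           this.elementConverter = elementConverter;
       }

       TimeSeriesSketch write(IN element) {
           return elementConverter.apply(element);
       }
   }

   // Existing public class: still non-generic, so current callers compile unchanged.
   class PrometheusSinkSketch extends SinkBaseSketch<PrometheusTimeSeriesSketch> {
       PrometheusSinkSketch() {
           super(ts -> new TimeSeriesSketch(ts.metricName));
       }
   }

   // Table API variant: binds the base class to a row-like input (Object[] here).
   class RowSinkSketch extends SinkBaseSketch<Object[]> {
       RowSinkSketch() {
           super(row -> new TimeSeriesSketch((String) row[0]));
       }
   }

   class Option2Sketch {
       public static void main(String[] args) {
           TimeSeriesSketch fromDataStream =
                   new PrometheusSinkSketch().write(new PrometheusTimeSeriesSketch("cpu"));
           TimeSeriesSketch fromTable = new RowSinkSketch().write(new Object[] {"mem", 1L});
           System.out.println(fromDataStream.metricName + " " + fromTable.metricName);
       }
   }
   ```
   
   With Option 1, `PrometheusSinkSketch` itself would gain the `<IN>` 
parameter, and every existing reference to the raw type would need updating.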



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     protected PrometheusSink(
-            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            ElementConverter<IN, Types.TimeSeries> elementConverter,
             int maxInFlightRequests,
             int maxBufferedRequests,
             int maxBatchSizeInSamples,
-            int maxRecordSizeInSamples,
+            long maxRecordSizeInSamples,

Review Comment:
   I noticed that the `maxBatchSizeInBytes` attribute in `AsyncSinkBase` is a 
long, and that we cast the value from int to long in the super constructor 
call of PrometheusSink, so I thought we should match the super class. However, 
I realised that this is a breaking change that is not needed, so I have 
reverted it.
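   
   To illustrate why the parameter can stay an int, here is a minimal sketch. 
The class names are hypothetical and do not mirror the real `AsyncSinkBase` 
constructor; the sketch only shows that Java widens int to long implicitly 
when the value is passed to the super constructor, so the subclass signature 
does not have to change.
   
   ```java
   // Hypothetical stand-in for a base class that stores its limit as a long.
   class BaseSinkSketch {
       final long maxRecordSizeInBytes;

       BaseSinkSketch(long maxRecordSizeInBytes) {
           this.maxRecordSizeInBytes = maxRecordSizeInBytes;
       }
   }

   // The subclass keeps its original int parameter, so existing callers are unaffected.
   class IntLimitSinkSketch extends BaseSinkSketch {
       IntLimitSinkSketch(int maxRecordSizeInSamples) {
           // Implicit widening conversion from int to long; no explicit cast is required.
           super(maxRecordSizeInSamples);
       }
   }

   class WideningSketch {
       public static void main(String[] args) {
           BaseSinkSketch sink = new IntLimitSinkSketch(500);
           System.out.println(sink.maxRecordSizeInBytes);
       }
   }
   ```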



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesConverter.java:
##########
@@ -22,42 +22,22 @@
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
 
+import java.util.UnknownFormatConversionException;
+
 /**
  * Converts the sink input {@link PrometheusTimeSeries} into the Protobuf {@link Types.TimeSeries}
  * that are sent to Prometheus.
  */
 @PublicEvolving
-public class PrometheusTimeSeriesConverter
-        implements ElementConverter<PrometheusTimeSeries, Types.TimeSeries> {
-
-    private static final String METRIC_NAME_LABEL_NAME = "__name__";
+public class PrometheusTimeSeriesConverter<IN> implements ElementConverter<IN, Types.TimeSeries> {
 
     @Override
-    public Types.TimeSeries apply(PrometheusTimeSeries element, SinkWriter.Context context) {
-        Types.TimeSeries.Builder builder =
-                Types.TimeSeries.newBuilder()
-                        .addLabels(
-                                Types.Label.newBuilder()
-                                        .setName(METRIC_NAME_LABEL_NAME)
-                                        .setValue(element.getMetricName())
-                                        .build());
-
-        for (PrometheusTimeSeries.Label label : element.getLabels()) {
-            builder.addLabels(
-                    Types.Label.newBuilder()
-                            .setName(label.getName())
-                            .setValue(label.getValue())
-                            .build());
-        }
-
-        for (PrometheusTimeSeries.Sample sample : element.getSamples()) {
-            builder.addSamples(
-                    Types.Sample.newBuilder()
-                            .setValue(sample.getValue())
-                            .setTimestamp(sample.getTimestamp())
-                            .build());
+    public Types.TimeSeries apply(IN element, SinkWriter.Context context) {

Review Comment:
   Answered in the previous comment: we will not introduce a breaking change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
