nicusX commented on code in PR #22:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/22#discussion_r2071357779


##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesConverter.java:
##########
@@ -22,42 +22,22 @@
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
 
+import java.util.UnknownFormatConversionException;
+
 /**
  * Converts the sink input {@link PrometheusTimeSeries} into the Protobuf 
{@link Types.TimeSeries}
  * that are sent to Prometheus.
  */
 @PublicEvolving
-public class PrometheusTimeSeriesConverter
-        implements ElementConverter<PrometheusTimeSeries, Types.TimeSeries> {
-
-    private static final String METRIC_NAME_LABEL_NAME = "__name__";
+public class PrometheusTimeSeriesConverter<IN> implements ElementConverter<IN, 
Types.TimeSeries> {
 
     @Override
-    public Types.TimeSeries apply(PrometheusTimeSeries element, 
SinkWriter.Context context) {
-        Types.TimeSeries.Builder builder =
-                Types.TimeSeries.newBuilder()
-                        .addLabels(
-                                Types.Label.newBuilder()
-                                        .setName(METRIC_NAME_LABEL_NAME)
-                                        .setValue(element.getMetricName())
-                                        .build());
-
-        for (PrometheusTimeSeries.Label label : element.getLabels()) {
-            builder.addLabels(
-                    Types.Label.newBuilder()
-                            .setName(label.getName())
-                            .setValue(label.getValue())
-                            .build());
-        }
-
-        for (PrometheusTimeSeries.Sample sample : element.getSamples()) {
-            builder.addSamples(
-                    Types.Sample.newBuilder()
-                            .setValue(sample.getValue())
-                            .setTimestamp(sample.getTimestamp())
-                            .build());
+    public Types.TimeSeries apply(IN element, SinkWriter.Context context) {

Review Comment:
   As my previous point: if we still support PrometheusTimeSeries only, is it 
worth breaking the public API?



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -49,11 +49,11 @@ public class PrometheusSink extends 
AsyncSinkBase<PrometheusTimeSeries, Types.Ti
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     protected PrometheusSink(
-            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> 
elementConverter,
+            ElementConverter<IN, Types.TimeSeries> elementConverter,
             int maxInFlightRequests,
             int maxBufferedRequests,
             int maxBatchSizeInSamples,
-            int maxRecordSizeInSamples,
+            long maxRecordSizeInSamples,

Review Comment:
   Why changing this? The batch size will never be > Integer.MAX_VALUE samples



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/PrometheusConnectorOption.java:
##########
@@ -0,0 +1,67 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+public class PrometheusConnectorOption {
+
+    // 
-----------------------------------------------------------------------------------------
+    // Prometheus connector specific options
+    // 
-----------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> METRIC_NAME =
+            ConfigOptions.key("metric.name")

Review Comment:
   I am not sure I understand the interface.
   Is this configuration specifying the metric name as a constant string, or 
the field of the table that contains the metric name?
   
   In the example, it seems to be the latter.
   It should be, because in a realistic use each record can be from a different 
metric.
   
   However, looking at the example
   ```
   CREATE TABLE PromTable (
     `my_metric_name` BIGINT,
     `my_label` BIGINT,
     ...
   )
   WITH (
     'connector' = 'prometheus',
     'metric.name' = 'my_metric_name',
     'metric.label.keys' = 'my_label'
     ....
   );
   ```
   but the field is a BIGINT, while it should be a VARCHAR.
   
   Also, labels are key/value (String/String) pairs. 
   Each record (each measurement) can have a different set of Labels (Set of 
K/V) where both key and value may change
   



##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -49,11 +49,11 @@ public class PrometheusSink extends 
AsyncSinkBase<PrometheusTimeSeries, Types.Ti
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     protected PrometheusSink(
-            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> 
elementConverter,
+            ElementConverter<IN, Types.TimeSeries> elementConverter,

Review Comment:
   Is this change required for supporting Table API or just a matter of 
generalizing the API for something we are not really generalizing? The 
DataStream sink only supports PrometheusTimeSeries as input
   
   Breaking the public interface not only forces a major version, but because 
it forces existing users to change their code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to