nicusX commented on code in PR #22: URL: https://github.com/apache/flink-connector-prometheus/pull/22#discussion_r2071357779
########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesConverter.java: ########## @@ -22,42 +22,22 @@ import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import java.util.UnknownFormatConversionException; + /** * Converts the sink input {@link PrometheusTimeSeries} into the Protobuf {@link Types.TimeSeries} * that are sent to Prometheus. */ @PublicEvolving -public class PrometheusTimeSeriesConverter - implements ElementConverter<PrometheusTimeSeries, Types.TimeSeries> { - - private static final String METRIC_NAME_LABEL_NAME = "__name__"; +public class PrometheusTimeSeriesConverter<IN> implements ElementConverter<IN, Types.TimeSeries> { @Override - public Types.TimeSeries apply(PrometheusTimeSeries element, SinkWriter.Context context) { - Types.TimeSeries.Builder builder = - Types.TimeSeries.newBuilder() - .addLabels( - Types.Label.newBuilder() - .setName(METRIC_NAME_LABEL_NAME) - .setValue(element.getMetricName()) - .build()); - - for (PrometheusTimeSeries.Label label : element.getLabels()) { - builder.addLabels( - Types.Label.newBuilder() - .setName(label.getName()) - .setValue(label.getValue()) - .build()); - } - - for (PrometheusTimeSeries.Sample sample : element.getSamples()) { - builder.addSamples( - Types.Sample.newBuilder() - .setValue(sample.getValue()) - .setTimestamp(sample.getTimestamp()) - .build()); + public Types.TimeSeries apply(IN element, SinkWriter.Context context) { Review Comment: As my previous point: if we still support PrometheusTimeSeries only, is it worth breaking the public API? ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ########## @@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti @SuppressWarnings("checkstyle:RegexpSingleline") protected PrometheusSink( - ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter, + ElementConverter<IN, Types.TimeSeries> elementConverter, int maxInFlightRequests, int maxBufferedRequests, int maxBatchSizeInSamples, - int maxRecordSizeInSamples, + long maxRecordSizeInSamples, Review Comment: Why changing this? The batch size will never be > Integer.MAX_VALUE samples ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/PrometheusConnectorOption.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.util.List; + +public class PrometheusConnectorOption { + + // ----------------------------------------------------------------------------------------- + // Prometheus connector specific options + // ----------------------------------------------------------------------------------------- + + public static final ConfigOption<String> METRIC_NAME = + ConfigOptions.key("metric.name") Review Comment: I am not sure I understand the interface. Is this configuration specifying the metric name as a constant string, or the field of the table that contains the metric name? In the example, it seems to be the latter. It should be, because in a realistic use each record can be from a different metric. However, looking at the example ``` CREATE TABLE PromTable ( `my_metric_name` BIGINT, `my_label` BIGINT, ... ) WITH ( 'connector' = 'prometheus', 'metric.name' = 'my_metric_name', 'metric.label.keys' = 'my_label' .... ); ``` but the field is a BIGINT, while it should be a VARCHAR. Also, labels are key/value (String/String) pairs. Each record (each measurement) can have a different set of Labels (Set of K/V) where both key and value may change ########## flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ########## @@ -49,11 +49,11 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti @SuppressWarnings("checkstyle:RegexpSingleline") protected PrometheusSink( - ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter, + ElementConverter<IN, Types.TimeSeries> elementConverter, Review Comment: Is this change required for supporting Table API or just a matter of generalizing the API for something we are not really generalizing? The DataStream sink only supports PrometheusTimeSeries as input Breaking the public interface not only forces a major version, but because it forces existing users to change their code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org