How do the stream name and shard id appear in the tags when you remove the metric groups?

There would be name clashes within Flink for any consumer that reads from multiple shards, since the metrics for the individual shards would no longer be uniquely identified.

Which reporter are you using?

I would recommend modifying the reporter you're using so that it uses the logical scope instead of the metric identifier, like the PrometheusReporter does. This gives you a consistent metric identifier (something like "taskmanager.task.operator.kinesis.shard_name.shard_id.max_records_per_fetch"), while the tags are used to differentiate between specific instances.
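
For illustration, a minimal sketch of a reporter written that way, following the pattern the PrometheusReporter uses. The cast to FrontMetricGroup mirrors Flink's internal reporters and may change between versions; the actual call to the metrics backend is left out:

import java.util.Map;

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;

// Illustrative sketch only; not a complete Datadog reporter.
public class LogicalScopeReporter implements MetricReporter {

   @Override
   public void open(MetricConfig config) {}

   @Override
   public void close() {}

   @Override
   public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
      // The logical scope is built from group *names* only, so it is
      // identical for every stream/shard instance of the same metric.
      @SuppressWarnings("unchecked")
      String logicalScope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group)
         .getLogicalScope(input -> input, '.');
      String name = logicalScope + '.' + metricName;

      // The variable parts (host, task, stream name, shard id, ...) are
      // available as scope variables and can be attached as tags instead.
      Map<String, String> tags = group.getAllVariables();

      // ... register `name` with `tags` in the metrics backend ...
   }

   @Override
   public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {}
}

With such a scheme every shard reports under the same metric name, and the stream name / shard id only show up in the tag map.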

On 07/10/2019 10:26, Yitzchak Lieberman wrote:
Hi. Yes, I would prefer to have an option to remove the new metric groups.
It shouldn't cause any name clashes, since the stream name and shard id still appear in the tags.
Right now I've compiled the Flink Kinesis connector with a boolean option to control it.
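
For reference, such a switch could look roughly like this inside registerShardMetrics (the property name and the configProps handle below are made up for illustration; they are not actual connector options):

// Hypothetical consumer property; not part of the official connector.
public static final String SHARD_METRICS_ENABLED = "flink.shard.metrics.enabled";

boolean shardMetricsEnabled = Boolean.parseBoolean(
   configProps.getProperty(SHARD_METRICS_ENABLED, "true"));

// Only nest the per-stream/per-shard groups when the flag is set;
// otherwise the gauges are registered directly on the operator group
// (at the cost of the name clashes discussed above).
MetricGroup streamShardMetricGroup = shardMetricsEnabled
   ? metricGroup
      .addGroup(
         KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
         shardState.getStreamShardHandle().getStreamName())
      .addGroup(
         KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
         shardState.getStreamShardHandle().getShard().getShardId())
   : metricGroup;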

On Mon, Oct 7, 2019 at 11:05 AM Chesnay Schepler <ches...@apache.org> wrote:

    What exactly would you prefer? Without the stream name and shard
    id you'd end up with name clashes all over the place.

    Why can you not aggregate them? Surely Datadog supports some way
    to define a wildcard when defining the tags to aggregate.


    On 03/10/2019 09:09, Yitzchak Lieberman wrote:
    Hi.

    I would like to have the ability to control the metric groups of
    the Flink Kinesis consumer:
    As the code below shows, it creates a metric identifier for each
    stream name and shard id (in our case, more than 1000 metric
    identifiers), which makes them impossible to aggregate in a
    Datadog graph:
private static ShardMetricsReporter registerShardMetrics(
      MetricGroup metricGroup, KinesisStreamShardState shardState) {
   ShardMetricsReporter shardMetrics = new ShardMetricsReporter();

   MetricGroup streamShardMetricGroup = metricGroup
      .addGroup(
         KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
         shardState.getStreamShardHandle().getStreamName())
      .addGroup(
         KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
         shardState.getStreamShardHandle().getShard().getShardId());

   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);

   return shardMetrics;
}
    Would be happy to hear your advice.
    Thanks,
    Yitzchak.


