Hi,

We are publishing around 200 kinds of events for 15,000 customers.
Source: Kafka topics; sink: an Amazon SNS topic.
We collect a metric for each combination of [Event, Consumer,
PublishResult] (PublishResult is either published or error).
So the number of metrics is on the order of 200 * 15000 * 2 = 6 million, and
it is expected to grow by around 20% per year.
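The estimate above can be reproduced with a few lines of Java. This is only a sketch: the class and method names are made up for illustration, and compounding the 20% yearly growth on the total series count (rather than on event types or customers separately) is an assumption.

```java
// Rough cardinality estimate for the [Event, Consumer, PublishResult] counters.
// Inputs come from the description above; applying 20% compound growth to the
// total series count is an assumption about how the growth is measured.
class MetricCardinality {

    // One counter per (event type, consumer, publish result) combination.
    static long seriesCount(long eventTypes, long consumers, long results) {
        return eventTypes * consumers * results;
    }

    // base * (1 + growth)^years, rounded to the nearest whole series.
    static long projected(long base, double yearlyGrowth, int years) {
        return Math.round(base * Math.pow(1.0 + yearlyGrowth, years));
    }

    public static void main(String[] args) {
        long base = seriesCount(200, 15_000, 2);
        System.out.println("today: " + base);
        for (int year = 1; year <= 3; year++) {
            System.out.println("year " + year + ": " + projected(base, 0.20, year));
        }
    }
}
```

At 20% per year the series count passes 10 million in the third year, which matters for how many series each reporter scrape has to serialize.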

We are using the Prometheus reporter to expose the metrics for scraping.
Yesterday I found that, for one customer, the metric got stuck at a
particular value.
However, when we print the value of "counterEventPublishedMapKey" in the
logs, we see correct, increasing values.
There are no other warnings or errors in the logs.
Other similar metrics are being scraped without any issues.

The performance of the overall pipeline is good and we don't see any other
issues.
JM memory is 4 GB (Metaspace is 1 GB).
TM memory is 4 GB (heap is 3 GB).
Flink version is 1.12.2.
The job runs with a parallelism of 2.

There are other jobs where we have used a similar structure to publish
metrics through a RichMapFunction, and they are running successfully.

1) Can we rely on metrics reported in this fashion?
2) Has anyone faced this kind of scenario before?
3) What should we do if a single counter metric gets stuck like this
(while the other counter metrics keep working)?
4) Could you point out whether the code structure is the cause?



public class SNSPublisher implements EventJobs {

  private static FlinkKafkaConsumer getFlinkKafkaConsumer(ParameterTool configParams) {
    .......
  }

  @Override
  public void execute(ParameterTool configParams) throws Exception {


    final StreamExecutionEnvironment env =
        KafkaUtil.createStreamExecutionEnvironment(configParams);

    // Enabling Checkpointing

    env.enableCheckpointing(1000);
    String checkpointingDirectory =
        configParams.get(AppConstant.CHECKPOINTING_DIRECTORY);
    env.setStateBackend(new FsStateBackend(checkpointingDirectory, true));
    Class<?> unmodifiableCollectionsSerializer =
        Class.forName("java.util.Collections$UnmodifiableCollection");
    env.getConfig().addDefaultKryoSerializer(unmodifiableCollectionsSerializer,
        UnmodifiableCollectionsSerializer.class);
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

    FlinkKafkaConsumer flinkKafkaConsumer = getFlinkKafkaConsumer(configParams);

    DataStream<Tuple2<String, Event>> inputStream = env.addSource(flinkKafkaConsumer)
        .uid(configParams.get(AppConstant.STATE_KAFKA_SOURCE_UUID))
        .name(AppConstant.SOURCE);
    DataStream<Event> eventStream = inputStream
        .map((MapFunction<Tuple2<String, Event>, Event>) value -> value.getField(1));
    SNSMessagePublisherFunction snsMessagePublisherFunction =
        new SNSMessagePublisherFunction(configParams);

    SNSMessagePublisher snsMessagePublisher =
        new SNSMessagePublisherImpl(snsMessagePublisherFunction);

    DataStream<Tuple2<String, Event>> result = snsMessagePublisher.publish(eventStream);

    result
        .keyBy(resultTuple -> getMapEventCounterKey(resultTuple))
        .map(
            new RichMapFunction<Tuple2<String, Event>, String>() {

              private transient MapState<String, Counter> counterEventPublishedMapState;

              @Override
              public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                MapStateDescriptor<String, Counter> stateDescriptor =
                    new MapStateDescriptor<String, Counter>(
                        "counterEventPublishedMapState",
                        String.class,
                        Counter.class);

                counterEventPublishedMapState =
                    getRuntimeContext().getMapState(stateDescriptor);
              }

              @Override
              public String map(Tuple2<String, Event> tuple2) throws Exception {

                Event event = tuple2.getField(1);

                MetricGroup metricGroup = getRuntimeContext().getMetricGroup();

                Counter counter;
                String counterName = getResultofSNSPublish(tuple2);
                String counterEventMapKey = getMapEventCounterKey(tuple2);

                if (counterEventPublishedMapState.contains(counterEventMapKey)) {
                  counter = counterEventPublishedMapState.get(counterEventMapKey);
                } else {
                  counter =
                      metricGroup
                          .addGroup(AppConstant.EVENT, event.fetchEventType())
                          .addGroup(AppConstant.CONSUMER, event.fetchConsumerID())
                          .counter(counterName);
                }
                counter.inc();
                Long counterValue = counter.getCount();
                counterEventPublishedMapState.put(counterEventMapKey, counter);

                return new StringBuilder("counterEventPublishedMapKey:")
                    .append(counterEventMapKey)
                    .append(AppConstant.COLON)
                    .append(counterValue)
                    .toString();
              }
            }).uid(configParams.get(AppConstant.COUNTER_MAP_KEY_UUID))
        .print();

    SingleOutputStreamOperator<Tuple2<String, Event>> singleOutputStreamOperator =
        (SingleOutputStreamOperator<Tuple2<String, Event>>) result;

    singleOutputStreamOperator
        .name(AppConstant.SNS_SINK_NAME)
        .uid(configParams.get(AppConstant.STATE_SNS_SINK_UUID));
    env.execute(configParams.get(AppConstant.JOB_ENVIRONMENT) + AppConstant.SNS_PUBLISHER);
  }

  private static String getMapEventCounterKey(Tuple2<String, Event> resultTuple) {

    Event event = resultTuple.getField(1);

    return new StringBuilder(event.fetchEventType())
        .append(AppConstant.HYPEN)
        .append(event.fetchConsumerID())
        .append(AppConstant.HYPEN)
        .append(getResultofSNSPublish(resultTuple))
        .toString();
  }

  private static String getResultofSNSPublish(Tuple2<String, Event> resultTuple) {
    // result is either published.<SNSMessageID> or error.<error text>
    ....
  }
}
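One hypothesis worth checking for question 4 (an assumption, not something confirmed here): the job keeps the Counter objects themselves in keyed MapState. With a heap-based backend such as FsStateBackend the stored value is usually the same object reference, which may be why the pattern works most of the time. But if the backend ever hands back a copy of a stored value (for example a deserialized value after a restore, or a serializer copy made around a checkpoint), map() increments the copy: the log line keeps climbing while the instance registered with the metric group, which is what the Prometheus reporter reads, stops moving. The plain-Java sketch below models that with hypothetical classes (SimpleCount, the registry map, the single forced copy); none of it is Flink API. It contrasts that pattern with caching registered counters in an ordinary field map, so state never holds metric objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, NOT Flink code: it only shows how a metric object kept
// in "state" can drift away from the instance the reporter reads.
class CounterCopyDemo {

    // Stand-in for a Flink Counter: a mutable long.
    static final class SimpleCount {
        long count;

        void inc() { count++; }

        // Stands in for a serializer copy or a restore from a checkpoint.
        SimpleCount copy() {
            SimpleCount c = new SimpleCount();
            c.count = count;
            return c;
        }
    }

    static final String KEY = "eventA-consumer1-published";

    // Pattern from the job above: the counter lives in state. One copy is
    // forced after `copyAfter` events. Returns {logged value, reported value}.
    static long[] counterInState(int events, int copyAfter) {
        Map<String, SimpleCount> registry = new HashMap<>(); // what the reporter sees
        Map<String, SimpleCount> state = new HashMap<>();    // stands in for MapState
        for (int i = 0; i < events; i++) {
            SimpleCount c = state.get(KEY);
            if (c == null) {
                c = new SimpleCount();
                registry.put(KEY, c); // registered exactly once
            }
            c.inc();
            state.put(KEY, c);
            if (i + 1 == copyAfter) {
                state.put(KEY, state.get(KEY).copy()); // state now holds a detached copy
            }
        }
        return new long[] {state.get(KEY).count, registry.get(KEY).count};
    }

    // Alternative: cache registered counters in a plain field map (in a
    // RichMapFunction this would be a transient field set up in open()).
    static long[] counterInFieldCache(int events) {
        Map<String, SimpleCount> registry = new HashMap<>();
        Map<String, SimpleCount> cache = new HashMap<>();
        long logged = 0;
        for (int i = 0; i < events; i++) {
            SimpleCount c = cache.computeIfAbsent(KEY, k -> {
                SimpleCount n = new SimpleCount();
                registry.put(k, n); // register on first use only
                return n;
            });
            c.inc();
            logged = c.count;
        }
        return new long[] {logged, registry.get(KEY).count};
    }

    public static void main(String[] args) {
        long[] a = counterInState(10, 4);
        System.out.println("counter in state: logged=" + a[0] + ", reported=" + a[1]);
        long[] b = counterInFieldCache(10);
        System.out.println("field cache: logged=" + b[0] + ", reported=" + b[1]);
    }
}
```

In the first case the logged value reaches 10 while the registered counter stays at 4, which matches the "logs increase, scrape is stuck" symptom. Note that counters cached in a field start again from zero after a job restart, which Prometheus counters are designed to tolerate.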



Thanks,
Prasanna.
