Theo Diefenthal created FLINK-17348:
---------------------------------------

             Summary: Expose metric group to ascendingTimestampExtractor
                 Key: FLINK-17348
                 URL: https://issues.apache.org/jira/browse/FLINK-17348
             Project: Flink
          Issue Type: Improvement
            Reporter: Theo Diefenthal


A common use case in Flink + Kafka is to have many Kafka partitions, 
each with ascending timestamps.

In my scenario, for various operational reasons, we ship log files from the 
filesystem to Kafka, one server per partition, and then consume them in Flink.

Sometimes we load the files into Kafka in the wrong order, which violates 
the ascending-timestamp assumption. If that happens while the default logging 
violation handler is enabled, we produce several GB of logs in a very short 
time, which we would like to avoid.

What we really want: to track the number of violations in a metric and define 
an alarm on it in our monitoring dashboard.

Currently, there is sadly no way to reference the metric group from the 
ascending timestamp extractor. I wish there were something similar to the 
open method on rich functions.

My current workaround is to add a custom map task right after the source. For 
that task I need to pass on the Kafka partition from the source, which I 
usually don't care about, and I need to keep track of each partition's current 
timestamp manually, exactly the same way the extractor does. -> A workaround 
that "pollutes" my pipeline quite a bit just for a single metric.

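To illustrate the bookkeeping the workaround has to duplicate, here is a 
minimal sketch in plain Java with no Flink dependency. The class name 
PartitionViolationTracker and its methods are hypothetical and only for 
illustration. In an actual job, this logic would presumably sit inside a 
RichMapFunction whose open method registers a counter on 
getRuntimeContext().getMetricGroup(), and the violation count would be 
reported through that counter instead of a plain field.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): tracks the highest timestamp seen
 * per Kafka partition and counts elements that arrive with a smaller one.
 */
final class PartitionViolationTracker {

    // Last accepted timestamp for each partition id.
    private final Map<Integer, Long> lastTimestampPerPartition = new HashMap<>();

    // Stand-in for a metric counter; assumed to be replaced by a real metric in a job.
    private long violations = 0;

    /**
     * Records one element. Returns true if the timestamp is in order for its
     * partition (equal timestamps are treated as in order), false if it is a violation.
     */
    boolean observe(int partition, long timestamp) {
        Long last = lastTimestampPerPartition.get(partition);
        if (last != null && timestamp < last) {
            violations++;          // count instead of logging
            return false;
        }
        lastTimestampPerPartition.put(partition, timestamp);
        return true;
    }

    long getViolations() {
        return violations;
    }
}
```

The sketch shows why the partition id has to be carried through the pipeline: 
without it, in-order elements from different partitions would be 
indistinguishable from real violations.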


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
