It is not possible to access metrics from within a schema.

I can't think of a non-hacky workaround (the hacky one being to create a custom kafka consumer that checks the schema class, casts it to your specific class, and then calls a method on your schema that accepts a metric group).

On 22/01/2020 14:33, David Magalhães wrote:
Hi Yun, I'm trying to use inside a custom *DeserializationSchema*. Here is the constructor of *FlinkKafkaConsumer*. Inside *DeserializationSchema* I can't use *getRuntimeContext()*.

FlinkKafkaConsumer <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html#FlinkKafkaConsumer-java.util.List-org.apache.flink.api.common.serialization.DeserializationSchema-java.util.Properties->(List <http://docs.oracle.com/javase/7/docs/api/java/util/List.html?is-external=true><String <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>> topics, DeserializationSchema <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html><T <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html>> deserializer, Properties <http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true> props)

On Wed, Jan 22, 2020 at 3:21 AM Yun Tang <myas...@live.com <mailto:myas...@live.com>> wrote:

    Hi David

    FlinkKafkaConsumer in itself is RichParallelSourceFunction, and
    you could call function below to register your metrics group:

        
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
        ||||



    Best
    Yun Tang
    ------------------------------------------------------------------------
    *From:* David Magalhães <speeddra...@gmail.com
    <mailto:speeddra...@gmail.com>>
    *Sent:* Tuesday, January 21, 2020 3:45
    *To:* user <user@flink.apache.org <mailto:user@flink.apache.org>>
    *Subject:* Custom Metrics outside RichFunctions
    Hi, I want to create a custom metric that shows the number of
    message that couldn't be deserialized using a custom deserializer
    inside FlinkKafkaConsumer.

    Looking into Metrics page (
    
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html )
    that doesn't seem to be possible, because it it's a RichFunction.

    Anyone know another way to achieve this ?

    Thanks,
    David


Reply via email to