Hi Yun, I'm trying to use this inside a custom *DeserializationSchema*, which
I pass to the constructor of *FlinkKafkaConsumer* below. Inside the
*DeserializationSchema* I can't call *getRuntimeContext()*.

FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html#FlinkKafkaConsumer-java.util.List-org.apache.flink.api.common.serialization.DeserializationSchema-java.util.Properties->
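
One workaround I'm considering, in case it helps the discussion: have the
schema return an empty result instead of throwing, and count the empties in a
downstream rich function, where getRuntimeContext() is available. Below is a
plain-Java sketch of that idea only. It uses no Flink classes: the AtomicLong
stands in for a Flink counter, and every name in it (FailureCountingSketch,
deserialize, dropFailures) is hypothetical, not part of any Flink API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java stand-in for the pattern; no Flink types are involved.
class FailureCountingSketch {

    // Stand-in for the schema's deserialize step: parses a UTF-8 integer
    // and returns Optional.empty() instead of throwing on bad input.
    static Optional<Integer> deserialize(byte[] message) {
        try {
            String text = new String(message, StandardCharsets.UTF_8).trim();
            return Optional.of(Integer.parseInt(text));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Stand-in for the downstream operator: forwards valid records and
    // increments the counter once per record that failed to deserialize.
    static List<Integer> dropFailures(List<byte[]> messages, AtomicLong failedCounter) {
        List<Integer> valid = new ArrayList<>();
        for (byte[] message : messages) {
            Optional<Integer> parsed = deserialize(message);
            if (parsed.isPresent()) {
                valid.add(parsed.get());
            } else {
                failedCounter.incrementAndGet();
            }
        }
        return valid;
    }
}
```

In a real job the counting step would presumably live in a rich function
placed right after the Kafka source, with the counter registered through the
metric group as in your snippet; I have not verified that part yet.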

On Wed, Jan 22, 2020 at 3:21 AM Yun Tang <myas...@live.com> wrote:

> Hi David
>
> FlinkKafkaConsumer is itself a RichParallelSourceFunction, so you can
> call the function below to register your metric group:
>
> getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
>
>
>
>
> Best
> Yun Tang
> ------------------------------
> *From:* David Magalhães <speeddra...@gmail.com>
> *Sent:* Tuesday, January 21, 2020 3:45
> *To:* user <user@flink.apache.org>
> *Subject:* Custom Metrics outside RichFunctions
>
> Hi, I want to create a custom metric that shows the number of messages that
> couldn't be deserialized by a custom deserializer inside
> FlinkKafkaConsumer.
>
> Looking at the Metrics page (
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
>  )
> that doesn't seem to be possible, because the deserializer isn't a RichFunction.
>
> Does anyone know another way to achieve this?
>
> Thanks,
> David
>
