It is not possible to access metrics from within a schema.
I can't think of a non-hacky workaround. The hacky one is to create a
custom Kafka consumer that checks the schema class, casts it to your
specific class, and then calls a method on your schema that accepts a
metric group.
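A minimal sketch of that hacky pattern, to make the idea concrete. It deliberately uses stand-in types instead of Flink's classes: SketchMetricGroup, SketchSchema, MetricAware, IntSchema and SketchConsumer are all hypothetical names invented for this illustration. In a real job the consumer would be a subclass of FlinkKafkaConsumer, and the group would come from getRuntimeContext().getMetricGroup() once the function has been opened.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for a metric group. This is NOT Flink's MetricGroup, only enough to show the pattern.
class SketchMetricGroup {
    private final Map<String, AtomicLong> counters = new HashMap<>();

    // Returns the counter registered under this name, creating it on first use.
    AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, k -> new AtomicLong());
    }
}

// Stand-in for a deserialization schema (hypothetical, plays the role of DeserializationSchema).
interface SketchSchema<T> {
    T deserialize(byte[] message);
}

// Hypothetical opt-in interface: a schema that wants metrics implements this.
interface MetricAware {
    void setMetricGroup(SketchMetricGroup group);
}

// Example schema that counts the messages it cannot parse as integers.
class IntSchema implements SketchSchema<Integer>, MetricAware {
    private AtomicLong failures; // stays null until the consumer hands over a metric group

    @Override
    public void setMetricGroup(SketchMetricGroup group) {
        this.failures = group.counter("deserializationFailures");
    }

    @Override
    public Integer deserialize(byte[] message) {
        try {
            return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            if (failures != null) {
                failures.incrementAndGet();
            }
            return null; // signal "skip this record" to the caller
        }
    }
}

// Stand-in for the custom consumer that owns the metric group.
class SketchConsumer<T> {
    private final SketchSchema<T> schema;
    private final SketchMetricGroup metricGroup = new SketchMetricGroup();

    SketchConsumer(SketchSchema<T> schema) {
        this.schema = schema;
    }

    // Plays the role of the function's open(): check the schema type, cast, pass the group.
    void open() {
        if (schema instanceof MetricAware) {
            ((MetricAware) schema).setMetricGroup(metricGroup);
        }
    }

    // Deserializes one raw record through the schema.
    T poll(byte[] raw) {
        return schema.deserialize(raw);
    }

    long failureCount() {
        return metricGroup.counter("deserializationFailures").get();
    }
}
```

The type check happens in the consumer because, per the above, only the consumer can reach a metric group; the schema just exposes a setter and tolerates never being called.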
On 22/01/2020 14:33, David Magalhães wrote:
Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*.
Here is the constructor of *FlinkKafkaConsumer*. Inside
*DeserializationSchema* I can't use *getRuntimeContext()*.
FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html#FlinkKafkaConsumer-java.util.List-org.apache.flink.api.common.serialization.DeserializationSchema-java.util.Properties->
On Wed, Jan 22, 2020 at 3:21 AM Yun Tang <myas...@live.com> wrote:
Hi David
FlinkKafkaConsumer is itself a RichParallelSourceFunction, so
you can call the function below to register your metric group:
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
Best
Yun Tang
------------------------------------------------------------------------
*From:* David Magalhães <speeddra...@gmail.com>
*Sent:* Tuesday, January 21, 2020 3:45
*To:* user <user@flink.apache.org>
*Subject:* Custom Metrics outside RichFunctions
Hi, I want to create a custom metric that shows the number of
messages that couldn't be deserialized by a custom deserializer
inside FlinkKafkaConsumer.
Looking at the Metrics page (
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html ),
that doesn't seem to be possible, because the deserializer isn't a RichFunction.
Does anyone know another way to achieve this?
Thanks,
David