Hi Chesnay,

Thanks again for the hints.

Unfortunately the metrics filtering feature is not part of 1.15.0. It seems
to be part of 1.16.0: https://issues.apache.org/jira/browse/FLINK-21585
I was already wondering why I could not find the feature in the docs you
linked.

> Disabling the kafka metrics _should_ work
Setting `'properties.register.consumer.metrics' = 'false'` and
`'properties.register.producer.metrics' = 'false'` in the SQL table options
for the source / sink works. The remaining metrics are exposed on 9200.
The thing is, I wanted to investigate the consumer behavior in the first
place :D That's how I came across the bug.
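
For anyone landing here later, a minimal sketch of where those two options
go. It only builds the DDL string for a Kafka source table; the table name,
columns, topic and bootstrap servers are made-up placeholders, and the
resulting statement would then be handed to something like
`TableEnvironment.executeSql`. For a sink table the producer option would be
used instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class KafkaTableDdl {

    /** Renders a CREATE TABLE statement whose WITH clause holds the given options. */
    static String createTableDdl(String tableName, String columns, Map<String, String> options) {
        String withClause = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + tableName + " (\n  " + columns + "\n) WITH (\n" + withClause + "\n)";
    }

    /** Options for a Kafka source table with the Kafka consumer metrics switched off. */
    static Map<String, String> sourceOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "example-topic");                         // placeholder
        options.put("properties.bootstrap.servers", "localhost:9092"); // placeholder
        options.put("format", "json");
        // The option from this thread: do not register the Kafka consumer metrics.
        options.put("properties.register.consumer.metrics", "false");
        return options;
    }

    public static void main(String[] args) {
        // Placeholder table name and schema, for illustration only.
        System.out.println(createTableDdl("example_source", "id BIGINT, payload STRING", sourceOptions()));
    }
}
```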

Anyways, big thanks for your great support!


On Wed, May 4, 2022 at 1:53 PM Chesnay Schepler <ches...@apache.org> wrote:

> Disabling the kafka metrics _should_ work.
>
> Alternatively you could use the new generic feature to filter metrics:
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/#filter-excludes
>
> metrics.reporter.<reportername>.filter.excludes:
> *KafkaProducer*;*KafkaConsumer*
>
> This should disable all kafka metrics. (You could also drill down and
> exclude specific problematic metrics; see the docs.)
> On 04/05/2022 13:36, Peter Schrott wrote:
>
> All right! Thanks!
>
> I tried to dig a bit deeper and see if there is any workaround for that
> problem. I tried to switch off reporting the Kafka metrics, but I was not
> quite successful. I am using the table api Kafka connector.
>
> Do you have any suggestions on how to overcome this?
>
> Could you also provide the ticket number after creation?
>
> Thanks, Peter
>
> On Wed, May 4, 2022 at 1:22 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Yes, that looks like a new bug in 1.15.
>> The migration to the new non-deprecated Kafka API in the
>> KafkaMetricMutableWrapper was done incorrectly.
>>
>> This should affect every job that uses the new kafka connector.
>>
>> Thank you for debugging the issue!
>>
>> I will create a ticket.
>>
>> On 04/05/2022 12:24, Peter Schrott wrote:
>>
>> As the stack trace says, the class cast exception occurs here:
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37
>>
>> I found the following metrics to be affected (might be more):
>> MetricName [name=version, group=app-info, description=Metric indicating
>> version, tags={client-id=producer-3}]
>> -> value: "6.2.2-ccs" (String)
>>
>> MetricName [name=start-time-ms, group=app-info, description=Metric
>> indicating start-time-ms, tags={client-id=producer-3}]
>> -> value: 1651654724987 (Long)
>>
>> MetricName [name=commit-id, group=app-info, description=Metric indicating
>> commit-id, tags={client-id=producer-3}]
>> -> value: "2ceb5dc7891720b7" (String)
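
To illustrate the failure mode with the values listed above: a minimal,
self-contained sketch, not the actual Flink code and not the eventual fix.
The class and method names are made up. It shows that a blind cast to Double
fails for the Long and String values, while a check against Number handles
the Long case and lets the caller pick a fallback for non-numeric values.

```java
class MetricValueConversion {

    /** Mirrors the failing pattern: assumes every metric value is a Double. */
    static double unsafeValue(Object metricValue) {
        return (Double) metricValue; // throws ClassCastException for Long or String
    }

    /** Defensive variant: accepts any Number and falls back to a default otherwise. */
    static double safeValue(Object metricValue, double fallback) {
        if (metricValue instanceof Number) {
            return ((Number) metricValue).doubleValue();
        }
        return fallback;
    }

    public static void main(String[] args) {
        Object startTimeMs = 1651654724987L; // value reported for start-time-ms
        Object version = "6.2.2-ccs";        // value reported for version

        System.out.println(safeValue(startTimeMs, 0.0)); // numeric value is converted
        System.out.println(safeValue(version, 0.0));     // non-numeric value uses the fallback
        try {
            unsafeValue(startTimeMs);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```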
>>
>> The problematic code seems to have been introduced with "Bump Kafka
>> version to 2.8":
>>
>> https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068
>>
>> Is this a potential bug introduced in 1.15.0?
>>
>> Best, Peter
>>
>> On Wed, May 4, 2022 at 9:58 AM Peter Schrott <pe...@bluerootlabs.io>
>> wrote:
>>
>>> Sorry for the spamming!
>>>
>>> Just after jumping into the debug-session I noticed that there are
>>> indeed exceptions thrown when fetching the metrics on port 9200:
>>>
>>> 13657 INFO  [ScalaTest-run] com.sun.net.httpserver  - HttpServer created http 0.0.0.0/0.0.0.0:9200
>>> 13658 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: /
>>> 13658 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: /metrics
>>> 13659 INFO  [ScalaTest-run] o.a.f.m.p.PrometheusReporter  - Started PrometheusReporter HTTP server on port 9200.
>>> 13745 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 14028 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 14998 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 15580 DEBUG [prometheus-http-1-4] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 16022 DEBUG [prometheus-http-1-5] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 16458 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 16885 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 17381 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 17809 DEBUG [prometheus-http-1-4] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 18259 DEBUG [prometheus-http-1-5] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 18695 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 19159 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 19758 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 20112 DEBUG [prometheus-http-1-4] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 20544 DEBUG [prometheus-http-1-5] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 20989 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 21419 DEBUG [prometheus-http-1-2] o.a.f.m.p.PrometheusReporter  - Invalid type for Gauge org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7: java.lang.String, only number types and booleans are supported by this reporter.
>>> 21421 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 [200  OK] ()
>>> 21847 DEBUG [prometheus-http-1-3] o.a.f.m.p.PrometheusReporter  - Invalid type for Gauge org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648: java.lang.String, only number types and booleans are supported by this reporter.
>>> 21851 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - ServerImpl.Exchange (2)
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double
>>>    at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
>>>    at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
>>>    at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
>>>    at io.prometheus.client.Gauge.collect(Gauge.java:317)
>>>    at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
>>>    at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
>>>    at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
>>>    at io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
>>>    at io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
>>>    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>>>    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>>>    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>>>    at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>>>    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>>>    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>>>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>    at java.lang.Thread.run(Thread.java:748)
>>> 21851 TRACE [prometheus-http-1-3] com.sun.net.httpserver  - Closing connection: java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200 remote=/127.0.0.1:50508]
>>>
>>>
>>> In my defence: this jul - slf4j - logback setup is really nasty :O
>>>
>>> Best, Peter
>>>
>>>
>>>
>>> On Wed, May 4, 2022 at 9:47 AM Peter Schrott <pe...@bluerootlabs.io>
>>> wrote:
>>>
>>>> Hi Chesnay,
>>>>
>>>> Thanks for the support! Just to summarize: running the
>>>> "Problem-Job" locally as a test in IntelliJ (as Chesnay suggested above)
>>>> reproduces the described problem:
>>>>
>>>> ➜  ~ curl localhost:9200
>>>> curl: (52) Empty reply from server
>>>>
>>>> Doing the same with other jobs, the metrics are available on localhost:9200.
>>>>
>>>> One other thing I noticed yesterday in the cluster is that job/task
>>>> specific metrics are available for a very short time after the job is
>>>> started (for around a few seconds). E.g:
>>>>
>>>> # HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond 
>>>> backPressuredTimeMsPerSecond (scope: taskmanager_job_task)
>>>>
>>>> After all tasks are "green" in the webui, the "empty reply from server"
>>>> is back.
>>>>
>>>> 1)
>>>> I changed the prometheus config in my cluster, but as you said, it
>>>> does not have any impact.
>>>>
>>>> 2)
>>>> For the logging in a test scenario, I also had to add the following
>>>> lines in my test class:
>>>>
>>>> SLF4JBridgeHandler.removeHandlersForRootLogger()
>>>> SLF4JBridgeHandler.install()
>>>>
>>>> (source:
>>>> https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
>>>> As well as resetting the log levels for jul in my logback.xml:
>>>>
>>>> <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
>>>>     <resetJUL>true</resetJUL>
>>>> </contextListener>
>>>>
>>>> This info is just for completeness, in case someone else stumbles upon this.
>>>>
>>>> I set the following loggers to lvl TRACE:
>>>>
>>>> <logger name="com.sun.net.httpserver" level="TRACE" additive="false">
>>>>     <appender-ref ref="ASYNC_FILE" />
>>>> </logger>
>>>> <logger name="org.apache.flink.metrics.prometheus" level="TRACE" additive="false">
>>>>     <appender-ref ref="ASYNC_FILE" />
>>>> </logger>
>>>> <logger name="io.prometheus.client" level="TRACE" additive="false">
>>>>     <appender-ref ref="ASYNC_FILE" />
>>>> </logger>
>>>>
>>>> When running the job in a local test as suggested above I get the
>>>> following log messages:
>>>>
>>>> 12701 INFO  [ScalaTest-run] com.sun.net.httpserver  - HttpServer created http 0.0.0.0/0.0.0.0:9200
>>>> 12703 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: /
>>>> 12703 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: /metrics
>>>> 12704 INFO  [ScalaTest-run] o.a.f.m.p.PrometheusReporter  - Started PrometheusReporter HTTP server on port 9200.
>>>>
>>>>
>>>> 3)
>>>> I have not tried to reproduce in a local cluster yet, as the issue is
>>>> also reproducible in the test environment. But thanks for the hint - could
>>>> be very helpful!
>>>>
>>>>  __
>>>>
>>>> From the observations it does not seem like there is a problem with the
>>>> http server itself. I am just making assumptions: It feels like there is a
>>>> problem with reading and providing the metrics. As the issue is
>>>> reproducible in the local setup, I have the comfy option to debug in
>>>> IntelliJ now - I'll spend my day with this if no other hints or ideas
>>>> arise.
>>>>
>>>> Thanks & Best, Peter
>>>>
>>>> On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> > I noticed that my config of the PrometheusReporter is different
>>>>> > here. I have: `metrics.reporter.prom.class:
>>>>> > org.apache.flink.metrics.prometheus.PrometheusReporter`. I will
>>>>> > investigate if this is a problem.
>>>>>
>>>>> That's not a problem.
>>>>>
>>>>> > Which trace logs are interesting?
>>>>>
>>>>> The logging config I provided should highlight the relevant bits
>>>>> (com.sun.net.httpserver).
>>>>> At least in my local tests this is where any interesting things were
>>>>> logged.
>>>>> Note that this part of the code uses java.util.logging, not
>>>>> slf4j/log4j.
>>>>>
>>>>> > When running a local flink (start-cluster.sh), I do not have a
>>>>> > certain url/port to access the taskmanager, right?
>>>>>
>>>>> If you configure a port range it should be as simple as curl
>>>>> localhost:<port>.
>>>>> You can find the used port in the taskmanager logs.
>>>>> Or just try the first N ports in the range ;)
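
A small sketch of the "try the first N ports in the range" idea, assuming
the range is written as in the config shown in this thread ("9200-9300").
The class and method names are made up; it only lists candidate ports to
curl and does not contact anything.

```java
import java.util.ArrayList;
import java.util.List;

class PortRangeCandidates {

    /** Returns at most {@code limit} ports from a range such as "9200-9300" or a single port "9200". */
    static List<Integer> firstPorts(String range, int limit) {
        String[] bounds = range.trim().split("-", 2);
        int start = Integer.parseInt(bounds[0].trim());
        int end = bounds.length == 2 ? Integer.parseInt(bounds[1].trim()) : start;

        List<Integer> ports = new ArrayList<>();
        for (int port = start; port <= end && ports.size() < limit; port++) {
            ports.add(port);
        }
        return ports;
    }

    public static void main(String[] args) {
        // Print the curl commands to try by hand, one per candidate port.
        for (int port : firstPorts("9200-9300", 3)) {
            System.out.println("curl localhost:" + port);
        }
    }
}
```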
>>>>>
>>>>> On 03/05/2022 14:11, Peter Schrott wrote:
>>>>>
>>>>> Hi Chesnay,
>>>>>
>>>>> Thanks for the code snippet. Which trace logs are interesting? Those of
>>>>> "org.apache.flink.metrics.prometheus.PrometheusReporter"?
>>>>> I could also add these logger settings in the environment where the
>>>>> problem is present.
>>>>>
>>>>> Other than that, I am not sure how to reproduce this issue in a local
>>>>> setup. In the cluster where the metrics are missing, I navigate to the
>>>>> specific taskmanager and try to access the metrics via the configured
>>>>> prometheus port. When running a local flink (start-cluster.sh), I do not
>>>>> have a certain url/port to access the taskmanager, right?
>>>>>
>>>>> I noticed that my config of the PrometheusReporter is different here.
>>>>> I have: `metrics.reporter.prom.class:
>>>>> org.apache.flink.metrics.prometheus.PrometheusReporter`. I will
>>>>> investigate if this is a problem.
>>>>>
>>>>> Unfortunately I cannot provide my job at the moment. It contains
>>>>> business logic and it is tightly coupled with our Kafka systems. I will
>>>>> check the option of creating a sample job to reproduce the problem.
>>>>>
>>>>> Best, Peter
>>>>>
>>>>> On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> You'd help me out greatly if you could provide me with a sample job
>>>>>> that runs into the issue.
>>>>>>
>>>>>> So far I wasn't able to reproduce the issue,
>>>>>> but it should be clear that there is some problem given 3 separate
>>>>>> reports, although it is strange that so far it was only reported for
>>>>>> Prometheus.
>>>>>>
>>>>>> If one of you is able to reproduce the issue within a Test and is
>>>>>> feeling adventurous,
>>>>>> then you might be able to get more information by forwarding the
>>>>>> java.util.logging
>>>>>> to SLF4J. Below is some code to get you started.
>>>>>>
>>>>>> DebuggingTest.java:
>>>>>>
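>>>>>> // Imports assume the Flink 1.15 test utilities (flink-test-utils) and JUnit 5.
>>>>>> import java.util.logging.Level;
>>>>>> import java.util.logging.LogManager;
>>>>>>
>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>> import org.apache.flink.metrics.prometheus.PrometheusReporterFactory;
>>>>>> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
>>>>>> import org.apache.flink.test.junit5.MiniClusterExtension;
>>>>>> import org.junit.jupiter.api.Test;
>>>>>> import org.junit.jupiter.api.extension.RegisterExtension;
>>>>>> import org.slf4j.bridge.SLF4JBridgeHandler;
>>>>>>
>>>>>> class DebuggingTest {
>>>>>>
>>>>>>     static {
>>>>>>         // Let java.util.logging emit everything and forward it to SLF4J.
>>>>>>         LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
>>>>>>         SLF4JBridgeHandler.removeHandlersForRootLogger();
>>>>>>         SLF4JBridgeHandler.install();
>>>>>>         miniClusterExtension =
>>>>>>                 new MiniClusterExtension(
>>>>>>                         new MiniClusterResourceConfiguration.Builder()
>>>>>>                                 .setConfiguration(getConfiguration())
>>>>>>                                 .setNumberSlotsPerTaskManager(1)
>>>>>>                                 .build());
>>>>>>     }
>>>>>>
>>>>>>     @RegisterExtension private static final MiniClusterExtension miniClusterExtension;
>>>>>>
>>>>>>     private static Configuration getConfiguration() {
>>>>>>         final Configuration configuration = new Configuration();
>>>>>>
>>>>>>         // Expose the metrics via Prometheus on the first free port in the range.
>>>>>>         configuration.setString(
>>>>>>                 "metrics.reporter.prom.factory.class", PrometheusReporterFactory.class.getName());
>>>>>>         configuration.setString("metrics.reporter.prom.port", "9200-9300");
>>>>>>
>>>>>>         return configuration;
>>>>>>     }
>>>>>>
>>>>>>     @Test
>>>>>>     void runJob() throws Exception {
>>>>>>         // <run job>
>>>>>>     }
>>>>>> }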
>>>>>>
>>>>>>
>>>>>> pom.xml:
>>>>>>
>>>>>> <dependency>
>>>>>>    <groupId>org.slf4j</groupId>
>>>>>>    <artifactId>jul-to-slf4j</artifactId>
>>>>>>    <version>1.7.32</version>
>>>>>> </dependency>
>>>>>>
>>>>>> log4j2-test.properties:
>>>>>>
>>>>>> rootLogger.level = off
>>>>>> rootLogger.appenderRef.test.ref = TestLogger
>>>>>> logger.http.name = com.sun.net.httpserver
>>>>>> logger.http.level = trace
>>>>>> appender.testlogger.name = TestLogger
>>>>>> appender.testlogger.type = CONSOLE
>>>>>> appender.testlogger.target = SYSTEM_ERR
>>>>>> appender.testlogger.layout.type = PatternLayout
>>>>>> appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
>>>>>>
>>>>>> On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:
>>>>>>
>>>>>> On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
>>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
>>>>>> coming from 1.14.4. I already consulted the mailing list:
>>>>>> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
>>>>>> I have not found the underlying problem or a solution to it.
>>>>>>
>>>>>> Actually, after re-checking, I see the same log WARNINGS as
>>>>>> ChangZhuo described.
>>>>>>
>>>>>> As I described, it seems to be an issue with my job. If no job, or an
>>>>>> example job, runs on the taskmanager, the basic metrics work just fine.
>>>>>> Maybe ChangZhuo can confirm this?
>>>>>>
>>>>>> @ChangZhuo what's your job setup? I am running a streaming SQL job, but
>>>>>> also using data streams API to create the streaming environment and from
>>>>>> that the table environment and finally using a StatementSet to execute
>>>>>> multiple SQL statements in one job.
>>>>>>
>>>>>> We are running a streaming application with low level API with
>>>>>> Kubernetes operator FlinkDeployment.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>
>
