As the stack trace shows, the ClassCastException occurs here: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37
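For context, org.apache.kafka.common.Metric#metricValue() returns a plain Object, and as the list below shows, not every Kafka metric carries a Double, so a direct cast to Double breaks on the Long- and String-valued app-info metrics. The following is only a sketch of a more defensive wrapper, not the actual Flink class (the class name is made up):

import org.apache.flink.metrics.Gauge;
import org.apache.kafka.common.Metric;

// Illustrative sketch only -- not the Flink implementation.
public class DefensiveKafkaMetricWrapper implements Gauge<Double> {

    private final Metric kafkaMetric;

    public DefensiveKafkaMetricWrapper(Metric kafkaMetric) {
        this.kafkaMetric = kafkaMetric;
    }

    @Override
    public Double getValue() {
        Object value = kafkaMetric.metricValue();
        if (value instanceof Number) {
            // Handles Double as well as Long values such as app-info/start-time-ms,
            // avoiding the "java.lang.Long cannot be cast to java.lang.Double" error.
            return ((Number) value).doubleValue();
        }
        // Non-numeric metrics such as "version" or "commit-id" cannot be expressed
        // as a gauge value; report NaN instead of throwing during a metrics scrape.
        return Double.NaN;
    }
}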
I found the following metrics to be affected (there might be more):

MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=producer-3}] -> value: "6.2.2-ccs" (String)
MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=producer-3}] -> value: 1651654724987 (Long)
MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=producer-3}] -> value: "2ceb5dc7891720b7" (String)

The problematic code seems to have been introduced with "Bump Kafka version to 2.8": https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068

Is this a potential bug introduced in 1.15.0?

Best, Peter

On Wed, May 4, 2022 at 9:58 AM Peter Schrott <pe...@bluerootlabs.io> wrote:

> Sorry for the spamming!
>
> Just after jumping into the debug session I noticed that there are indeed
> exceptions thrown when fetching the metrics on port 9200:
>
> 13657 INFO [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
> 13658 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /
> 13658 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
> 13659 INFO [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.
> 13745 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 14028 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 14998 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 15580 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 16022 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 16458 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 16885 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 17381 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 17809 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 18259 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 18695 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 19159 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 19758 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 20112 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 20544 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 20989 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 21419 DEBUG [prometheus-http-1-2] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7:
> java.lang.String, only number types and booleans are supported by this reporter.
> 21421 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
> 21847 DEBUG [prometheus-http-1-3] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648:
> java.lang.String, only number types and booleans are supported by this reporter.
> 21851 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - ServerImpl.Exchange (2)
> java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double
>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
>     at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
>     at io.prometheus.client.Gauge.collect(Gauge.java:317)
>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
>     at io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
>     at io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>     at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>     at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>     at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 21851 TRACE [prometheus-http-1-3] com.sun.net.httpserver - Closing connection: java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200 remote=/127.0.0.1:50508]
>
> For my defence: This jul - slf4j - logback setup is really nasty :O
>
> Best, Peter
>
> On Wed, May 4, 2022 at 9:47 AM Peter Schrott <pe...@bluerootlabs.io> wrote:
>
>> Hi Chesnay,
>>
>> Thanks for the support! Just to confirm: running the "Problem-Job" locally
>> as a test in IntelliJ (as Chesnay suggested above) reproduces the described
>> problem:
>>
>> ➜ ~ curl localhost:9200
>> curl: (52) Empty reply from server
>>
>> Doing the same with other jobs, metrics are available on localhost:9200.
>>
>> One other thing I noticed yesterday in the cluster is that job/task-specific
>> metrics are available for a very short time after the job is started
>> (around a few seconds). E.g.:
>>
>> # HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond backPressuredTimeMsPerSecond (scope: taskmanager_job_task)
>>
>> After all tasks are "green" in the web UI, the "empty reply from server"
>> is back.
>>
>> 1)
>> I changed the Prometheus config in my cluster, but as you said, it does
>> not have any impact.
>>
>> 2)
>> For the logging in a test scenario, I also had to add the following lines
>> in my test class:
>>
>> SLF4JBridgeHandler.removeHandlersForRootLogger()
>> SLF4JBridgeHandler.install()
>>
>> (source: https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
>>
>> As well as resetting log levels for JUL in my logback.xml:
>>
>> <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
>>   <resetJUL>true</resetJUL>
>> </contextListener>
>>
>> This info is just for completeness, in case someone else stumbles upon it.
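A consolidated sketch of the JUL bridging described in the quoted message, for anyone who wants to copy it in one piece. It assumes jul-to-slf4j and logback-classic on the test classpath; the helper class name is made up:

import org.slf4j.bridge.SLF4JBridgeHandler;

import java.util.logging.Level;
import java.util.logging.LogManager;

// Illustrative helper: call install() once before the test starts the job or
// MiniCluster, so that com.sun.net.httpserver (which logs via java.util.logging)
// ends up in logback.
public final class JulBridgeSetup {

    private JulBridgeSetup() {}

    public static void install() {
        // Remove the default JUL console handlers and route all JUL records to SLF4J.
        SLF4JBridgeHandler.removeHandlersForRootLogger();
        SLF4JBridgeHandler.install();
        // If logback.xml does not configure the LevelChangePropagator shown above,
        // also lower the JUL root level here; otherwise FINE/FINEST records are
        // filtered out before they ever reach the bridge.
        LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
    }
}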
>>
>> I set the following loggers to level TRACE:
>>
>> <logger name="com.sun.net.httpserver" level="TRACE" additive="false">
>>   <appender-ref ref="ASYNC_FILE" />
>> </logger>
>>
>> <logger name="org.apache.flink.metrics.prometheus" level="TRACE" additive="false">
>>   <appender-ref ref="ASYNC_FILE" />
>> </logger>
>>
>> <logger name="io.prometheus.client" level="TRACE" additive="false">
>>   <appender-ref ref="ASYNC_FILE" />
>> </logger>
>>
>> When running the job in a local test as suggested above, I get the
>> following log messages:
>>
>> 12701 INFO [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
>> 12703 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /
>> 12703 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
>> 12704 INFO [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.
>>
>> 3)
>> I have not tried to reproduce it in a local cluster yet, as the issue is
>> also reproducible in the test environment. But thanks for the hint - could
>> be very helpful!
>>
>> __
>>
>> From the observations it does not seem like there is a problem with the
>> HTTP server itself. I am just making assumptions: it feels like there is a
>> problem with reading and providing the metrics. As the issue is
>> reproducible in the local setup, I have the comfy option to debug in
>> IntelliJ now - I'll spend my day on this if no other hints or ideas arise.
>>
>> Thanks & Best, Peter
>>
>> On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> > I noticed that my config of the PrometheusReporter is different here.
>>> > I have: `metrics.reporter.prom.class:
>>> > org.apache.flink.metrics.prometheus.PrometheusReporter`. I will
>>> > investigate if this is a problem.
>>>
>>> That's not a problem.
>>>
>>> > Which trace logs are interesting?
>>>
>>> The logging config I provided should highlight the relevant bits
>>> (com.sun.net.httpserver).
>>> At least in my local tests this is where any interesting things were logged.
>>> Note that this part of the code uses java.util.logging, not slf4j/log4j.
>>>
>>> > When running a local flink (start-cluster.sh), I do not have a certain
>>> > url/port to access the taskmanager, right?
>>>
>>> If you configure a port range it should be as simple as curl localhost:<port>.
>>> You can find the used port in the taskmanager logs.
>>> Or just try the first N ports in the range ;)
>>>
>>> On 03/05/2022 14:11, Peter Schrott wrote:
>>>
>>> Hi Chesnay,
>>>
>>> Thanks for the code snippet. Which trace logs are interesting? Those of
>>> "org.apache.flink.metrics.prometheus.PrometheusReporter"?
>>> I could also add these logger settings in the environment where the
>>> problem is present.
>>>
>>> Other than that, I am not sure how to reproduce this issue in a local
>>> setup. In the cluster where the metrics are missing I navigate to the
>>> respective taskmanager and try to access the metrics via the configured
>>> Prometheus port. When running a local flink (start-cluster.sh), I do not
>>> have a certain url/port to access the taskmanager, right?
>>>
>>> I noticed that my config of the PrometheusReporter is different here. I
>>> have: `metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter`. I will investigate
>>> if this is a problem.
>>>
>>> Unfortunately I cannot provide my job at the moment. It contains business
>>> logic and it is tightly coupled with our Kafka systems.
>>> I will check the option of creating a sample job to reproduce the problem.
>>>
>>> Best, Peter
>>>
>>> On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> You'd help me out greatly if you could provide me with a sample job
>>>> that runs into the issue.
>>>>
>>>> So far I wasn't able to reproduce the issue, but it should be clear
>>>> that there is one, given 3 separate reports, although it is strange
>>>> that so far it has only been reported for Prometheus.
>>>>
>>>> If one of you is able to reproduce the issue within a test and is
>>>> feeling adventurous, then you might be able to get more information by
>>>> forwarding java.util.logging to SLF4J. Below is some code to get you
>>>> started.
>>>>
>>>> DebuggingTest.java:
>>>>
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.metrics.prometheus.PrometheusReporterFactory;
>>>> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
>>>> import org.apache.flink.test.junit5.MiniClusterExtension;
>>>>
>>>> import org.junit.jupiter.api.Test;
>>>> import org.junit.jupiter.api.extension.RegisterExtension;
>>>> import org.slf4j.bridge.SLF4JBridgeHandler;
>>>>
>>>> import java.util.logging.Level;
>>>> import java.util.logging.LogManager;
>>>>
>>>> class DebuggingTest {
>>>>
>>>>     static {
>>>>         LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
>>>>         SLF4JBridgeHandler.removeHandlersForRootLogger();
>>>>         SLF4JBridgeHandler.install();
>>>>         miniClusterExtension =
>>>>                 new MiniClusterExtension(
>>>>                         new MiniClusterResourceConfiguration.Builder()
>>>>                                 .setConfiguration(getConfiguration())
>>>>                                 .setNumberSlotsPerTaskManager(1)
>>>>                                 .build());
>>>>     }
>>>>
>>>>     @RegisterExtension private static final MiniClusterExtension miniClusterExtension;
>>>>
>>>>     private static Configuration getConfiguration() {
>>>>         final Configuration configuration = new Configuration();
>>>>
>>>>         configuration.setString(
>>>>                 "metrics.reporter.prom.factory.class",
>>>>                 PrometheusReporterFactory.class.getName());
>>>>         configuration.setString("metrics.reporter.prom.port", "9200-9300");
>>>>
>>>>         return configuration;
>>>>     }
>>>>
>>>>     @Test
>>>>     void runJob() throws Exception {
>>>>         <run job>
>>>>     }
>>>> }
>>>>
>>>> pom.xml:
>>>>
>>>> <dependency>
>>>>     <groupId>org.slf4j</groupId>
>>>>     <artifactId>jul-to-slf4j</artifactId>
>>>>     <version>1.7.32</version>
>>>> </dependency>
>>>>
>>>> log4j2-test.properties:
>>>>
>>>> rootLogger.level = off
>>>> rootLogger.appenderRef.test.ref = TestLogger
>>>> logger.http.name = com.sun.net.httpserver
>>>> logger.http.level = trace
>>>> appender.testlogger.name = TestLogger
>>>> appender.testlogger.type = CONSOLE
>>>> appender.testlogger.target = SYSTEM_ERR
>>>> appender.testlogger.layout.type = PatternLayout
>>>> appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
>>>>
>>>> On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:
>>>>
>>>> On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
>>>>
>>>> Hi!
>>>>
>>>> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
>>>> coming from 1.14.4. I already consulted the mailing list:
>>>> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
>>>> I have not found the underlying problem or a solution to it.
>>>>
>>>> Actually, after re-checking, I see the same log WARNINGS as ChangZhuo
>>>> described.
>>>>
>>>> As I described, it seems to be an issue with my job. If no job, or an
>>>> example job, runs on the taskmanager, the basic metrics work just fine.
>>>> Maybe ChangZhuo can confirm this?
>>>>
>>>> @ChangZhuo what's your job setup? I am running a streaming SQL job, but
>>>> also using the DataStream API to create the streaming environment and
>>>> from that the table environment, and finally using a StatementSet to
>>>> execute multiple SQL statements in one job.
>>>>
>>>> We are running a streaming application using the low-level API, deployed
>>>> via the Kubernetes operator (FlinkDeployment).
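A note on the "Empty reply from server" symptom discussed above, based on the stack trace earlier in the thread: with the simpleclient library that the Flink Prometheus reporter builds on, the /metrics text is produced while iterating over all collectors, so a single gauge that throws during collection aborts the whole scrape and the HTTP response is never completed. A small standalone sketch of that behaviour (assuming the io.prometheus.client simpleclient API; class and metric names are made up):

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.common.TextFormat;

import java.io.StringWriter;

public class BrokenScrapeSketch {

    public static void main(String[] args) throws Exception {
        CollectorRegistry registry = new CollectorRegistry();

        // A healthy gauge that would normally show up in the scrape output.
        Gauge healthy = Gauge.build()
                .name("healthy_metric").help("reports fine").register(registry);
        healthy.set(42);

        // A gauge whose child throws on read, analogous to the wrapped Kafka
        // app-info metrics that fail with the ClassCastException above.
        Gauge broken = Gauge.build()
                .name("broken_metric").help("throws during collection").create();
        broken.setChild(new Gauge.Child() {
            @Override
            public double get() {
                throw new ClassCastException(
                        "java.lang.Long cannot be cast to java.lang.Double");
            }
        });
        registry.register(broken);

        StringWriter out = new StringWriter();
        // The exception escapes from the scrape instead of being handled per
        // metric, so no output is produced; against the real reporter's HTTP
        // endpoint, curl then sees "Empty reply from server" even though every
        // other metric is healthy.
        TextFormat.write004(out, registry.metricFamilySamples());
        System.out.println(out);
    }
}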