Hi Chesnay,

Thanks again for the hints.

Unfortunately the metrics filtering feature is not part of 1.15.0. It seems to be part of 1.16.0: https://issues.apache.org/jira/browse/FLINK-21585
I was already wondering why I could not find the feature in the docs you linked.

> Disabling the kafka metrics _should_ work

Setting `'properties.register.consumer.metrics' = 'false'` and
`'properties.register.producer.metrics' = 'false'` in the SQL table options
for source / sink works. The remaining metrics are then exposed on port 9200.
The thing is, I wanted to investigate the consumer behavior in the first
place :D That's how I came across the bug.

Anyways, big thanks for your great support!

On Wed, May 4, 2022 at 1:53 PM Chesnay Schepler <ches...@apache.org> wrote:

> Disabling the kafka metrics _should_ work.
>
> Alternatively you could use the new generic feature to filter metrics:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/#filter-excludes
>
> metrics.reporter.<reportername>.filter.excludes: *KafkaProducer*;*KafkaConsumer*
>
> This should disable all kafka metrics. (You could also drill down and
> exclude specific problematic metrics; see the docs.)
>
> On 04/05/2022 13:36, Peter Schrott wrote:
>
> Allright! Thanks!
>
> I tried to dig a bit deeper and see if there is any workaround for that
> problem. I tried to switch off reporting the Kafka metrics, but I was not
> quite successful. I am using the Table API Kafka connector.
>
> Do you have any suggestions on how to overcome this?
>
> Could you also provide the ticket number after creation?
>
> Thanks, Peter
>
> On Wed, May 4, 2022 at 1:22 PM Chesnay Schepler <ches...@apache.org> wrote:
>
>> Yes, that looks like a new bug in 1.15.
>> The migration to the new non-deprecated Kafka API in the
>> KafkaMetricMutableWrapper was done incorrectly.
>>
>> This should affect every job that uses the new kafka connector.
>>
>> Thank you for debugging the issue!
>>
>> I will create a ticket.
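For anyone searching the archives: the workaround in full DDL form looks roughly like this (a sketch only - table name, schema, topic and connection settings are placeholder values; the two `properties.register.*.metrics` options at the end are the relevant part):

```sql
-- Hypothetical source table; everything except the last two options is a placeholder.
CREATE TABLE kafka_source (
  `user_id` BIGINT,
  `message` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- Workaround: keep the connector from registering the Kafka client metrics,
  -- which are the gauges hitting the broken KafkaMetricMutableWrapper in 1.15.0.
  'properties.register.consumer.metrics' = 'false',
  'properties.register.producer.metrics' = 'false'
)
```

The same two options on the sink table cover the producer side.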
>>
>> On 04/05/2022 12:24, Peter Schrott wrote:
>>
>> As the stack trace says, the class cast exception occurs here:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37
>>
>> I found the following metrics to be affected (might be more):
>>
>> MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=producer-3}]
>> -> value: "6.2.2-ccs" (String)
>>
>> MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=producer-3}]
>> -> value: 1651654724987 (Long)
>>
>> MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=producer-3}]
>> -> value: "2ceb5dc7891720b7" (String)
>>
>> The problematic code seems to have been introduced with "Bump Kafka version to 2.8":
>> https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068
>>
>> Is this a potential bug introduced in 1.15.0?
>>
>> Best, Peter
>>
>> On Wed, May 4, 2022 at 9:58 AM Peter Schrott <pe...@bluerootlabs.io> wrote:
>>
>>> Sorry for the spamming!
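For context, the cast at KafkaMetricMutableWrapper.java#L37 assumes every Kafka metric value is a Double, while the `app-info` metrics listed above report Long and String values. A defensive conversion along these lines (my own illustration with made-up names, not the actual Flink code or the eventual fix) would tolerate them:

```java
// Sketch: convert an arbitrary Kafka metric value to the double a Flink
// Gauge<Double> expects, instead of casting the value to Double directly.
// Non-numeric values (e.g. the "version" / "commit-id" Strings) map to NaN.
public class SafeKafkaMetricValue {

    public static double toDouble(Object metricValue) {
        if (metricValue instanceof Number) {
            // covers Long (start-time-ms), Integer, Double, ...
            return ((Number) metricValue).doubleValue();
        }
        // Strings like "6.2.2-ccs" cannot be reported as a gauge number.
        return Double.NaN;
    }

    public static void main(String[] args) {
        // the three affected values from the report above
        System.out.println(toDouble(1651654724987L)); // Long value, converts fine
        System.out.println(toDouble("6.2.2-ccs"));    // NaN instead of a ClassCastException
        System.out.println(toDouble("2ceb5dc7891720b7"));
    }
}
```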
>>>
>>> Just after jumping into the debug session I noticed that there are
>>> indeed exceptions thrown when fetching the metrics on port 9200:
>>>
>>> 13657 INFO [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
>>> 13658 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /
>>> 13658 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
>>> 13659 INFO [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.
>>> 13745 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 14028 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 14998 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 15580 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 16022 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 16458 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 16885 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 17381 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 17809 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 18259 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 18695 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 19159 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 19758 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 20112 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 20544 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 20989 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 21419 DEBUG [prometheus-http-1-2] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7: java.lang.String, only number types and booleans are supported by this reporter.
>>> 21421 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>>> 21847 DEBUG [prometheus-http-1-3] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648: java.lang.String, only number types and booleans are supported by this reporter.
>>> 21851 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - ServerImpl.Exchange (2)
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double
>>>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
>>>     at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
>>>     at io.prometheus.client.Gauge.collect(Gauge.java:317)
>>>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
>>>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
>>>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
>>>     at io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
>>>     at io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
>>>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>>>     at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>>>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>>>     at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>>>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>>>     at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> 21851 TRACE [prometheus-http-1-3] com.sun.net.httpserver - Closing connection: java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200 remote=/127.0.0.1:50508]
>>>
>>> In my defence: this jul - slf4j - logback setup is really nasty :O
>>>
>>> Best, Peter
>>>
>>> On Wed, May 4, 2022 at 9:47 AM Peter Schrott <pe...@bluerootlabs.io> wrote:
>>>
>>>> Hi Chesnay,
>>>>
>>>> Thanks for the support! Just for completeness: running the
>>>> "problem job" locally as a test in IntelliJ (as Chesnay suggested above)
>>>> reproduces the described problem:
>>>>
>>>> ➜ ~ curl localhost:9200
>>>> curl: (52) Empty reply from server
>>>>
>>>> Doing the same with other jobs, the metrics are available on localhost:9200.
>>>>
>>>> One other thing I noticed yesterday in the cluster is that job/task
>>>> specific metrics are available for a very short time after the job is
>>>> started (for around a few seconds). E.g.:
>>>>
>>>> # HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond backPressuredTimeMsPerSecond (scope: taskmanager_job_task)
>>>>
>>>> After all tasks are "green" in the web UI, the "empty reply from server"
>>>> is back.
>>>>
>>>> 1)
>>>> I changed the prometheus config in my cluster, but as you said, it
>>>> does not have any impact.
>>>>
>>>> 2)
>>>> For the logging in a test scenario, I also had to add the following
>>>> lines in my test class:
>>>>
>>>> SLF4JBridgeHandler.removeHandlersForRootLogger()
>>>> SLF4JBridgeHandler.install()
>>>>
>>>> (source: https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
>>>>
>>>> As well as resetting the log levels for jul in my logback.xml:
>>>>
>>>> <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
>>>>   <resetJUL>true</resetJUL>
>>>> </contextListener>
>>>>
>>>> This info is just for completeness, in case someone else stumbles upon the same issue.
>>>>
>>>> I set the following loggers to level TRACE:
>>>>
>>>> <logger name="com.sun.net.httpserver" level="TRACE" additive="false">
>>>>   <appender-ref ref="ASYNC_FILE" />
>>>> </logger>
>>>> <logger name="org.apache.flink.metrics.prometheus" level="TRACE" additive="false">
>>>>   <appender-ref ref="ASYNC_FILE" />
>>>> </logger>
>>>> <logger name="io.prometheus.client" level="TRACE" additive="false">
>>>>   <appender-ref ref="ASYNC_FILE" />
>>>> </logger>
>>>>
>>>> When running the job in a local test as suggested above I get the
>>>> following log messages:
>>>>
>>>> 12701 INFO [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
>>>> 12703 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /
>>>> 12703 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
>>>> 12704 INFO [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.
>>>>
>>>> 3)
>>>> I have not tried to reproduce this in a local cluster yet, as the issue is
>>>> also reproducible in the test environment. But thanks for the hint - could
>>>> be very helpful!
>>>>
>>>> __
>>>>
>>>> From the observations it does not seem like there is a problem with the
>>>> HTTP server itself. I am just making assumptions: it feels like there is a
>>>> problem with reading and providing the metrics. As the issue is
>>>> reproducible in the local setup I have the comfy option to debug in
>>>> IntelliJ now - I'll spend my day on this if no other hints or ideas
>>>> arise.
>>>>
>>>> Thanks & Best, Peter
>>>>
>>>> On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> > I noticed that my config of the PrometheusReporter is
>>>>> different here. I have: `metrics.reporter.prom.class:
>>>>> org.apache.flink.metrics.prometheus.PrometheusReporter`. I will
>>>>> investigate if this is a problem.
>>>>>
>>>>> That's not a problem.
>>>>>
>>>>> > Which trace logs are interesting?
>>>>>
>>>>> The logging config I provided should highlight the relevant bits
>>>>> (com.sun.net.httpserver).
>>>>> At least in my local tests this is where any interesting things were logged.
>>>>> Note that this part of the code uses java.util.logging, not slf4j/log4j.
>>>>>
>>>>> > When running a local flink (start-cluster.sh), I do not have a
>>>>> certain url/port to access the taskmanager, right?
>>>>>
>>>>> If you configure a port range it should be as simple as curl localhost:<port>.
>>>>> You can find the used port in the taskmanager logs.
>>>>> Or just try the first N ports in the range ;)
>>>>>
>>>>> On 03/05/2022 14:11, Peter Schrott wrote:
>>>>>
>>>>> Hi Chesnay,
>>>>>
>>>>> Thanks for the code snippet. Which trace logs are interesting? Those of
>>>>> "org.apache.flink.metrics.prometheus.PrometheusReporter"?
>>>>> I could also add these logger settings in the environment where the
>>>>> problem is present.
>>>>>
>>>>> Other than that, I am not sure how to reproduce this issue in a local
>>>>> setup. In the cluster where the metrics are missing I am navigating to the
>>>>> affected taskmanager and trying to access the metrics via the configured
>>>>> prometheus port. When running a local flink (start-cluster.sh), I do not
>>>>> have a certain url/port to access the taskmanager, right?
>>>>>
>>>>> I noticed that my config of the PrometheusReporter is different here.
>>>>> I have: `metrics.reporter.prom.class:
>>>>> org.apache.flink.metrics.prometheus.PrometheusReporter`. I will
>>>>> investigate if this is a problem.
>>>>>
>>>>> Unfortunately I cannot provide my job at the moment. It contains
>>>>> business logic and is tightly coupled with our Kafka systems. I
>>>>> will check the option of creating a sample job to reproduce the problem.
>>>>>
>>>>> Best, Peter
>>>>>
>>>>> On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>
>>>>>> You'd help me out greatly if you could provide me with a sample job
>>>>>> that runs into the issue.
>>>>>>
>>>>>> So far I wasn't able to reproduce the issue, but it should be clear
>>>>>> that there is one, given three separate reports - although it is
>>>>>> strange that so far it was only reported for Prometheus.
>>>>>>
>>>>>> If one of you is able to reproduce the issue within a test and is
>>>>>> feeling adventurous, you might be able to get more information by
>>>>>> forwarding java.util.logging to SLF4J. Below is some code to get you started.
>>>>>>
>>>>>> DebuggingTest.java:
>>>>>>
>>>>>> class DebuggingTest {
>>>>>>
>>>>>>     static {
>>>>>>         LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
>>>>>>         SLF4JBridgeHandler.removeHandlersForRootLogger();
>>>>>>         SLF4JBridgeHandler.install();
>>>>>>         miniClusterExtension =
>>>>>>                 new MiniClusterExtension(
>>>>>>                         new MiniClusterResourceConfiguration.Builder()
>>>>>>                                 .setConfiguration(getConfiguration())
>>>>>>                                 .setNumberSlotsPerTaskManager(1)
>>>>>>                                 .build());
>>>>>>     }
>>>>>>
>>>>>>     @RegisterExtension
>>>>>>     private static final MiniClusterExtension miniClusterExtension;
>>>>>>
>>>>>>     private static Configuration getConfiguration() {
>>>>>>         final Configuration configuration = new Configuration();
>>>>>>         configuration.setString(
>>>>>>                 "metrics.reporter.prom.factory.class",
>>>>>>                 PrometheusReporterFactory.class.getName());
>>>>>>         configuration.setString("metrics.reporter.prom.port", "9200-9300");
>>>>>>         return configuration;
>>>>>>     }
>>>>>>
>>>>>>     @Test
>>>>>>     void runJob() throws Exception {
>>>>>>         <run job>
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> pom.xml:
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.slf4j</groupId>
>>>>>>     <artifactId>jul-to-slf4j</artifactId>
>>>>>>     <version>1.7.32</version>
>>>>>> </dependency>
>>>>>>
>>>>>> log4j2-test.properties:
>>>>>>
>>>>>> rootLogger.level = off
>>>>>> rootLogger.appenderRef.test.ref = TestLogger
>>>>>> logger.http.name = com.sun.net.httpserver
>>>>>> logger.http.level = trace
>>>>>> appender.testlogger.name = TestLogger
>>>>>> appender.testlogger.type = CONSOLE
>>>>>> appender.testlogger.target = SYSTEM_ERR
>>>>>> appender.testlogger.layout.type = PatternLayout
>>>>>> appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
>>>>>>
>>>>>> On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:
>>>>>>
>>>>>> On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
>>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
>>>>>> coming from 1.14.4. I already consulted the mailing list:
>>>>>> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
>>>>>> I have not found the underlying problem or a solution to it.
>>>>>>
>>>>>> Actually, after re-checking, I see the same log WARNINGS as
>>>>>> ChangZhuo described.
>>>>>>
>>>>>> As I described, it seems to be an issue with my job. If no job, or an
>>>>>> example job, runs on the taskmanager, the basic metrics work just fine.
>>>>>> Maybe ChangZhuo can confirm this?
>>>>>>
>>>>>> @ChangZhuo what's your job setup? I am running a streaming SQL job, but
>>>>>> also using the DataStream API to create the streaming environment, from
>>>>>> that the table environment, and finally a StatementSet to execute
>>>>>> multiple SQL statements in one job.
>>>>>>
>>>>>> We are running a streaming application with the low-level API, deployed
>>>>>> via the Kubernetes operator (FlinkDeployment).