All right, thanks! I tried to dig a bit deeper to see if there is a workaround for this problem. I tried to switch off reporting of the Kafka metrics, but without success. I am using the Table API Kafka connector.
Do you have any suggestions on how to overcome this? Could you also provide the ticket number once it is created?

Thanks, Peter

On Wed, May 4, 2022 at 1:22 PM Chesnay Schepler <ches...@apache.org> wrote:

> Yes, that looks like a new bug in 1.15.
> The migration to the new non-deprecated Kafka API in the
> KafkaMetricMutableWrapper was done incorrectly.
>
> This should affect every job that uses the new Kafka connector.
>
> Thank you for debugging the issue!
>
> I will create a ticket.
>
> On 04/05/2022 12:24, Peter Schrott wrote:
>
> As the stack trace says, the class cast exception occurs here:
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37
>
> I found the following metrics to be affected (there might be more):
>
> MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=producer-3}]
> -> value: "6.2.2-ccs" (String)
>
> MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=producer-3}]
> -> value: 1651654724987 (Long)
>
> MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=producer-3}]
> -> value: "2ceb5dc7891720b7" (String)
>
> The problematic code seems to have been introduced with "Bump Kafka version to 2.8":
>
> https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068
>
> Is this a potential bug introduced in 1.15.0?
>
> Best, Peter
>
> On Wed, May 4, 2022 at 9:58 AM Peter Schrott <pe...@bluerootlabs.io> wrote:
>
>> Sorry for the spamming!
>>
>> Just after jumping into the debug session I noticed that there are indeed
>> exceptions thrown when fetching the metrics on port 9200:
>>
>> 13657 INFO [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
>> 13658 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /
>> 13658 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
>> 13659 INFO [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.
>> 13745 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 14028 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 14998 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 15580 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 16022 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 16458 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 16885 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 17381 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 17809 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 18259 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 18695 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 19159 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 19758 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 20112 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 20544 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 20989 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 21419 DEBUG [prometheus-http-1-2] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7: java.lang.String, only number types and booleans are supported by this reporter.
>> 21421 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
>> 21847 DEBUG [prometheus-http-1-3] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648: java.lang.String, only number types and booleans are supported by this reporter.
>> 21851 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - ServerImpl.Exchange (2)
>> java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double
>>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
>>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
>>     at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
>>     at io.prometheus.client.Gauge.collect(Gauge.java:317)
>>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
>>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
>>     at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
>>     at io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
>>     at io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
>>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>>     at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>>     at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>>     at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>>     at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> 21851 TRACE [prometheus-http-1-3] com.sun.net.httpserver - Closing connection: java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200 remote=/127.0.0.1:50508]
>>
>> In my defence: this jul - slf4j - logback setup is really nasty :O
>>
>> Best, Peter
>>
>> On Wed, May 4, 2022 at 9:47 AM Peter Schrott <pe...@bluerootlabs.io> wrote:
>>
>>> Hi Chesnay,
>>>
>>> Thanks for the support! Just for completeness: running the "Problem-Job"
>>> locally as a test in IntelliJ (as Chesnay suggested above) reproduces the
>>> described problem:
>>>
>>> ➜ ~ curl localhost:9200
>>> curl: (52) Empty reply from server
>>>
>>> Doing the same with other jobs, metrics are available on localhost:9200.
>>>
>>> One other thing I noticed yesterday in the cluster is that job/task
>>> specific metrics are available for a very short time (a few seconds)
>>> after the job is started. E.g.:
>>>
>>> # HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond backPressuredTimeMsPerSecond (scope: taskmanager_job_task)
>>>
>>> After all tasks are "green" in the web UI, the "empty reply from server"
>>> is back.
>>>
>>> 1)
>>> I changed the Prometheus config in my cluster, but as you said, it does
>>> not have any impact.
>>>
>>> 2)
>>> For the logging in a test scenario, I also had to add the following
>>> lines to my test class:
>>>
>>> SLF4JBridgeHandler.removeHandlersForRootLogger()
>>> SLF4JBridgeHandler.install()
>>>
>>> (source: https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
>>> I also had to reset the log levels for jul in my logback.xml:
>>>
>>> <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
>>>   <resetJUL>true</resetJUL>
>>> </contextListener>
>>>
>>> This info is just for completeness, in case someone else stumbles upon this.
>>>
>>> I set the following loggers to level TRACE:
>>>
>>> <logger name="com.sun.net.httpserver" level="TRACE" additive="false">
>>>   <appender-ref ref="ASYNC_FILE" />
>>> </logger>
>>> <logger name="org.apache.flink.metrics.prometheus" level="TRACE" additive="false">
>>>   <appender-ref ref="ASYNC_FILE" />
>>> </logger>
>>> <logger name="io.prometheus.client" level="TRACE" additive="false">
>>>   <appender-ref ref="ASYNC_FILE" />
>>> </logger>
>>>
>>> When running the job in a local test as suggested above, I get the
>>> following log messages:
>>>
>>> 12701 INFO [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
>>> 12703 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /
>>> 12703 INFO [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
>>> 12704 INFO [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.
>>>
>>> 3)
>>> I have not tried to reproduce this in a local cluster yet, as the issue is
>>> also reproducible in the test environment. But thanks for the hint, it
>>> could be very helpful!
>>>
>>> __
>>>
>>> From these observations it does not seem like there is a problem with the
>>> HTTP server itself. I am just making assumptions: it feels like there is a
>>> problem with reading and providing the metrics. As the issue is
>>> reproducible in the local setup, I now have the comfortable option of
>>> debugging in IntelliJ. I'll spend my day on this if no other hints or
>>> ideas arise.
>>>
>>> Thanks & Best, Peter
>>>
>>> On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> > I noticed that my config of the PrometheusReporter is different here.
>>>> > I have: `metrics.reporter.prom.class:
>>>> > org.apache.flink.metrics.prometheus.PrometheusReporter`. I will
>>>> > investigate if this is a problem.
>>>>
>>>> That's not a problem.
>>>>
>>>> > Which trace logs are interesting?
>>>>
>>>> The logging config I provided should highlight the relevant bits
>>>> (com.sun.net.httpserver).
>>>> At least in my local tests this is where any interesting things were
>>>> logged.
>>>> Note that this part of the code uses java.util.logging, not slf4j/log4j.
>>>>
>>>> > When running a local Flink (start-cluster.sh), I do not have a
>>>> > specific URL/port to access the taskmanager, right?
>>>>
>>>> If you configure a port range it should be as simple as
>>>> curl localhost:<port>.
>>>> You can find the used port in the taskmanager logs.
>>>> Or just try the first N ports in the range ;)
>>>>
>>>> On 03/05/2022 14:11, Peter Schrott wrote:
>>>>
>>>> Hi Chesnay,
>>>>
>>>> Thanks for the code snippet. Which trace logs are interesting? Those of
>>>> "org.apache.flink.metrics.prometheus.PrometheusReporter"?
>>>> I could also add these logger settings to the environment where the
>>>> problem is present.
>>>>
>>>> Other than that, I am not sure how to reproduce this issue in a local
>>>> setup. In the cluster where the metrics are missing, I navigate to the
>>>> particular taskmanager and try to access the metrics via the configured
>>>> Prometheus port. When running a local Flink (start-cluster.sh), I do not
>>>> have a specific URL/port to access the taskmanager, right?
>>>>
>>>> I noticed that my config of the PrometheusReporter is different here. I
>>>> have: `metrics.reporter.prom.class:
>>>> org.apache.flink.metrics.prometheus.PrometheusReporter`. I will
>>>> investigate if this is a problem.
>>>>
>>>> Unfortunately I cannot provide my job at the moment. It contains
>>>> business logic and is tightly coupled with our Kafka systems. I will
>>>> check the option of creating a sample job to reproduce the problem.
>>>>
>>>> Best, Peter
>>>>
>>>> On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> You'd help me out greatly if you could provide me with a sample job
>>>>> that runs into the issue.
>>>>>
>>>>> So far I wasn't able to reproduce the issue,
>>>>> but it should be clear that there is some issue, given 3 separate
>>>>> reports, although it is strange that so far it was only reported for
>>>>> Prometheus.
>>>>>
>>>>> If one of you is able to reproduce the issue within a test and is
>>>>> feeling adventurous,
>>>>> then you might be able to get more information by forwarding
>>>>> java.util.logging to SLF4J. Below is some code to get you started.
>>>>>
>>>>> DebuggingTest.java:
>>>>>
>>>>> class DebuggingTest {
>>>>>
>>>>>     static {
>>>>>         LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
>>>>>         SLF4JBridgeHandler.removeHandlersForRootLogger();
>>>>>         SLF4JBridgeHandler.install();
>>>>>         miniClusterExtension =
>>>>>                 new MiniClusterExtension(
>>>>>                         new MiniClusterResourceConfiguration.Builder()
>>>>>                                 .setConfiguration(getConfiguration())
>>>>>                                 .setNumberSlotsPerTaskManager(1)
>>>>>                                 .build());
>>>>>     }
>>>>>
>>>>>     @RegisterExtension
>>>>>     private static final MiniClusterExtension miniClusterExtension;
>>>>>
>>>>>     private static Configuration getConfiguration() {
>>>>>         final Configuration configuration = new Configuration();
>>>>>
>>>>>         configuration.setString(
>>>>>                 "metrics.reporter.prom.factory.class",
>>>>>                 PrometheusReporterFactory.class.getName());
>>>>>         configuration.setString("metrics.reporter.prom.port", "9200-9300");
>>>>>
>>>>>         return configuration;
>>>>>     }
>>>>>
>>>>>     @Test
>>>>>     void runJob() throws Exception {
>>>>>         <run job>
>>>>>     }
>>>>> }
>>>>>
>>>>> pom.xml:
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.slf4j</groupId>
>>>>>     <artifactId>jul-to-slf4j</artifactId>
>>>>>     <version>1.7.32</version>
>>>>> </dependency>
>>>>>
>>>>> log4j2-test.properties:
>>>>>
>>>>> rootLogger.level = off
>>>>> rootLogger.appenderRef.test.ref = TestLogger
>>>>> logger.http.name = com.sun.net.httpserver
>>>>> logger.http.level = trace
>>>>> appender.testlogger.name = TestLogger
>>>>> appender.testlogger.type = CONSOLE
>>>>> appender.testlogger.target = SYSTEM_ERR
>>>>> appender.testlogger.layout.type = PatternLayout
>>>>> appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
>>>>>
>>>>> On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:
>>>>>
>>>>> On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
>>>>> coming from 1.14.4. I already consulted the mailing list:
>>>>> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
>>>>> I have not found the underlying problem or a solution to it.
>>>>>
>>>>> Actually, after re-checking, I see the same log WARNINGS as
>>>>> ChangZhuo described.
>>>>>
>>>>> As I described, it seems to be an issue with my job. If no job or an
>>>>> example job runs on the taskmanager, the basic metrics work just fine.
>>>>> Maybe ChangZhuo can confirm this?
>>>>>
>>>>> @ChangZhuo what's your job setup? I am running a streaming SQL job, but
>>>>> I also use the DataStream API to create the streaming environment and,
>>>>> from that, the table environment, and finally a StatementSet to execute
>>>>> multiple SQL statements in one job.
>>>>>
>>>>> We are running a streaming application with the low-level API and the
>>>>> Kubernetes operator FlinkDeployment.
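
For readers skimming the thread, the failure mode in the stack trace above (`java.lang.Long cannot be cast to java.lang.Double`) can be illustrated with a small, self-contained Java sketch. It is not the Flink code and not the eventual fix: the class `MetricValueSketch` and both methods are hypothetical names introduced only for illustration. The sketch assumes, as the reported values suggest, that a Kafka metric value arrives as a plain `Object` that may be a `Double`, a `Long` (e.g. `start-time-ms`), or a `String` (e.g. `version`, `commit-id`).

```java
import java.util.OptionalDouble;

/**
 * Hypothetical helper, not part of Flink or Kafka. It contrasts a hard cast
 * to Double with a conversion that tolerates other value types.
 */
final class MetricValueSketch {

    private MetricValueSketch() {}

    /** Hard cast: throws ClassCastException if the value is a Long or a String. */
    static double castToDouble(Object metricValue) {
        return (Double) metricValue;
    }

    /**
     * Defensive conversion: any Number is widened to double, while
     * non-numeric values (such as version strings) yield an empty result.
     */
    static OptionalDouble toDouble(Object metricValue) {
        if (metricValue instanceof Number) {
            return OptionalDouble.of(((Number) metricValue).doubleValue());
        }
        return OptionalDouble.empty();
    }
}
```

With the values reported earlier in the thread, `toDouble(1651654724987L)` would yield a present value and `toDouble("6.2.2-ccs")` an empty one, whereas `castToDouble` would throw for both. How the connector should treat non-numeric metrics (skip them, report a default, or expose them differently) is a design decision for the ticket, not something this sketch settles.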