As the stack trace says, the ClassCastException occurs here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37

I found the following metrics to be affected (might be more):
MetricName [name=version, group=app-info, description=Metric indicating
version, tags={client-id=producer-3}]
-> value: "6.2.2-ccs" (String)

MetricName [name=start-time-ms, group=app-info, description=Metric
indicating start-time-ms, tags={client-id=producer-3}]
-> value: 1651654724987 (Long)

MetricName [name=commit-id, group=app-info, description=Metric indicating
commit-id, tags={client-id=producer-3}]
-> value: "2ceb5dc7891720b7" (String)

The problematic code seems to have been introduced with "Bump Kafka version to
2.8":
https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068

Is this a potential bug introduced in 1.15.0?
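
To make the failure mode concrete, here is a minimal, dependency-free sketch. It assumes the wrapper casts the Kafka metric value (an Object) straight to Double, which is what the stack trace suggests. The class and method names (MetricValueSketch, unsafeCast, toDouble) are hypothetical and not part of Flink or Kafka, and the NaN fallback is only one possible guard, not a proposed fix:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch; this is NOT Flink's or Kafka's actual code. */
final class MetricValueSketch {

    /** Mirrors the suspected pattern: an unconditional cast of an Object to Double. */
    static Double unsafeCast(Object metricValue) {
        // Throws ClassCastException when the value is a Long or a String.
        return (Double) metricValue;
    }

    /** One possible guard: accept any Number, map everything else to NaN. */
    static double toDouble(Object metricValue) {
        if (metricValue instanceof Number) {
            return ((Number) metricValue).doubleValue();
        }
        // Non-numeric values such as version or commit-id end up here.
        return Double.NaN;
    }

    public static void main(String[] args) {
        // The three app-info metric values observed above.
        Map<String, Object> appInfo = new LinkedHashMap<>();
        appInfo.put("version", "6.2.2-ccs");
        appInfo.put("start-time-ms", 1651654724987L);
        appInfo.put("commit-id", "2ceb5dc7891720b7");

        for (Map.Entry<String, Object> entry : appInfo.entrySet()) {
            try {
                unsafeCast(entry.getValue());
                System.out.println(entry.getKey() + ": cast succeeded");
            } catch (ClassCastException e) {
                System.out.println(
                        entry.getKey()
                                + ": cast failed, guarded value = "
                                + toDouble(entry.getValue()));
            }
        }
    }
}
```

Whether non-numeric metrics should be converted like this or skipped entirely is a separate design question; the sketch only shows why a Long or String value breaks an unconditional Double cast.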

Best, Peter

On Wed, May 4, 2022 at 9:58 AM Peter Schrott <pe...@bluerootlabs.io> wrote:

> Sorry for the spamming!
>
> Just after jumping into the debug-session I noticed that there are indeed
> exceptions thrown when fetching the metrics on port 9200:
>
> 13657 INFO  [ScalaTest-run] com.sun.net.httpserver  - HttpServer created http 
> 0.0.0.0/0.0.0.0:9200
> 13658 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: /
> 13658 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: 
> /metrics
> 13659 INFO  [ScalaTest-run] o.a.f.m.p.PrometheusReporter  - Started 
> PrometheusReporter HTTP server on port 9200.
> 13745 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 14028 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 14998 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 15580 DEBUG [prometheus-http-1-4] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 16022 DEBUG [prometheus-http-1-5] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 16458 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 16885 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 17381 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 17809 DEBUG [prometheus-http-1-4] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 18259 DEBUG [prometheus-http-1-5] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 18695 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 19159 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 19758 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 20112 DEBUG [prometheus-http-1-4] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 20544 DEBUG [prometheus-http-1-5] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 20989 DEBUG [prometheus-http-1-1] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 21419 DEBUG [prometheus-http-1-2] o.a.f.m.p.PrometheusReporter  - Invalid 
> type for Gauge 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7:
>  java.lang.String, only number types and booleans are supported by this 
> reporter.
> 21421 DEBUG [prometheus-http-1-2] com.sun.net.httpserver  - GET / HTTP/1.1 
> [200  OK] ()
> 21847 DEBUG [prometheus-http-1-3] o.a.f.m.p.PrometheusReporter  - Invalid 
> type for Gauge 
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648:
>  java.lang.String, only number types and booleans are supported by this 
> reporter.
> 21851 DEBUG [prometheus-http-1-3] com.sun.net.httpserver  - 
> ServerImpl.Exchange (2)
> java.lang.ClassCastException: java.lang.Long cannot be cast to 
> java.lang.Double
>    at 
> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
>    at 
> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
>    at 
> org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
>    at io.prometheus.client.Gauge.collect(Gauge.java:317)
>    at 
> io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
>    at 
> io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
>    at 
> io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
>    at 
> io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
>    at 
> io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
>    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>    at 
> sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>    at java.lang.Thread.run(Thread.java:748)
> 21851 TRACE [prometheus-http-1-3] com.sun.net.httpserver  - Closing 
> connection: java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200 
> remote=/127.0.0.1:50508]
>
>
> In my defence: this jul - slf4j - logback setup is really nasty :O
>
> Best, Peter
>
>
>
> On Wed, May 4, 2022 at 9:47 AM Peter Schrott <pe...@bluerootlabs.io>
> wrote:
>
>> Hi Chesnay,
>>
>> Thanks for that support! Just to summarize: Running the "Problem-Job"
>> locally as a test in IntelliJ (as Chesnay suggested above) reproduces the
>> described problem:
>>
>> ➜  ~ curl localhost:9200
>> curl: (52) Empty reply from server
>>
>> Doing the same with other jobs, the metrics are available on localhost:9200.
>>
>> One other thing I noticed yesterday in the cluster is that job/task
>> specific metrics are available for a very short time after the job is
>> started (for around a few seconds). E.g:
>>
>> # HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond 
>> backPressuredTimeMsPerSecond (scope: taskmanager_job_task)
>>
>> After all tasks are "green" in the webui, the "empty reply from server"
>> is back.
>>
>> 1)
>> I changed the prometheus config in my cluster, but as you said, it does
>> not have any impact.
>>
>> 2)
>> For the logging in a test scenario, I also had to add the following lines
>> in my test class:
>>
>> SLF4JBridgeHandler.removeHandlersForRootLogger()
>> SLF4JBridgeHandler.install()
>>
>> (source:
>> https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
>>  As well as resetting log levels for jul in my logback.xml:
>>
>> <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
>>     <resetJUL>true</resetJUL>
>> </contextListener>
>>
>> This info is just for completeness, in case someone else stumbles upon this.
>>
>> I set the following loggers to lvl TRACE:
>>
>> <logger name="com.sun.net.httpserver" level="TRACE" additive="false">
>>     <appender-ref ref="ASYNC_FILE" />
>> </logger>
>>
>> <logger name="org.apache.flink.metrics.prometheus" level="TRACE" 
>> additive="false">
>>     <appender-ref ref="ASYNC_FILE" />
>> </logger>
>>
>> <logger name="io.prometheus.client" level="TRACE" additive="false">
>>     <appender-ref ref="ASYNC_FILE" />
>> </logger>
>>
>> When running the job in a local test as suggested above I get the
>> following log messages:
>>
>> 12701 INFO  [ScalaTest-run] com.sun.net.httpserver  - HttpServer created 
>> http 0.0.0.0/0.0.0.0:9200
>> 12703 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: /
>> 12703 INFO  [ScalaTest-run] com.sun.net.httpserver  - context created: 
>> /metrics
>> 12704 INFO  [ScalaTest-run] o.a.f.m.p.PrometheusReporter  - Started 
>> PrometheusReporter HTTP server on port 9200.
>>
>>
>> 3)
>> I have not tried to reproduce in a local cluster yet, as the issue is
>> also reproducible in the test environment. But thanks for the hint - could
>> be very helpful!
>>
>>  __
>>
>> From the observations it does not seem like there is a problem with the
>> http server itself. I am just making assumptions: It feels like there is a
>> problem with reading and providing the metrics. As the issue is
>> reproducible in the local setup I have the comfy option to debug in
>> IntelliJ now - I'll spend my day with this if no other hints or ideas arise.
>>
>> Thanks & Best, Peter
>>
>> On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> > I noticed that my config of the PrometheusReporter is different here.
>>> I have: `metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter`. I will investigate
>>> if this is a problem.
>>>
>>> That's not a problem.
>>>
>>> > Which trace logs are interesting?
>>>
>>> The logging config I provided should highlight the relevant bits
>>> (com.sun.net.httpserver).
>>> At least in my local tests this is where any interesting things were
>>> logged.
>>> Note that this part of the code uses java.util.logging, not slf4j/log4j.
>>>
>>> > When running a local flink (start-cluster.sh), I do not have a certain
>>> url/port to access the taskmanager, right?
>>>
>>> If you configure a port range it should be as simple as curl
>>> localhost:<port>.
>>> You can find the used port in the taskmanager logs.
>>> Or just try the first N ports in the range ;)
>>>
>>> On 03/05/2022 14:11, Peter Schrott wrote:
>>>
>>> Hi Chesnay,
>>>
>>> Thanks for the code snippet. Which trace logs are interesting? Of "
>>> org.apache.flink.metrics.prometheus.PrometheusReporter"?
>>> I could also add these logger settings in the environment where the
>>> problem is present.
>>>
>>> Other than that, I am not sure how to reproduce this issue in a local
>>> setup. In the cluster where the metrics are missing I navigate to the
>>> specific taskmanager and try to access the metrics via the configured
>>> prometheus port. When running a local flink (start-cluster.sh), I do not
>>> have a certain url/port to access the taskmanager, right?
>>>
>>> I noticed that my config of the PrometheusReporter is different here. I
>>> have: `metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter`. I will investigate
>>> if this is a problem.
>>>
>>> Unfortunately I can not provide my job at the moment. It
>>> contains business logic and it is tightly coupled with our Kafka systems. I
>>> will check the option of creating a sample job to reproduce the problem.
>>>
>>> Best, Peter
>>>
>>> On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> You'd help me out greatly if you could provide me with a sample job
>>>> that runs into the issue.
>>>>
>>>> So far I wasn't able to reproduce the issue,
>>>> but it should be clear that there is some issue, given 3 separate reports,
>>>> although it is strange that so far it was only reported for Prometheus.
>>>>
>>>> If one of you is able to reproduce the issue within a Test and is
>>>> feeling adventurous,
>>>> then you might be able to get more information by forwarding the
>>>> java.util.logging
>>>> to SLF4J. Below is some code to get you started.
>>>>
>>>> DebuggingTest.java:
>>>>
>>>> class DebuggingTest {
>>>>
>>>>     static {
>>>>         LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
>>>>         SLF4JBridgeHandler.removeHandlersForRootLogger();
>>>>         SLF4JBridgeHandler.install();
>>>>         miniClusterExtension =
>>>>                 new MiniClusterExtension(
>>>>                         new MiniClusterResourceConfiguration.Builder()
>>>>                                 .setConfiguration(getConfiguration())
>>>>                                 .setNumberSlotsPerTaskManager(1)
>>>>                                 .build());
>>>>     }
>>>>
>>>>     @RegisterExtension private static final MiniClusterExtension 
>>>> miniClusterExtension;
>>>>
>>>>     private static Configuration getConfiguration() {
>>>>         final Configuration configuration = new Configuration();
>>>>
>>>>         configuration.setString(
>>>>                 "metrics.reporter.prom.factory.class", 
>>>> PrometheusReporterFactory.class.getName());
>>>>         configuration.setString("metrics.reporter.prom.port", "9200-9300");
>>>>
>>>>         return configuration;
>>>>     }
>>>>
>>>>     @Test
>>>>     void runJob() throws Exception {
>>>>         <run job>
>>>>     }
>>>> }
>>>>
>>>>
>>>> pom.xml:
>>>>
>>>> <dependency>
>>>>    <groupId>org.slf4j</groupId>
>>>>    <artifactId>jul-to-slf4j</artifactId>
>>>>    <version>1.7.32</version>
>>>> </dependency>
>>>>
>>>> log4j2-test.properties:
>>>>
>>>> rootLogger.level = off
>>>> rootLogger.appenderRef.test.ref = TestLogger
>>>> logger.http.name = com.sun.net.httpserver
>>>> logger.http.level = trace
>>>> appender.testlogger.name = TestLogger
>>>> appender.testlogger.type = CONSOLE
>>>> appender.testlogger.target = SYSTEM_ERR
>>>> appender.testlogger.layout.type = PatternLayout
>>>> appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
>>>>
>>>> On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:
>>>>
>>>> On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
>>>>
>>>> Hi!
>>>>
>>>> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
>>>> coming from 1.14.4. I already consulted the mailing list:
>>>> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
>>>> I have not found the underlying problem or a solution to it.
>>>>
>>>> Actually, after re-checking, I see the same log WARNINGS as
>>>> ChangZhuo described.
>>>>
>>>> As I described, it seems to be an issue with my job. If no job, or an
>>>> example job, runs on the taskmanager, the basic metrics work just fine. Maybe
>>>> ChangZhuo can confirm this?
>>>>
>>>> @ChangZhuo what's your job setup? I am running a streaming SQL job, but
>>>> also using the DataStream API to create the streaming environment and from
>>>> that the table environment, and finally using a StatementSet to execute
>>>> multiple SQL statements in one job.
>>>>
>>>> We are running a streaming application with low level API with
>>>> Kubernetes operator FlinkDeployment.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
