Disabling the kafka metrics _should_ work.

Alternatively, you could use the new generic metric filtering feature:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/#filter-excludes

metrics.reporter.<reportername>.filter.excludes: *KafkaProducer*;*KafkaConsumer*

This should disable all Kafka metrics. (You could also drill down and exclude only the specific problematic metrics; see the docs.)
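Putting that together with the reporter settings used later in this thread (the reporter name "prom" and the port range are assumptions taken from the test config below, not from your setup), the flink-conf.yaml entries might look like this; double-check the option names against your Flink version:

```yaml
# Sketch of a flink-conf.yaml fragment; reporter name "prom" is an assumption.
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9200-9300
# Exclude all Kafka client metrics, as suggested above.
metrics.reporter.prom.filter.excludes: "*KafkaProducer*;*KafkaConsumer*"
```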

On 04/05/2022 13:36, Peter Schrott wrote:
Alright! Thanks!

I dug a bit deeper to see if there is any workaround for the problem. I tried to switch off reporting of the Kafka metrics, but was not successful. I am using the Table API Kafka connector.

Do you have any suggestions on how to overcome this?

Could you also provide the ticket number after creation?

Thanks, Peter

On Wed, May 4, 2022 at 1:22 PM Chesnay Schepler <ches...@apache.org> wrote:

    Yes, that looks like a new bug in 1.15.
    The migration to the new non-deprecated Kafka API in the
    KafkaMetricMutableWrapper was done incorrectly.

    This should affect every job that uses the new Kafka connector.

    Thank you for debugging the issue!

    I will create a ticket.

    On 04/05/2022 12:24, Peter Schrott wrote:
    As the stack trace shows, the class cast exception occurs here:
    https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37

    I found the following metrics to be affected (might be more):
    MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=producer-3}]
    -> value: "6.2.2-ccs" (String)

    MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=producer-3}]
    -> value: 1651654724987 (Long)

    MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=producer-3}]
    -> value: "2ceb5dc7891720b7" (String)

    The problematic code seems to have been introduced with "Bump Kafka version to 2.8":
    https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068

    Is this a potential bug introduced in 1.15.0?
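To illustrate the root cause outside of Flink: metricValue() on the new Kafka API returns Object, and the values above show it can be a Long or a String, so an unconditional cast to Double blows up. The sketch below is hypothetical (the class and method names are made up, and this is not the actual Flink fix); it only demonstrates the failure mode and one defensive alternative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: mimics a gauge wrapper around KafkaMetric.metricValue(),
// which returns Object on the non-deprecated Kafka API.
public class MetricCastSketch {

    // A blind (Double) cast would throw ClassCastException for the Long and
    // String values listed above; converting via Number avoids that.
    static Double asGaugeValue(Object metricValue) {
        if (metricValue instanceof Number) {
            // Covers Long (e.g. start-time-ms) as well as Double values.
            return ((Number) metricValue).doubleValue();
        }
        // Non-numeric metrics such as "version" ("6.2.2-ccs") cannot be
        // reported as a numeric gauge; skip them instead of throwing.
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> samples = new LinkedHashMap<>();
        samples.put("start-time-ms", 1651654724987L); // Long
        samples.put("version", "6.2.2-ccs");          // String
        samples.put("some-rate", 0.5d);               // Double

        for (Map.Entry<String, Object> e : samples.entrySet()) {
            System.out.println(e.getKey() + " -> " + asGaugeValue(e.getValue()));
        }
        // Prints:
        // start-time-ms -> 1.651654724987E12
        // version -> null
        // some-rate -> 0.5
    }
}
```

Until a fix lands, filtering the Kafka metrics out of the reporter (as suggested at the top of this thread) avoids the cast entirely.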

    Best, Peter

    On Wed, May 4, 2022 at 9:58 AM Peter Schrott
    <pe...@bluerootlabs.io> wrote:

        Sorry for the spamming!

        Just after jumping into the debug-session I noticed that
        there are indeed exceptions thrown when fetching the metrics
        on port 9200:

        13657 INFO  [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
        13658 INFO  [ScalaTest-run] com.sun.net.httpserver - context created: /
        13658 INFO  [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
        13659 INFO  [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.
        13745 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        14028 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        14998 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        15580 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        16022 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        16458 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        16885 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        17381 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        17809 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        18259 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        18695 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        19159 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        19758 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        20112 DEBUG [prometheus-http-1-4] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        20544 DEBUG [prometheus-http-1-5] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        20989 DEBUG [prometheus-http-1-1] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        21419 DEBUG [prometheus-http-1-2] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7: java.lang.String, only number types and booleans are supported by this reporter.
        21421 DEBUG [prometheus-http-1-2] com.sun.net.httpserver - GET / HTTP/1.1 [200 OK] ()
        21847 DEBUG [prometheus-http-1-3] o.a.f.m.p.PrometheusReporter - Invalid type for Gauge org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648: java.lang.String, only number types and booleans are supported by this reporter.
        21851 DEBUG [prometheus-http-1-3] com.sun.net.httpserver - ServerImpl.Exchange (2)
        java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double
            at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
            at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
            at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
            at io.prometheus.client.Gauge.collect(Gauge.java:317)
            at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
            at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
            at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
            at io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
            at io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
            at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
            at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
            at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
            at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
            at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
            at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
        21851 TRACE [prometheus-http-1-3] com.sun.net.httpserver - Closing connection: java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200 remote=/127.0.0.1:50508]


        In my defence: this jul - slf4j - logback setup is really nasty :O

        Best, Peter



        On Wed, May 4, 2022 at 9:47 AM Peter Schrott
        <pe...@bluerootlabs.io> wrote:

            Hi Chesnay,

            Thanks for the support! Just for confirmation: running the "Problem-Job" locally as a test in IntelliJ (as Chesnay suggested above) reproduces the described problem:

            ➜  ~ curl localhost:9200
            curl: (52) Empty reply from server

            Doing the same with other jobs, the metrics are available on localhost:9200.

            One other thing I noticed yesterday in the cluster: job/task-specific metrics are available for a very short time after the job is started (around a few seconds). E.g.:

            # HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond backPressuredTimeMsPerSecond (scope: taskmanager_job_task)

            After all tasks are "green" in the web UI, the "empty reply from server" is back.

            1)
            I changed the Prometheus config in my cluster, but as you said, it does not have any impact.

            2)
            For logging in a test scenario, I also had to add the following lines to my test class:

            SLF4JBridgeHandler.removeHandlersForRootLogger()
            SLF4JBridgeHandler.install()

            (source: https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
            as well as resetting the jul log levels in my logback.xml:

            <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
                <resetJUL>true</resetJUL>
            </contextListener>

            This info is just for completeness, in case someone else stumbles upon the same issue.

            I set the following loggers to level TRACE:

            <logger name="com.sun.net.httpserver" level="TRACE" additive="false">
                <appender-ref ref="ASYNC_FILE" />
            </logger>
            <logger name="org.apache.flink.metrics.prometheus" level="TRACE" additive="false">
                <appender-ref ref="ASYNC_FILE" />
            </logger>
            <logger name="io.prometheus.client" level="TRACE" additive="false">
                <appender-ref ref="ASYNC_FILE" />
            </logger>

            When running the job in a local test as suggested above I
            get the following log messages:

            12701 INFO  [ScalaTest-run] com.sun.net.httpserver - HttpServer created http 0.0.0.0/0.0.0.0:9200
            12703 INFO  [ScalaTest-run] com.sun.net.httpserver - context created: /
            12703 INFO  [ScalaTest-run] com.sun.net.httpserver - context created: /metrics
            12704 INFO  [ScalaTest-run] o.a.f.m.p.PrometheusReporter - Started PrometheusReporter HTTP server on port 9200.


            3)
            I have not tried to reproduce in a local cluster yet, as
            the issue is also reproducible in the test environment.
            But thanks for the hint - could be very helpful!


            From these observations it does not seem like there is a problem with the HTTP server itself. I am only speculating, but it feels like there is a problem with reading and providing the metrics. As the issue is reproducible in the local setup, I now have the comfortable option of debugging in IntelliJ - I'll spend my day on this if no other hints or ideas arise.

            Thanks & Best, Peter

            On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler
            <ches...@apache.org> wrote:

                > I noticed that my config of the PrometheusReporter
                is different here. I have:
                `metrics.reporter.prom.class:
                org.apache.flink.metrics.prometheus.PrometheusReporter`.
                I will investigate if this is a problem.

                That's not a problem.

                > Which trace logs are interesting?

                The logging config I provided should highlight the
                relevant bits (com.sun.net.httpserver).
                At least in my local tests this is where any
                interesting things were logged.
                Note that this part of the code uses
                java.util.logging, not slf4j/log4j.

                > When running a local flink (start-cluster.sh), I do
                not have a certain url/port to access the
                taskmanager, right?

                If you configure a port range it should be as simple
                as curl localhost:<port>.
                You can find the used port in the taskmanager logs.
                Or just try the first N ports in the range ;)
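For the "try the first N ports" approach, a throwaway helper might look like this (a sketch; the 9200-9300 range matches the reporter config used elsewhere in this thread - adjust it to your metrics.reporter.prom.port setting):

```shell
#!/usr/bin/env bash
# Probe a metrics-reporter port range and print the first port that
# answers an HTTP request. Sketch only, assumes curl is installed.
probe_ports() {
  local start=$1 end=$2 port
  for port in $(seq "$start" "$end"); do
    if curl -s --max-time 1 -o /dev/null "http://localhost:${port}"; then
      echo "${port}"
      return 0
    fi
  done
  return 1
}
```

Usage: `probe_ports 9200 9300` prints the first responding port, or returns non-zero if none answered.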

                On 03/05/2022 14:11, Peter Schrott wrote:
                Hi Chesnay,

                Thanks for the code snippet. Which trace logs are interesting? Those of "org.apache.flink.metrics.prometheus.PrometheusReporter"? I could also add these logger settings in the environment where the problem is present.

                Other than that, I am not sure how to reproduce this issue in a local setup. In the cluster where the metrics are missing, I navigate to the specific taskmanager and try to access the metrics via the configured Prometheus port. When running a local Flink (start-cluster.sh), I do not have a specific URL/port to access the taskmanager, right?

                I noticed that my config of the PrometheusReporter
                is different here. I have:
                `metrics.reporter.prom.class:
                org.apache.flink.metrics.prometheus.PrometheusReporter`.
                I will investigate if this is a problem.

                Unfortunately I cannot provide my job at the moment. It contains business logic and is tightly coupled with our Kafka systems. I will look into creating a sample job that reproduces the problem.

                Best, Peter

                On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler
                <ches...@apache.org> wrote:

                    You'd help me out greatly if you could provide
                    me with a sample job that runs into the issue.

                    So far I wasn't able to reproduce the issue, but given 3 separate reports it should be clear that there is one, although it is strange that so far it has only been reported for Prometheus.

                    If one of you is able to reproduce the issue within a test and is feeling adventurous, you might be able to get more information by forwarding java.util.logging to SLF4J. Below is some code to get you started.

                    DebuggingTest.java:

                    import java.util.logging.Level;
                    import java.util.logging.LogManager;
                    import org.apache.flink.configuration.Configuration;
                    import org.apache.flink.metrics.prometheus.PrometheusReporterFactory;
                    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
                    import org.apache.flink.test.junit5.MiniClusterExtension;
                    import org.junit.jupiter.api.Test;
                    import org.junit.jupiter.api.extension.RegisterExtension;
                    import org.slf4j.bridge.SLF4JBridgeHandler;

                    class DebuggingTest {

                        static {
                            LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
                            SLF4JBridgeHandler.removeHandlersForRootLogger();
                            SLF4JBridgeHandler.install();
                            miniClusterExtension =
                                    new MiniClusterExtension(
                                            new MiniClusterResourceConfiguration.Builder()
                                                    .setConfiguration(getConfiguration())
                                                    .setNumberSlotsPerTaskManager(1)
                                                    .build());
                        }

                        @RegisterExtension
                        private static final MiniClusterExtension miniClusterExtension;

                        private static Configuration getConfiguration() {
                            final Configuration configuration = new Configuration();

                            configuration.setString(
                                    "metrics.reporter.prom.factory.class",
                                    PrometheusReporterFactory.class.getName());
                            configuration.setString("metrics.reporter.prom.port", "9200-9300");

                            return configuration;
                        }

                        @Test
                        void runJob() throws Exception {
                            <run job>
                        }
                    }


                    pom.xml:

                    <dependency>
                        <groupId>org.slf4j</groupId>
                        <artifactId>jul-to-slf4j</artifactId>
                        <version>1.7.32</version>
                    </dependency>
                    log4j2-test.properties:

                    rootLogger.level = off
                    rootLogger.appenderRef.test.ref = TestLogger

                    logger.http.name = com.sun.net.httpserver
                    logger.http.level = trace

                    appender.testlogger.name = TestLogger
                    appender.testlogger.type = CONSOLE
                    appender.testlogger.target = SYSTEM_ERR
                    appender.testlogger.layout.type = PatternLayout
                    appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

                    On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:
                    On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
                    Hi!

                    I also discovered problems with the PrometheusReporter on Flink 1.15.0, coming from 1.14.4. I already consulted the mailing list:
                    https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
                    I have not found the underlying problem or a solution to it.

                    Actually, after re-checking, I see the same log WARNINGs as ChangZhuo described.

                    As I described, it seems to be an issue with my job. If no job, or only an example job, runs on the taskmanager, the basic metrics work just fine. Maybe ChangZhuo can confirm this?

                    @ChangZhuo what's your job setup? I am running a streaming SQL job, but I am also using the DataStream API to create the streaming environment, from that the table environment, and finally a StatementSet to execute multiple SQL statements in one job.
                    We are running a streaming application using the low-level API, deployed via the Kubernetes operator's FlinkDeployment.




