[ 
https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281834#comment-16281834
 ] 

dongtingting commented on FLINK-8093:
-------------------------------------

[~aljoscha] I think clientid is thread safe and static. But one taskmanager may 
have multi slots, different slots use different environment and kafkaProducer 
classes. So one taskmanager may have multi  same clientid, but metrics will 
register sun.jmx.mbeanserver which is one in one jvm . Then multi same clientid 
conflict while register into one sun.jmx.mbeanserver. 

We fix this problem by user code set:
properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-" + topic + 
timestamp);
This can avoid conflict。 

In addition we want to modify flink code to  avoid conflict further。 

> flink job fail because of kafka producer create fail of 
> "javax.management.InstanceAlreadyExistsException"
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8093
>                 URL: https://issues.apache.org/jira/browse/FLINK-8093
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.3.2
>         Environment: flink 1.3.2, kafka 0.9.1
>            Reporter: dongtingting
>            Priority: Critical
>
> one taskmanager has multiple taskslot, one task fail because of create 
> kafkaProducer fail,the reason for create kafkaProducer fail is 
> “javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3”。 the detail trace 
> is :
> 2017-11-04 19:41:23,281 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from 
> RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
>         at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: Error registering mbean 
> kafka.producer:type=producer-metrics,client-id=producer-3
>         at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
>         at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
>         at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.<init>(RecordAccumulator.java:111)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:261)
>         ... 9 more
> Caused by: javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3
>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>         at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>         at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
>         ... 16 more
> I doubt that task in different taskslot of one taskmanager use different 
> classloader, and taskid may be  the same in one process。 So this lead to 
> create kafkaProducer fail in one taskManager。 
> Does anybody encountered the same problem? 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to