Hi David,

I think this is expected to occur as a warning since we spin up all kafka
clients with the same client-id, which is $job.name + $job.id.

As Jagadish mentioned, it will be great if you can provide us the entire
log so that we can take a look.

As a side note for the samza contributors, I do believe the container spins
up kafka clients for each kafka systems defined, even if it is not used.
Iirc, we use `KafkaUtil.getClientId` for generating the client id. Perhaps
it makes sense to append another identifier with the client id (such as
system name or component name). That way, we won't lose the kafka-client
related metrics and there will be no overlap between the client ids.
Thoughts?

Thanks!
Navina

On Thu, Jul 20, 2017 at 9:13 AM, Jagadish Venkatraman <
jagadish1...@gmail.com> wrote:

> Can you share the entire log file if that's okay? The warning should be a
> red-herring IMHO.
>
> On Thu, Jul 20, 2017 at 7:50 AM Davide Simoncelli <netcelli....@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thanks for the reply.
> >
> > It is a warning, but the application fails. Here is the logging:
> >
> >
> > 017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka version :
> 0.10.1.1
> > 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka commitId :
> > f10ef2720b03b247
> > 2017-07-20 10:43:06.351 [main] AppInfoParser [WARN] Error registering
> > AppInfo mbean
> > javax.management.InstanceAlreadyExistsException:
> > kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
> >         at com.sun.jmx.mbeanserver.Repository.addMBean(
> Repository.java:437)
> >         at
> > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> >         at
> > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> >         at
> > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(
> DefaultMBeanServerInterceptor.java:900)
> >         at
> > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(
> DefaultMBeanServerInterceptor.java:324)
> >         at
> > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(
> JmxMBeanServer.java:522)
> >         at
> > org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(
> AppInfoParser.java:58)
> >         at
> > org.apache.kafka.clients.producer.KafkaProducer.<init>(
> KafkaProducer.java:331)
> >         at
> > org.apache.kafka.clients.producer.KafkaProducer.<init>(
> KafkaProducer.java:163)
> >         at
> > org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.
> apply(KafkaSystemFactory.scala:89)
> >         at
> > org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.
> apply(KafkaSystemFactory.scala:89)
> >         at
> > org.apache.samza.system.kafka.KafkaSystemProducer.send(
> KafkaSystemProducer.scala:144)
> >         at
> > org.apache.samza.coordinator.stream.CoordinatorStreamSystemProduce
> r.send(CoordinatorStreamSystemProducer.java:113)
> >         at
> > org.apache.samza.coordinator.stream.CoordinatorStreamWriter.
> sendSetConfigMessage(CoordinatorStreamWriter.java:98)
> >         at
> > org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendMessage(
> CoordinatorStreamWriter.java:82)
> >         at
> > org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(
> SamzaYarnAppMasterService.scala:68)
> >         at
> > org.apache.samza.job.yarn.YarnClusterResourceManager.start(
> YarnClusterResourceManager.java:180)
> >         at
> > org.apache.samza.clustermanager.ContainerProcessManager.start(
> ContainerProcessManager.java:167)
> >         at
> > org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(
> ClusterBasedJobCoordinator.java:154)
> >         at
> > org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(
> ClusterBasedJobCoordinator.java:222)
> > 2017-07-20 10:43:06.549 [main] CoordinatorStreamWriter [INFO] Stopping
> the
> > coordinator stream producer.
> > 2017-07-20 10:43:06.549 [main] CoordinatorStreamSystemProducer [INFO]
> > Stopping coordinator stream producer.
> > 2017-07-20 10:43:06.549 [main] KafkaProducer [INFO] Closing the Kafka
> > producer with timeoutMillis = 9223372036854775807 ms.
> >
> >
> > > On 20 Jul 2017, at 3:16 pm, Jagadish Venkatraman <
> jagadish1...@gmail.com>
> > wrote:
> > >
> > > Hi Davide,
> > >
> > > Is this logged as an error or as a warning?
> > >
> > > IIUC, this warning should not fail the job. It may not cause some Mbean
> > > sensors / metrics emitted from Kafka to be correctly reported (since,
> > those
> > > are reported per-clientId).
> > >
> > > The job should still continue to run.
> > >
> > > The entire log file will be helpful for further debugging!
> > >
> > > On Thu, Jul 20, 2017 at 3:32 AM, Davide Simoncelli <
> > netcelli....@gmail.com <mailto:netcelli....@gmail.com>>
> > > wrote:
> > >
> > >> Hello,
> > >>
> > >> We are running Kafka 0.10.1.1 in production. Unfortunately the Samza
> app
> > >> fails to start because of this bug: https://issues.apache.org/ <
> > https://issues.apache.org/>
> > >> jira/browse/SAMZA-1027 <
> > https://issues.apache.org/jira/browse/SAMZA-1027 <
> > https://issues.apache.org/jira/browse/SAMZA-1027>>.
> > >>
> > >> Even hello-samza on YARN fails to start. Here its the error:
> > >>
> > >> javax.management.InstanceAlreadyExistsException:
> > >> kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
> > >>        at com.sun.jmx.mbeanserver.Repository.addMBean(
> > >> Repository.java:437)
> > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> > >> registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> > >> registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> > >> registerObject(DefaultMBeanServerInterceptor.java:900)
> > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> > >> registerMBean(DefaultMBeanServerInterceptor.java:324)
> > >>        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(
> > >> JmxMBeanServer.java:522)
> > >>        at org.apache.kafka.common.utils.
> AppInfoParser.registerAppInfo(
> > >> AppInfoParser.java:58)
> > >>        at org.apache.kafka.clients.producer.KafkaProducer.<init>(
> > >> KafkaProducer.java:331)
> > >>        at org.apache.kafka.clients.producer.KafkaProducer.<init>(
> > >> KafkaProducer.java:163)
> > >>        at org.apache.samza.system.kafka.
> KafkaSystemFactory$$anonfun$3.
> > >> apply(KafkaSystemFactory.scala:89)
> > >>        at org.apache.samza.system.kafka.
> KafkaSystemFactory$$anonfun$3.
> > >> apply(KafkaSystemFactory.scala:89)
> > >>        at org.apache.samza.system.kafka.KafkaSystemProducer.send(
> > >> KafkaSystemProducer.scala:144)
> > >>        at org.apache.samza.coordinator.stream.
> > >> CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProduce
> > >> r.java:113)
> > >>        at org.apache.samza.coordinator.stream.
> CoordinatorStreamWriter.
> > >> sendSetConfigMessage(CoordinatorStreamWriter.java:98)
> > >>        at org.apache.samza.coordinator.stream.
> CoordinatorStreamWriter.
> > >> sendMessage(CoordinatorStreamWriter.java:82)
> > >>        at org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(
> > >> SamzaYarnAppMasterService.scala:68)
> > >>        at org.apache.samza.job.yarn.YarnClusterResourceManager.start(
> > >> YarnClusterResourceManager.java:180)
> > >>        at org.apache.samza.clustermanager.
> ContainerProcessManager.start(
> > >> ContainerProcessManager.java:167)
> > >>        at
> > org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(
> > >> ClusterBasedJobCoordinator.java:154)
> > >>        at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.
> > >> main(ClusterBasedJobCoordinator.java:222)
> > >>
> > >>
> > >> According to samza-job-coordinator.log file, it is creating two
> > producers
> > >> with the same client ID:
> > >>
> > >> 2017-07-20 04:03:12.208 [main] KafkaSystemProducer [INFO] Creating a
> new
> > >> producer for system kafka.
> > >> 2017-07-20 04:03:12.224 [main] ProducerConfig [INFO] ProducerConfig
> > values:
> > >>        acks = 1
> > >>        batch.size = 16384
> > >>        block.on.buffer.full = false
> > >>        buffer.memory = 33554432
> > >>        client.id = samza_producer-wikipedia_feed-1
> > >>
> > >>
> > >> 2017-07-20 04:03:13.510 [main] KafkaSystemProducer [INFO] Creating a
> new
> > >> producer for system kafka.
> > >> 2017-07-20 04:03:13.510 [main] ProducerConfig [INFO] ProducerConfig
> > values:
> > >>        acks = 1
> > >>        batch.size = 16384
> > >>        block.on.buffer.full = false
> > >>        buffer.memory = 33554432
> > >>        client.id = samza_producer-wikipedia_feed-1
> > >>        compression.type = none
> > >>
> > >> Any idea why it is happening?
> > >>
> > >> Thanks
> > >>
> > >> Davide
> > >
> > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> >
> > --
> Sent from my iphone.
>

Reply via email to