+1 for adding system name to the client id.
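
A minimal sketch of what that could look like. This is not Samza's actual `KafkaUtil.getClientId`: the class, the method names and the sanitizing regex are assumptions for illustration, and the `<prefix>-<job.name>-<job.id>` shape is only inferred from the id in the quoted log (`samza_producer-wikipedia_feed-1`). The second system name, `kafka-metrics`, is made up.

```java
final class ClientIdSketch {

    // Assumption: replace anything outside [A-Za-z0-9] with '_' so that '-'
    // stays unambiguous as the separator between the parts of the id.
    static String sanitize(String part) {
        return part.replaceAll("[^A-Za-z0-9]", "_");
    }

    // Shape inferred from the quoted log: <prefix>-<job.name>-<job.id>.
    static String clientId(String prefix, String jobName, String jobId) {
        return String.join("-", sanitize(prefix), sanitize(jobName), sanitize(jobId));
    }

    // Proposed shape: the same id with the system name appended.
    static String clientId(String prefix, String jobName, String jobId, String systemName) {
        return clientId(prefix, jobName, jobId) + "-" + sanitize(systemName);
    }

    public static void main(String[] args) {
        // "kafka" appears in the quoted log; "kafka-metrics" is a hypothetical second system.
        for (String system : new String[] {"kafka", "kafka-metrics"}) {
            System.out.println(clientId("samza_producer", "wikipedia_feed", "1", system));
        }
    }
}
```

Two producers created for the same system would still share an id under this scheme. The quoted log appears to show exactly that case (both producers are for system kafka), so a component name may be needed as well.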

- Prateek

On Thu, Jul 20, 2017 at 10:43 AM, Navina Ramesh (Apache) <nav...@apache.org>
wrote:

> Hi Davide,
>
> I think this is expected to occur as a warning since we spin up all kafka
> clients with the same client-id, which is $job.name + $job.id.
>
> As Jagadish mentioned, it will be great if you can provide us the entire
> log so that we can take a look.
>
> As a side note for the samza contributors, I do believe the container spins
> up kafka clients for each kafka system defined, even if it is not used.
> Iirc, we use `KafkaUtil.getClientId` for generating the client id. Perhaps
> it makes sense to append another identifier to the client id (such as
> system name or component name). That way, we won't lose the kafka-client
> related metrics and there will be no overlap between the client ids.
> Thoughts?
>
> Thanks!
> Navina
>
> On Thu, Jul 20, 2017 at 9:13 AM, Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
>
> > Can you share the entire log file if that's okay? The warning should be a
> > red herring IMHO.
> >
> > On Thu, Jul 20, 2017 at 7:50 AM Davide Simoncelli <netcelli....@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks for the reply.
> > >
> > > It is a warning, but the application fails. Here is the logging:
> > >
> > >
> > > 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka version : 0.10.1.1
> > > 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka commitId : f10ef2720b03b247
> > > 2017-07-20 10:43:06.351 [main] AppInfoParser [WARN] Error registering AppInfo mbean
> > > javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
> > >         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> > >         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> > >         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
> > >         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:331)
> > >         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
> > >         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > >         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > >         at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:144)
> > >         at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
> > >         at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendSetConfigMessage(CoordinatorStreamWriter.java:98)
> > >         at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendMessage(CoordinatorStreamWriter.java:82)
> > >         at org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(SamzaYarnAppMasterService.scala:68)
> > >         at org.apache.samza.job.yarn.YarnClusterResourceManager.start(YarnClusterResourceManager.java:180)
> > >         at org.apache.samza.clustermanager.ContainerProcessManager.start(ContainerProcessManager.java:167)
> > >         at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(ClusterBasedJobCoordinator.java:154)
> > >         at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(ClusterBasedJobCoordinator.java:222)
> > > 2017-07-20 10:43:06.549 [main] CoordinatorStreamWriter [INFO] Stopping the coordinator stream producer.
> > > 2017-07-20 10:43:06.549 [main] CoordinatorStreamSystemProducer [INFO] Stopping coordinator stream producer.
> > > 2017-07-20 10:43:06.549 [main] KafkaProducer [INFO] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > >
> > >
> > > > On 20 Jul 2017, at 3:16 pm, Jagadish Venkatraman <jagadish1...@gmail.com>
> > > > wrote:
> > > >
> > > > Hi Davide,
> > > >
> > > > Is this logged as an error or as a warning?
> > > >
> > > > IIUC, this warning should not fail the job. It may cause some MBean
> > > > sensors / metrics emitted from Kafka to be reported incorrectly (since
> > > > those are reported per-clientId).
> > > >
> > > > The job should still continue to run.
> > > >
> > > > The entire log file will be helpful for further debugging!
> > > >
> > > > On Thu, Jul 20, 2017 at 3:32 AM, Davide Simoncelli <netcelli....@gmail.com>
> > > > wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> We are running Kafka 0.10.1.1 in production. Unfortunately the Samza app
> > > >> fails to start because of this bug:
> > > >> https://issues.apache.org/jira/browse/SAMZA-1027.
> > > >>
> > > >> Even hello-samza on YARN fails to start. Here is the error:
> > > >>
> > > >> javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
> > > >>        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> > > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> > > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> > > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> > > >>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> > > >>        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> > > >>        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
> > > >>        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:331)
> > > >>        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
> > > >>        at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > > >>        at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > > >>        at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:144)
> > > >>        at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
> > > >>        at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendSetConfigMessage(CoordinatorStreamWriter.java:98)
> > > >>        at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendMessage(CoordinatorStreamWriter.java:82)
> > > >>        at org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(SamzaYarnAppMasterService.scala:68)
> > > >>        at org.apache.samza.job.yarn.YarnClusterResourceManager.start(YarnClusterResourceManager.java:180)
> > > >>        at org.apache.samza.clustermanager.ContainerProcessManager.start(ContainerProcessManager.java:167)
> > > >>        at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(ClusterBasedJobCoordinator.java:154)
> > > >>        at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(ClusterBasedJobCoordinator.java:222)
> > > >>
> > > >>
> > > >> According to the samza-job-coordinator.log file, it is creating two
> > > >> producers with the same client ID:
> > > >>
> > > >> 2017-07-20 04:03:12.208 [main] KafkaSystemProducer [INFO] Creating a new producer for system kafka.
> > > >> 2017-07-20 04:03:12.224 [main] ProducerConfig [INFO] ProducerConfig values:
> > > >>        acks = 1
> > > >>        batch.size = 16384
> > > >>        block.on.buffer.full = false
> > > >>        buffer.memory = 33554432
> > > >>        client.id = samza_producer-wikipedia_feed-1
> > > >>
> > > >>
> > > >> 2017-07-20 04:03:13.510 [main] KafkaSystemProducer [INFO] Creating a new producer for system kafka.
> > > >> 2017-07-20 04:03:13.510 [main] ProducerConfig [INFO] ProducerConfig values:
> > > >>        acks = 1
> > > >>        batch.size = 16384
> > > >>        block.on.buffer.full = false
> > > >>        buffer.memory = 33554432
> > > >>        client.id = samza_producer-wikipedia_feed-1
> > > >>        compression.type = none
> > > >>
> > > >> Any idea why it is happening?
> > > >>
> > > >> Thanks
> > > >>
> > > >> Davide
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Jagadish V,
> > > > Graduate Student,
> > > > Department of Computer Science,
> > > > Stanford University
> > >
> > > --
> > Sent from my iphone.
> >
>
