+1 for adding the system name to the client id.

- Prateek
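To make the suggestion concrete, here is a minimal sketch in Java. It is not Samza's `KafkaUtil.getClientId` and not Kafka's `AppInfoParser`: the class `ClientIdSketch`, its methods, the `sketch.producer` JMX domain, and the sanitization rule are all hypothetical names chosen for illustration. It only shows that registering two MBeans under the same client id raises `InstanceAlreadyExistsException`, and that appending a qualifier such as the system name avoids the clash.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.timer.Timer;

// Hypothetical sketch; none of these names are Samza or Kafka APIs.
final class ClientIdSketch {

    // Assumption: replace anything outside [A-Za-z0-9] with '_' so the id
    // is always safe to embed in a JMX ObjectName.
    static String sanitize(String part) {
        return part.replaceAll("[^A-Za-z0-9]", "_");
    }

    // Scheme resembling the ids seen in the log: prefix-jobName-jobId.
    static String clientId(String prefix, String jobName, String jobId) {
        return sanitize(prefix) + "-" + sanitize(jobName) + "-" + sanitize(jobId);
    }

    // Proposed scheme: append one more identifier (system or component name).
    static String qualifiedClientId(String prefix, String jobName, String jobId,
                                    String qualifier) {
        return clientId(prefix, jobName, jobId) + "-" + sanitize(qualifier);
    }

    // Registers a placeholder MBean under a name derived from the client id.
    // Returns false if that name is already taken on this server.
    static boolean register(MBeanServer server, String clientId) {
        try {
            ObjectName name =
                new ObjectName("sketch.producer:type=app-info,id=" + clientId);
            // Any compliant MBean works here; the JDK's Timer MBean is used
            // only to avoid defining one for this demo.
            server.registerMBean(new Timer(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A private server, so the platform MBean server is left untouched.
        MBeanServer server = MBeanServerFactory.newMBeanServer();

        String shared = clientId("samza_producer", "wikipedia-feed", "1");
        System.out.println(shared + " first:  " + register(server, shared));
        System.out.println(shared + " second: " + register(server, shared));

        // "kafka" and "kafka-coordinator" are made-up qualifiers.
        String a = qualifiedClientId("samza_producer", "wikipedia-feed", "1", "kafka");
        String b = qualifiedClientId("samza_producer", "wikipedia-feed", "1",
                                     "kafka-coordinator");
        System.out.println(a + ": " + register(server, a));
        System.out.println(b + ": " + register(server, b));
    }
}
```

One caveat: in the log quoted below, both producers are created for the same system (`kafka`), so the system name alone would probably not separate them. A component name, or both together, may be needed for that case.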
On Thu, Jul 20, 2017 at 10:43 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:

> Hi Davide,
>
> I think this is expected to occur as a warning since we spin up all Kafka
> clients with the same client-id, which is $job.name + $job.id.
>
> As Jagadish mentioned, it will be great if you can provide us the entire
> log so that we can take a look.
>
> As a side note for the Samza contributors, I do believe the container spins
> up Kafka clients for each Kafka system defined, even if it is not used.
> IIRC, we use `KafkaUtil.getClientId` for generating the client id. Perhaps
> it makes sense to append another identifier to the client id (such as the
> system name or component name). That way, we won't lose the Kafka-client
> related metrics and there will be no overlap between the client ids.
> Thoughts?
>
> Thanks!
> Navina
>
> On Thu, Jul 20, 2017 at 9:13 AM, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
>
> > Can you share the entire log file if that's okay? The warning should be a
> > red herring IMHO.
> >
> > On Thu, Jul 20, 2017 at 7:50 AM Davide Simoncelli <netcelli....@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Thanks for the reply.
> > >
> > > It is a warning, but the application fails. Here is the logging:
> > >
> > > 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka version : 0.10.1.1
> > > 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka commitId : f10ef2720b03b247
> > > 2017-07-20 10:43:06.351 [main] AppInfoParser [WARN] Error registering AppInfo mbean
> > > javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
> > >         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> > >         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> > >         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> > >         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
> > >         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:331)
> > >         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
> > >         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > >         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > >         at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:144)
> > >         at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
> > >         at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendSetConfigMessage(CoordinatorStreamWriter.java:98)
> > >         at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendMessage(CoordinatorStreamWriter.java:82)
> > >         at org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(SamzaYarnAppMasterService.scala:68)
> > >         at org.apache.samza.job.yarn.YarnClusterResourceManager.start(YarnClusterResourceManager.java:180)
> > >         at org.apache.samza.clustermanager.ContainerProcessManager.start(ContainerProcessManager.java:167)
> > >         at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(ClusterBasedJobCoordinator.java:154)
> > >         at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(ClusterBasedJobCoordinator.java:222)
> > > 2017-07-20 10:43:06.549 [main] CoordinatorStreamWriter [INFO] Stopping the coordinator stream producer.
> > > 2017-07-20 10:43:06.549 [main] CoordinatorStreamSystemProducer [INFO] Stopping coordinator stream producer.
> > > 2017-07-20 10:43:06.549 [main] KafkaProducer [INFO] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > >
> > > > On 20 Jul 2017, at 3:16 pm, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
> > > >
> > > > Hi Davide,
> > > >
> > > > Is this logged as an error or as a warning?
> > > >
> > > > IIUC, this warning should not fail the job. It may cause some MBean
> > > > sensors / metrics emitted from Kafka to not be reported correctly
> > > > (since those are reported per client id).
> > > >
> > > > The job should still continue to run.
> > > >
> > > > The entire log file will be helpful for further debugging!
> > > >
> > > > On Thu, Jul 20, 2017 at 3:32 AM, Davide Simoncelli <netcelli....@gmail.com> wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> We are running Kafka 0.10.1.1 in production. Unfortunately the Samza app
> > > >> fails to start because of this bug:
> > > >> https://issues.apache.org/jira/browse/SAMZA-1027
> > > >>
> > > >> Even hello-samza on YARN fails to start. Here is the error:
> > > >>
> > > >> javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
> > > >>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> > > >>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> > > >>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> > > >>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> > > >>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> > > >>         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> > > >>         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
> > > >>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:331)
> > > >>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
> > > >>         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > > >>         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> > > >>         at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:144)
> > > >>         at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
> > > >>         at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendSetConfigMessage(CoordinatorStreamWriter.java:98)
> > > >>         at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendMessage(CoordinatorStreamWriter.java:82)
> > > >>         at org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(SamzaYarnAppMasterService.scala:68)
> > > >>         at org.apache.samza.job.yarn.YarnClusterResourceManager.start(YarnClusterResourceManager.java:180)
> > > >>         at org.apache.samza.clustermanager.ContainerProcessManager.start(ContainerProcessManager.java:167)
> > > >>         at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(ClusterBasedJobCoordinator.java:154)
> > > >>         at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(ClusterBasedJobCoordinator.java:222)
> > > >>
> > > >> According to the samza-job-coordinator.log file, it is creating two
> > > >> producers with the same client ID:
> > > >>
> > > >> 2017-07-20 04:03:12.208 [main] KafkaSystemProducer [INFO] Creating a new producer for system kafka.
> > > >> 2017-07-20 04:03:12.224 [main] ProducerConfig [INFO] ProducerConfig values:
> > > >>         acks = 1
> > > >>         batch.size = 16384
> > > >>         block.on.buffer.full = false
> > > >>         buffer.memory = 33554432
> > > >>         client.id = samza_producer-wikipedia_feed-1
> > > >>
> > > >> 2017-07-20 04:03:13.510 [main] KafkaSystemProducer [INFO] Creating a new producer for system kafka.
> > > >> 2017-07-20 04:03:13.510 [main] ProducerConfig [INFO] ProducerConfig values:
> > > >>         acks = 1
> > > >>         batch.size = 16384
> > > >>         block.on.buffer.full = false
> > > >>         buffer.memory = 33554432
> > > >>         client.id = samza_producer-wikipedia_feed-1
> > > >>         compression.type = none
> > > >>
> > > >> Any idea why it is happening?
> > > >>
> > > >> Thanks
> > > >>
> > > >> Davide
> > > >
> > > > --
> > > > Jagadish V,
> > > > Graduate Student,
> > > > Department of Computer Science,
> > > > Stanford University
> > >
> > --
> > Sent from my iphone.