Hi everyone,

    The KafkaSpout in my topology has several problems, and I don't know what
is causing them:
  1. The latency is very high. I have 10 KafkaSpout instances and the average
latency is up to 10,000 ms.
  2. The failure count is very high. The number of failed tuples is about 50%
of the emitted tuples.
  3. An error occurs often. The error info is:

     java.lang.NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics
         at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:52)
         at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:25)
         at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:26)
         at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:37)
         at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50)
         at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50)
         at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
         at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:54)
         at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
         at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
         at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60)
         at storm.kafka.PartitionManager.<init>(PartitionManager.java:64)
         at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
         at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
         at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135)
         at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565)
         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
         at clojure.lang.AFn.run(AFn.java:24)
         at java.lang.Thread.run(Thread.java:745)

     java.lang.ExceptionInInitializerError
         at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:52)
         at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:25)
         at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:26)
         at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:37)
         at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50)
         at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50)
         at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
         at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:54)
         at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
         at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
         at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60)
         at storm.kafka.PartitionManager.<init>(PartitionManager.java:64)
         at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
         at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
         at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135)
         at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565)
         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
         at clojure.lang.AFn.run(AFn.java:24)
         at java.lang.Thread.run(Thread.java:745)
     Caused by: java.lang.IllegalStateException: Shutdown in progress
         at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
         at java.lang.Runtime.addShutdownHook(Runtime.java:211)
         at com.yammer.metrics.Metrics.<clinit>(Metrics.java:21)
         ...
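
If I read the "Caused by" part correctly, Runtime.addShutdownHook refuses to
register a hook once the JVM has already started shutting down, and the Metrics
static initializer seems to call it. So my guess (only a guess) is that the
worker JVM was already going down when the spout first touched the metrics
class. Below is a minimal sketch that shows the same JDK behaviour outside
Storm. It is not my topology code, and the class and method names
(ShutdownHookDemo, tryAddHook) are made up for illustration.

```java
public class ShutdownHookDemo {

    // Tries to register a (do-nothing) shutdown hook.
    // Returns "registered" on success, or the exception message if the JVM refuses.
    static String tryAddHook() {
        try {
            Runtime.getRuntime().addShutdownHook(new Thread());
            return "registered";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // While the JVM is running normally, registration succeeds.
        System.out.println("before shutdown: " + tryAddHook());

        // This hook runs after shutdown has begun; registering another hook
        // from inside it is rejected with IllegalStateException.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("during shutdown: " + tryAddHook());
            }
        }));
    }
}
```

Running it should print "before shutdown: registered" and then "during
shutdown: Shutdown in progress", which matches the message in my trace. It does
not explain why the worker would be shutting down in the first place.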

   
   My Storm version is 0.9.3 and my Kafka version is 2.9.2-0.8.1.1. The
KafkaSpout is the one from the external module of Storm 0.9.3, and my Kafka
topic has 20 partitions. Can anyone tell me how to fix this?

    Thanks for any response.

Jun.


  
