Hi, We are getting the error i've pasted at the bottom of the mail in a Samza task, it never recovers and keeps printing messages like this in the log:
2016-07-27 17:46:20 BrokerProxy [DEBUG] Removed [event,1] 2016-07-27 17:46:20 KafkaSystemConsumer [INFO] Abdicating for [event,1] 2016-07-27 17:46:20 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([event,1] -> 146737816) 2016-07-27 17:46:20 BrokerProxy [DEBUG] Adding new topic and partition [event,1] to queue for dockerc3.adjs.net 2016-07-27 17:46:20 GetOffset [INFO] Validating offset 146737816 for topic and partition [event,1] 2016-07-27 17:46:20 GetOffset [INFO] Able to successfully read from offset 146737816 for topic and partition [event,1]. Using it to instantiate consumer. 2016-07-27 17:46:20 BrokerProxy [DEBUG] Got offset 146737816 for new topic and partition [event,1]. 2016-07-27 17:46:20 BrokerProxy [DEBUG] Tried to start an already started broker proxy (BrokerProxy for dockerc3.adjs.net:9092). Ignoring. 2016-07-27 17:46:20 KafkaSystemConsumer [DEBUG] Claimed topic-partition ([event,1]) for (BrokerProxy for dockerc3.adjs.net:9092) Redeploying the task resolves the problem until the next time it happens. We are on Java 1.8, Samza 0.9.0 and Kafka 0.9. Could the problem be related to the difference in versions between the Kafka consumer used in Samza and the version of the Kafka broker? Could upgrading to Samza 10 help in this case? Would appreciate any advice. Thanks, Michael 2016-07-27 17:25:04 BrokerProxy [WARN] Restarting consumer due to kafka.common.UnknownException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace. 2016-07-27 17:25:04 BrokerProxy [DEBUG] Exception detail: kafka.common.UnknownException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance( NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance( DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at java.lang.Class.newInstance(Class.java:442) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) at kafka.consumer.SimpleConsumer.earliestOrLatestOffset( SimpleConsumer.scala:169) at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer. earliestOrLatestOffset(DefaultFetchSimpleConsumer.scala:56) at org.apache.samza.system.kafka.BrokerProxy$$anonfun$ refreshLatencyMetrics$1.apply(BrokerProxy.scala:310) at org.apache.samza.system.kafka.BrokerProxy$$anonfun$ refreshLatencyMetrics$1.apply(BrokerProxy.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.samza.system.kafka.BrokerProxy.refreshLatencyMetrics( BrokerProxy.scala:308) at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$ system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:187) at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$ run$1.apply(BrokerProxy.scala:146) at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$ run$1.apply(BrokerProxy.scala:133) at org.apache.samza.util.ExponentialSleepStrategy.run( ExponentialSleepStrategy.scala:82) at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run( BrokerProxy.scala:132) at java.lang.Thread.run(Thread.java:745) 2016-07-27 17:25:04 BrokerProxy [DEBUG] Removed [event,3]