[ https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15631889#comment-15631889 ]
Bill Zhang commented on KAFKA-955:
----------------------------------

I am using Flume with a Kafka Channel and am facing the issues below.

Kafka version: kafka_2.9.1-0.8.2.0
Flume version: apache-flume-1.6.0

The problem seems to be resolved by the following steps:

Step 1: Copy the ZooKeeper JAR file to the Flume classpath.
Step 2: Set a1.channels.c1.kafka.producer.type = async

Note: I didn't change the default value of request.required.acks. This seems to work, but it is still being tested.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Issue 1:

02 Nov 2016 22:20:06,201 WARN [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] (kafka.utils.Logging$class.warn:83) - Reconnect due to socket error: null
02 Nov 2016 22:20:06,203 INFO [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], Stopped
02 Nov 2016 22:20:06,203 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], Shutdown completed
02 Nov 2016 22:20:06,203 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Shutting down
02 Nov 2016 22:20:06,204 WARN [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] (kafka.utils.Logging$class.warn:83) - Reconnect due to socket error: null
02 Nov 2016 22:20:06,204 INFO [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Stopped
02 Nov 2016 22:20:06,204 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], Shutdown completed
02 Nov 2016 22:20:06,205 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - 
[ConsumerFetcherManager-1478087994042] All connections stopped 02 Nov 2016 22:20:06,207 INFO [ZkClient-EventThread-58-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82) - Terminate ZkClient event thread. 02 Nov 2016 22:20:06,212 WARN [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.warn:89) - Failed to send producer request with correlation id 34198503 to broker 1 with data for partitions [channel-tbox-parsed-topic,3] java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:511) at java.nio.channels.SocketChannel.write(SocketChannel.java:502) at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56) at kafka.network.Send$class.writeCompletely(Transmission.scala:75) at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26) at kafka.network.BlockingChannel.send(BlockingChannel.scala:92) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer.send(SyncProducer.scala:100) at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:42) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357) at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151) at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192) at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139) at java.lang.Thread.run(Thread.java:745) 02 Nov 2016 22:20:06,214 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying send. 
Remaining retries = 3 02 Nov 2016 22:20:06,214 WARN [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363) - Sending events to Kafka failed java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:42) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357) at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151) at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192) at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139) at java.lang.Thread.run(Thread.java:745) 02 Nov 2016 22:20:06,215 ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.kafka.KafkaSource.process:153) - KafkaSource EXCEPTION, {} org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.kafka.KafkaChannel{name: c1} at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200) at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka failed at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364) at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151) at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192) ... 
3 more Caused by: java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:42) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357) ... 5 more 02 Nov 2016 22:20:06,233 INFO [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684) - Session: 0x2581dc726ab01ad closed 02 Nov 2016 22:20:06,233 INFO [lifecycleSupervisor-1-1-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512) - EventThread shut down 02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox-topic_SATL2036-1478087994030-54387da2], ZKConsumerConnector shut down completed 02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150) - Component type: SOURCE, name: r1 stopped 02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156) - Shutdown Metric for type: SOURCE, name: r1. source.start.time == 1478087994119 02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162) - Shutdown Metric for type: SOURCE, name: r1. source.stop.time == 1478096406239 02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. source.kafka.commit.time == 555 02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. source.kafka.event.get.time == 1409513 02 Nov 2016 22:20:06,239 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. 
src.append-batch.accepted == 0 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.append-batch.received == 0 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.append.accepted == 0 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.append.received == 0 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 343 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.events.received == 343 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (org.apache.flume.source.kafka.KafkaSource.stop:237) - Kafka Source r1 stopped. 
Metrics: SOURCE:r1{src.events.accepted=343, src.open-connection.count=0, src.append.received=0, source.kafka.event.get.time=1409513, src.append-batch.received=0, src.append-batch.accepted=0, src.append.accepted=0, src.events.received=343, source.kafka.commit.time=555} 02 Nov 2016 22:20:06,240 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector shutting down 02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478087993018] Stopping leader finder thread 02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Shutting down 02 Nov 2016 22:20:06,241 INFO [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Stopped 02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread], Shutdown completed 02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478087993018] Stopping all fetchers 02 Nov 2016 22:20:06,241 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Shutting down 02 Nov 2016 22:20:06,242 WARN [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1] (kafka.utils.Logging$class.warn:83) - Reconnect due to socket error: null 02 Nov 2016 22:20:06,242 INFO [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], 
Stopped 02 Nov 2016 22:20:06,242 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1], Shutdown completed 02 Nov 2016 22:20:06,242 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478087993018] All connections stopped 02 Nov 2016 22:20:06,243 INFO [ZkClient-EventThread-42-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82) - Terminate ZkClient event thread. 02 Nov 2016 22:20:06,244 INFO [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684) - Session: 0x356eb2d4b833fab closed 02 Nov 2016 22:20:06,244 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector shut down completed 02 Nov 2016 22:20:06,244 INFO [SinkRunner-PollingRunner-FailoverSinkProcessor-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512) - EventThread shut down 02 Nov 2016 22:20:06,246 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - Shutting down producer 02 Nov 2016 22:20:06,247 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - Closing all sync producers 02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - Disconnecting from 10.25.20.36:9092 02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150) - Component type: CHANNEL, name: c1 stopped 02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156) - Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1478087992836 02 Nov 2016 22:20:06,256 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162) - Shutdown Metric for type: CHANNEL, name: c1. 
channel.stop.time == 1478096406256 02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 0 02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 0 02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 0 02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 343 02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 0 02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 342 02 Nov 2016 22:20:06,257 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.commit.time == 201 02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.event.get.time == 531 02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. channel.kafka.event.send.time == 1789 02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178) - Shutdown Metric for type: CHANNEL, name: c1. 
channel.rollback.count == 0 02 Nov 2016 22:20:06,258 INFO [agent-shutdown-hook] (org.apache.flume.channel.kafka.KafkaChannel.stop:123) - Kafka channel c1 stopped. Metrics: CHANNEL:c1{channel.event.put.attempt=0, channel.event.put.success=343, channel.kafka.event.get.time=531, channel.current.size=0, channel.event.take.attempt=0, channel.event.take.success=342, channel.kafka.event.send.time=1789, channel.capacity=0, channel.kafka.commit.time=201, channel.rollback.count=0} 02 Nov 2016 22:20:06,264 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) 02 Nov 2016 22:20:06,274 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:139) - Failed 
to publish events org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306) ... 6 more 02 Nov 2016 22:20:06,275 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: Failed to publish events at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:150) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97) ... 3 more Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306) ... 
6 more 02 Nov 2016 22:20:06,794 INFO [flume_tbox-topic_SATL2036-1478087994030-54387da2_watcher_executor] (kafka.utils.Logging$class.info:68) - [flume_tbox-topic_SATL2036-1478087994030-54387da2], stopping watcher executor thread for consumer flume_tbox-topic_SATL2036-1478087994030-54387da2 02 Nov 2016 22:20:06,816 INFO [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21_watcher_executor] (kafka.utils.Logging$class.info:68) - [flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], stopping watcher executor thread for consumer flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21 02 Nov 2016 22:20:06,887 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) 02 Nov 2016 22:20:06,888 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459) - process failed org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) ... 6 more 02 Nov 2016 22:20:06,889 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185) - Sink k1 failed and has been sent to failover list org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) ... 3 more Caused by: java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) ... 
6 more 02 Nov 2016 22:20:06,896 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) 02 Nov 2016 22:20:06,897 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459) - process failed org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) ... 
6 more 02 Nov 2016 22:20:06,898 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185) - Sink k2 failed and has been sent to failover list org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) ... 3 more Caused by: java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) ... 
6 more 02 Nov 2016 22:20:06,898 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332) - Error while getting events from Kafka java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) 02 Nov 2016 22:20:06,898 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:459) - process failed org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) ... 
6 more 02 Nov 2016 22:20:06,899 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.FailoverSinkProcessor.process:185) - Sink k3 failed and has been sent to failover list org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463) at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) ... 3 more Caused by: java.util.NoSuchElementException at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310) ... 6 more 02 Nov 2016 22:20:06,899 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: All sinks failed to process, nothing left to failover to
    at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:191)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:07,216 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector shutting down
02 Nov 2016 22:20:07,217 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478088061100] Stopping leader finder thread
02 Nov 2016 22:20:07,217 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Shutting down
02 Nov 2016 22:20:07,218 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Shutdown completed
02 Nov 2016 22:20:07,218 INFO [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], Stopped
02 Nov 2016 22:20:07,218 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478088061100] Stopping all fetchers
02 Nov 2016 22:20:07,218 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Shutting down
02 Nov 2016 22:20:07,219 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Shutdown completed
02 Nov 2016 22:20:07,219 INFO [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], Stopped
02 Nov 2016 22:20:07,220 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Shutting down
02 Nov 2016 22:20:07,220 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Shutdown completed
02 Nov 2016 22:20:07,220 INFO [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], Stopped
02 Nov 2016 22:20:07,221 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [ConsumerFetcherManager-1478088061100] All connections stopped
02 Nov 2016 22:20:07,223 INFO [ZkClient-EventThread-48-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] (org.I0Itec.zkclient.ZkEventThread.run:82) - Terminate ZkClient event thread.
02 Nov 2016 22:20:07,226 INFO [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0_watcher_executor] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], stopping watcher executor thread for consumer flume_tbox_parsed_SATL2036-1478088061094-7badc6c0
02 Nov 2016 22:20:07,226 INFO [agent-shutdown-hook] (org.apache.zookeeper.ZooKeeper.close:684) - Session: 0x156eb2d4a70421e closed
02 Nov 2016 22:20:07,226 INFO [lifecycleSupervisor-1-0-EventThread] (org.apache.zookeeper.ClientCnxn$EventThread.run:512) - EventThread shut down
02 Nov 2016 22:20:07,227 INFO [agent-shutdown-hook] (kafka.utils.Logging$class.info:68) - [flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector shut down completed

Issue 2:
03 Nov 2016 13:31:29,287 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying send. Remaining retries = 1
03 Nov 2016 13:31:29,388 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Fetching metadata from broker id:1,host:SATL2037,port:9092 with correlation id 2307612 for 1 topic(s) Set(channel-tbox-topic)
03 Nov 2016 13:31:29,388 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,389 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,443 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,549 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,550 WARN [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.warn:89) - Failed to send producer request with correlation id 2308613 to broker 2 with data for partitions [channel-tbox-topic,4]
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:42)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
    at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,550 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying send. Remaining retries = 0
03 Nov 2016 13:31:29,651 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Fetching metadata from broker id:0,host:SATL2036,port:9092 with correlation id 2308614 for 1 topic(s) Set(channel-tbox-topic)
03 Nov 2016 13:31:29,651 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Connected to SATL2036:9092 for producing
03 Nov 2016 13:31:29,652 INFO [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.info:68) - Disconnecting from SATL2036:9092
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] (kafka.utils.Logging$class.error:97) - Failed to send requests for topics channel-tbox-topic with correlation ids in [2304607,2308614]
03 Nov 2016 13:31:29,653 WARN [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363) - Sending events to Kafka failed
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:42)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
    at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.kafka.KafkaSource.process:153) - KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.kafka.KafkaChannel{name: c1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka failed
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    ... 3 more
Caused by: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:42)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
    ... 5 more

> After a leader change, messages sent with ack=0 are lost
> --------------------------------------------------------
>
>                 Key: KAFKA-955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-955
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>            Reporter: Jason Rosenberg
>            Assignee: Guozhang Wang
>             Fix For: 0.8.0
>
>         Attachments: KAFKA-955-followup.v1.patch, KAFKA-955.v1.patch,
> KAFKA-955.v1.patch, KAFKA-955.v2.patch, KAFKA-955.v3.patch,
> KAFKA-955.v4.patch, KAFKA-955.v5.patch, KAFKA-955.v6.patch, KAFKA-955.v7.patch
>
>
> If the leader changes for a partition, and a producer is sending messages
> with ack=0, then messages will be lost, since the producer has no active way
> of knowing that the leader has changed, until its next metadata refresh
> update.
> The broker receiving the message, which is no longer the leader, logs a
> message like this:
> Produce request with correlation id 7136261 from client on partition
> [mytopic,0] failed due to Leader not local for partition [mytopic,0] on
> broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an
> immediate leader change.
> A possible solution to this would be for a broker which receives a message,
> for a topic that it is no longer the leader for (and if the ack level is 0),
> then the broker could just silently forward the message over to the current
> leader.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)