[ 
https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15631889#comment-15631889
 ] 

Bill Zhang commented on KAFKA-955:
----------------------------------

I am using Flume with Kafka Channel & facing below issues. 

Kafka Version: kafka_2.9.1-0.8.2.0
Flume Version: apache-flume-1.6.0

It seems was resolved from below :
Step 1: copy zookeeper Jar file to Flume classpath
Step 2: a1.channels.c1.kafka.producer.type = async

Note:
i didn't change default value of request.required.acks. It seems works, it is 
still in testing...


~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Issue 1:
02 Nov 2016 22:20:06,201 WARN  
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] 
(kafka.utils.Logging$class.warn:83)  - Reconnect due to socket error: null
02 Nov 2016 22:20:06,203 INFO  
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], 
Stopped 
02 Nov 2016 22:20:06,203 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-2], 
Shutdown completed
02 Nov 2016 22:20:06,203 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], 
Shutting down
02 Nov 2016 22:20:06,204 WARN  
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] 
(kafka.utils.Logging$class.warn:83)  - Reconnect due to socket error: null
02 Nov 2016 22:20:06,204 INFO  
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], 
Stopped 
02 Nov 2016 22:20:06,204 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox-topic_SATL2036-1478087994030-54387da2-0-1], 
Shutdown completed
02 Nov 2016 22:20:06,205 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087994042] 
All connections stopped
02 Nov 2016 22:20:06,207 INFO  
[ZkClient-EventThread-58-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] 
(org.I0Itec.zkclient.ZkEventThread.run:82)  - Terminate ZkClient event thread.
02 Nov 2016 22:20:06,212 WARN  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.warn:89)  - Failed to send producer request with 
correlation id 34198503 to broker 1 with data for partitions 
[channel-tbox-parsed-topic,3]
java.nio.channels.ClosedByInterruptException
        at 
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:511)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
        at 
kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at 
kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
        at 
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        at 
org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,214 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying 
send. Remaining retries = 3
02 Nov 2016 22:20:06,214 WARN  [PollableSourceRunner-KafkaSource-r1] 
(org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363)  - 
Sending events to Kafka failed
java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
        at 
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        at 
org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,215 ERROR [PollableSourceRunner-KafkaSource-r1] 
(org.apache.flume.source.kafka.KafkaSource.process:153)  - KafkaSource 
EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: 
org.apache.flume.channel.kafka.KafkaChannel{name: c1}
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at 
org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka 
failed
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
        at 
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 3 more
Caused by: java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:76)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
        ... 5 more
02 Nov 2016 22:20:06,233 INFO  [agent-shutdown-hook] 
(org.apache.zookeeper.ZooKeeper.close:684)  - Session: 0x2581dc726ab01ad closed
02 Nov 2016 22:20:06,233 INFO  [lifecycleSupervisor-1-1-EventThread] 
(org.apache.zookeeper.ClientCnxn$EventThread.run:512)  - EventThread shut down
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox-topic_SATL2036-1478087994030-54387da2], ZKConsumerConnector shut 
down completed
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150)  - Component 
type: SOURCE, name: r1 stopped
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156)  - Shutdown 
Metric for type: SOURCE, name: r1. source.start.time == 1478087994119
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162)  - Shutdown 
Metric for type: SOURCE, name: r1. source.stop.time == 1478096406239
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. source.kafka.commit.time == 555
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. source.kafka.event.get.time == 1409513
02 Nov 2016 22:20:06,239 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. src.append-batch.accepted == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. src.append-batch.received == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. src.append.accepted == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. src.append.received == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. src.events.accepted == 343
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. src.events.received == 343
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: SOURCE, name: r1. src.open-connection.count == 0
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(org.apache.flume.source.kafka.KafkaSource.stop:237)  - Kafka Source r1 
stopped. Metrics: SOURCE:r1{src.events.accepted=343, 
src.open-connection.count=0, src.append.received=0, 
source.kafka.event.get.time=1409513, src.append-batch.received=0, 
src.append-batch.accepted=0, src.append.accepted=0, src.events.received=343, 
source.kafka.commit.time=555}
02 Nov 2016 22:20:06,240 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector 
shutting down
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087993018] 
Stopping leader finder thread
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread],
 Shutting down
02 Nov 2016 22:20:06,241 INFO  
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread] 
(kafka.utils.Logging$class.info:68)  - 
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread],
 Stopped 
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-leader-finder-thread],
 Shutdown completed
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087993018] 
Stopping all fetchers
02 Nov 2016 22:20:06,241 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1],
 Shutting down
02 Nov 2016 22:20:06,242 WARN  
[ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1]
 (kafka.utils.Logging$class.warn:83)  - Reconnect due to socket error: null
02 Nov 2016 22:20:06,242 INFO  
[ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1]
 (kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1],
 Stopped 
02 Nov 2016 22:20:06,242 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21-0-1],
 Shutdown completed
02 Nov 2016 22:20:06,242 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478087993018] 
All connections stopped
02 Nov 2016 22:20:06,243 INFO  
[ZkClient-EventThread-42-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] 
(org.I0Itec.zkclient.ZkEventThread.run:82)  - Terminate ZkClient event thread.
02 Nov 2016 22:20:06,244 INFO  [agent-shutdown-hook] 
(org.apache.zookeeper.ZooKeeper.close:684)  - Session: 0x356eb2d4b833fab closed
02 Nov 2016 22:20:06,244 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], ZKConsumerConnector 
shut down completed
02 Nov 2016 22:20:06,244 INFO  
[SinkRunner-PollingRunner-FailoverSinkProcessor-EventThread] 
(org.apache.zookeeper.ClientCnxn$EventThread.run:512)  - EventThread shut down
02 Nov 2016 22:20:06,246 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - Shutting down producer
02 Nov 2016 22:20:06,247 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - Closing all sync producers
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - Disconnecting from 10.25.20.36:9092
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150)  - Component 
type: CHANNEL, name: c1 stopped
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.start.time == 1478087992836
02 Nov 2016 22:20:06,256 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:162)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.stop.time == 1478096406256
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.capacity == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.current.size == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.event.put.success == 343
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 0
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.event.take.success == 342
02 Nov 2016 22:20:06,257 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.kafka.commit.time == 201
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.kafka.event.get.time == 531
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.kafka.event.send.time == 1789
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] 
(org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown 
Metric for type: CHANNEL, name: c1. channel.rollback.count == 0
02 Nov 2016 22:20:06,258 INFO  [agent-shutdown-hook] 
(org.apache.flume.channel.kafka.KafkaChannel.stop:123)  - Kafka channel c1 
stopped. Metrics: CHANNEL:c1{channel.event.put.attempt=0, 
channel.event.put.success=343, channel.kafka.event.get.time=531, 
channel.current.size=0, channel.event.take.attempt=0, 
channel.event.take.success=342, channel.kafka.event.send.time=1789, 
channel.capacity=0, channel.kafka.commit.time=201, channel.rollback.count=0}
02 Nov 2016 22:20:06,264 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] 
(org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - 
Error while getting events from Kafka
java.lang.InterruptedException
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at 
java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,274 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] 
(org.apache.flume.sink.kafka.KafkaSink.process:139)  - Failed to publish events
org.apache.flume.ChannelException: Error while getting events from Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at 
java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
        ... 6 more
02 Nov 2016 22:20:06,275 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] 
(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. 
Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:150)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from 
Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
        ... 3 more
Caused by: java.lang.InterruptedException
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at 
java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
        ... 6 more
02 Nov 2016 22:20:06,794 INFO  
[flume_tbox-topic_SATL2036-1478087994030-54387da2_watcher_executor] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox-topic_SATL2036-1478087994030-54387da2], stopping watcher executor 
thread for consumer flume_tbox-topic_SATL2036-1478087994030-54387da2
02 Nov 2016 22:20:06,816 INFO  
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21_watcher_executor] 
(kafka.utils.Logging$class.info:68)  - 
[flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21], stopping watcher 
executor thread for consumer 
flume-channel-tbox-topic_SATL2036-1478087992871-1a60fb21
02 Nov 2016 22:20:06,887 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - 
Error while getting events from Kafka
java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,888 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.sink.hdfs.HDFSEventSink.process:459)  - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        ... 6 more
02 Nov 2016 22:20:06,889 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.sink.FailoverSinkProcessor.process:185)  - Sink k1 failed and 
has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Error while getting events from Kafka
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from 
Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        ... 3 more
Caused by: java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        ... 6 more
02 Nov 2016 22:20:06,896 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - 
Error while getting events from Kafka
java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,897 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.sink.hdfs.HDFSEventSink.process:459)  - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        ... 6 more
02 Nov 2016 22:20:06,898 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.sink.FailoverSinkProcessor.process:185)  - Sink k2 failed and 
has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Error while getting events from Kafka
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from 
Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        ... 3 more
Caused by: java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        ... 6 more
02 Nov 2016 22:20:06,898 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake:332)  - 
Error while getting events from Kafka
java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:06,898 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.sink.hdfs.HDFSEventSink.process:459)  - process failed
org.apache.flume.ChannelException: Error while getting events from Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        ... 6 more
02 Nov 2016 22:20:06,899 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.sink.FailoverSinkProcessor.process:185)  - Sink k3 failed and 
has been sent to failover list
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Error while getting events from Kafka
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from 
Kafka
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
        at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        ... 3 more
Caused by: java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
        ... 6 more
02 Nov 2016 22:20:06,899 ERROR [SinkRunner-PollingRunner-FailoverSinkProcessor] 
(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. 
Exception follows.
org.apache.flume.EventDeliveryException: All sinks failed to process, nothing 
left to failover to
        at 
org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:191)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
02 Nov 2016 22:20:07,216 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector 
shutting down
02 Nov 2016 22:20:07,217 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478088061100] 
Stopping leader finder thread
02 Nov 2016 22:20:07,217 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], 
Shutting down
02 Nov 2016 22:20:07,218 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], 
Shutdown completed
02 Nov 2016 22:20:07,218 INFO  
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-leader-finder-thread], 
Stopped 
02 Nov 2016 22:20:07,218 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478088061100] 
Stopping all fetchers
02 Nov 2016 22:20:07,218 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], 
Shutting down
02 Nov 2016 22:20:07,219 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], 
Shutdown completed
02 Nov 2016 22:20:07,219 INFO  
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-2], 
Stopped 
02 Nov 2016 22:20:07,220 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], 
Shutting down
02 Nov 2016 22:20:07,220 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], 
Shutdown completed
02 Nov 2016 22:20:07,220 INFO  
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1] 
(kafka.utils.Logging$class.info:68)  - 
[ConsumerFetcherThread-flume_tbox_parsed_SATL2036-1478088061094-7badc6c0-0-1], 
Stopped 
02 Nov 2016 22:20:07,221 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - [ConsumerFetcherManager-1478088061100] 
All connections stopped
02 Nov 2016 22:20:07,223 INFO  
[ZkClient-EventThread-48-SATL2036:2181,SATL2037:2181,SATL2038:2181/kafka] 
(org.I0Itec.zkclient.ZkEventThread.run:82)  - Terminate ZkClient event thread.
02 Nov 2016 22:20:07,226 INFO  
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0_watcher_executor] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], stopping watcher executor 
thread for consumer flume_tbox_parsed_SATL2036-1478088061094-7badc6c0
02 Nov 2016 22:20:07,226 INFO  [agent-shutdown-hook] 
(org.apache.zookeeper.ZooKeeper.close:684)  - Session: 0x156eb2d4a70421e closed
02 Nov 2016 22:20:07,226 INFO  [lifecycleSupervisor-1-0-EventThread] 
(org.apache.zookeeper.ClientCnxn$EventThread.run:512)  - EventThread shut down
02 Nov 2016 22:20:07,227 INFO  [agent-shutdown-hook] 
(kafka.utils.Logging$class.info:68)  - 
[flume_tbox_parsed_SATL2036-1478088061094-7badc6c0], ZKConsumerConnector shut 
down completed



Issue 2:
03 Nov 2016 13:31:29,287 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying 
send. Remaining retries = 1
03 Nov 2016 13:31:29,388 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Fetching metadata from broker 
id:1,host:SATL2037,port:9092 with correlation id 2307612 for 1 topic(s) 
Set(channel-tbox-topic)
03 Nov 2016 13:31:29,388 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,389 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,443 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Connected to SATL2037:9092 for producing
03 Nov 2016 13:31:29,549 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Disconnecting from SATL2037:9092
03 Nov 2016 13:31:29,550 WARN  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.warn:89)  - Failed to send producer request with 
correlation id 2308613 to broker 2 with data for partitions 
[channel-tbox-topic,4]
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
        at sun.nio.ch.IOUtil.write(IOUtil.java:148)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
        at 
kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at 
kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
        at 
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        at 
org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,550 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying 
send. Remaining retries = 0
03 Nov 2016 13:31:29,651 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Fetching metadata from broker 
id:0,host:SATL2036,port:9092 with correlation id 2308614 for 1 topic(s) 
Set(channel-tbox-topic)
03 Nov 2016 13:31:29,651 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Connected to SATL2036:9092 for producing
03 Nov 2016 13:31:29,652 INFO  [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.info:68)  - Disconnecting from SATL2036:9092
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] 
(kafka.utils.Logging$class.error:97)  - Failed to send requests for topics 
channel-tbox-topic with correlation ids in [2304607,2308614]
03 Nov 2016 13:31:29,653 WARN  [PollableSourceRunner-KafkaSource-r1] 
(org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363)  - 
Sending events to Kafka failed
kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
        at 
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        at 
org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
03 Nov 2016 13:31:29,653 ERROR [PollableSourceRunner-KafkaSource-r1] 
(org.apache.flume.source.kafka.KafkaSource.process:153)  - KafkaSource 
EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: 
org.apache.flume.channel.kafka.KafkaChannel{name: c1}
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at 
org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka 
failed
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
        at 
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 3 more
Caused by: kafka.common.FailedToSendMessageException: Failed to send messages 
after 3 tries.
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at 
org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
        ... 5 more







> After a leader change, messages sent with ack=0 are lost
> --------------------------------------------------------
>
>                 Key: KAFKA-955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-955
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>            Reporter: Jason Rosenberg
>            Assignee: Guozhang Wang
>             Fix For: 0.8.0
>
>         Attachments: KAFKA-955-followup.v1.patch, KAFKA-955.v1.patch, 
> KAFKA-955.v1.patch, KAFKA-955.v2.patch, KAFKA-955.v3.patch, 
> KAFKA-955.v4.patch, KAFKA-955.v5.patch, KAFKA-955.v6.patch, KAFKA-955.v7.patch
>
>
> If the leader changes for a partition, and a producer is sending messages 
> with ack=0, then messages will be lost, since the producer has no active way 
> of knowing that the leader has changed, until it's next metadata refresh 
> update.
> The broker receiving the message, which is no longer the leader, logs a 
> message like this:
> Produce request with correlation id 7136261 from client  on partition 
> [mytopic,0] failed due to Leader not local for partition [mytopic,0] on 
> broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an 
> immediate leader change.
> A possible solution to this would be for a broker which receives a message, 
> for a topic that it is no longer the leader for (and if the ack level is 0), 
> then the broker could just silently forward the message over to the current 
> leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to