[
https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexis Midon updated KAFKA-1702:
--------------------------------
Comment: was deleted
(was: diff --git
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..0f7f941 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -95,27 +95,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val partitionedDataOpt = partitionAndCollate(messages)
partitionedDataOpt match {
case Some(partitionedData) =>
- val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
- try {
- for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
- if (logger.isTraceEnabled)
- messagesPerBrokerMap.foreach(partitionAndEvent =>
- trace("Handling event for Topic: %s, Broker: %d, Partitions:
%s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
- val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
-
- val failedTopicPartitions = send(brokerid, messageSetPerBroker)
- failedTopicPartitions.foreach(topicPartition => {
- messagesPerBrokerMap.get(topicPartition) match {
- case Some(data) => failedProduceRequests.appendAll(data)
- case None => // nothing
- }
- })
+ val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
+ for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
+ if (logger.isTraceEnabled) {
+ messagesPerBrokerMap.foreach(partitionAndEvent =>
+ trace("Handling event for Topic: %s, Broker: %d, Partitions:
%s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+ }
+ val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
+ messageSetPerBrokerOpt match {
+ case Some(messageSetPerBroker) =>
+ val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+ failedTopicPartitions.foreach(topicPartition => {
+ messagesPerBrokerMap.get(topicPartition) match {
+ case Some(data) => failedProduceRequests.appendAll(data)
+ case None => // nothing
+ }
+ })
+ case None => // failed to group messages
+ messagesPerBrokerMap.values.foreach(m =>
failedProduceRequests.appendAll(m))
}
- } catch {
- case t: Throwable => error("Failed to send messages", t)
}
failedProduceRequests
- case None => // all produce requests failed
+ case None => // failed to collate messages
messages
}
}
@@ -290,43 +291,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
}
- private def groupMessagesToSet(messagesPerTopicAndPartition:
collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+ private def groupMessagesToSet(messagesPerTopicAndPartition:
collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
/** enforce the compressed.topics config here.
- * If the compression codec is anything other than NoCompressionCodec,
- * Enable compression only for specified topics if any
- * If the list of compressed topics is empty, then enable the
specified compression codec for all topics
- * If the compression codec is NoCompressionCodec, compression is
disabled for all topics
+ * If the compression codec is anything other than NoCompressionCodec,
+ * Enable compression only for specified topics if any
+ * If the list of compressed topics is empty, then enable the specified
compression codec for all topics
+ * If the compression codec is NoCompressionCodec, compression is
disabled for all topics
*/
-
- val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case
(topicAndPartition, messages) =>
- val rawMessages = messages.map(_.message)
- ( topicAndPartition,
- config.compressionCodec match {
- case NoCompressionCodec =>
- debug("Sending %d messages with no compression to
%s".format(messages.size, topicAndPartition))
- new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
- case _ =>
- config.compressedTopics.size match {
- case 0 =>
- debug("Sending %d messages with compression codec %d to %s"
- .format(messages.size, config.compressionCodec.codec,
topicAndPartition))
- new ByteBufferMessageSet(config.compressionCodec, rawMessages:
_*)
- case _ =>
- if(config.compressedTopics.contains(topicAndPartition.topic)) {
+ try {
+ val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case
(topicAndPartition, messages) =>
+ val rawMessages = messages.map(_.message)
+ (topicAndPartition,
+ config.compressionCodec match {
+ case NoCompressionCodec =>
+ debug("Sending %d messages with no compression to
%s".format(messages.size, topicAndPartition))
+ new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+ case _ =>
+ config.compressedTopics.size match {
+ case 0 =>
debug("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec,
topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec,
rawMessages: _*)
- }
- else {
- debug("Sending %d messages to %s with no compression as it
is not in compressed.topics - %s"
- .format(messages.size, topicAndPartition,
config.compressedTopics.toString))
- new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
- }
- }
- }
- )
+ case _ =>
+ if
(config.compressedTopics.contains(topicAndPartition.topic)) {
+ debug("Sending %d messages with compression codec %d to %s"
+ .format(messages.size, config.compressionCodec.codec,
topicAndPartition))
+ new ByteBufferMessageSet(config.compressionCodec,
rawMessages: _*)
+ }
+ else {
+ debug("Sending %d messages to %s with no compression as it
is not in compressed.topics - %s"
+ .format(messages.size, topicAndPartition,
config.compressedTopics.toString))
+ new ByteBufferMessageSet(NoCompressionCodec, rawMessages:
_*)
+ }
+ }
+ }
+ )
+ }
+ Some(messagesPerTopicPartition)
+ } catch {
+ case t: Throwable => error("Failed to group messages", t); None
}
- messagesPerTopicPartition
}
def close() {
)
> Messages silently Lost by producer
> ----------------------------------
>
> Key: KAFKA-1702
> URL: https://issues.apache.org/jira/browse/KAFKA-1702
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.1.1
> Reporter: Alexis Midon
> Assignee: Jun Rao
> Attachments: KAFKA-1702.0.patch
>
>
> Hello,
> we lost millions of messages because of this {{try/catch}} in the producer
> {{DefaultEventHandler}}:
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116
> If a Throwable is caught by this {{try/catch}}, the retry policy will have no
> effect and all yet-to-be-sent messages are lost (the error will break the
> loop over the broker list).
> This issue is very hard to detect because: the producer (async or sync)
> cannot even catch the error, and *all* the metrics are updated as if
> everything was fine.
> Only the abnormal drop in the producers' network I/O, or in the incoming message
> rate on the brokers, or the alerting on errors in producer logs could have
> revealed the issue.
> This behavior was introduced by KAFKA-300. I can't see a good reason for it,
> so here is a patch that will let the retry-policy do its job when such a
> {{Throwable}} occurs.
> Thanks in advance for your help.
> Alexis
> PS: you might wonder how this {{try/catch}} could ever have caught something.
> {{DefaultEventHandler#groupMessagesToSet}} looks so harmless.
> Here are the details:
> We use Snappy compression. When the native snappy library is not installed on
> the host, Snappy, during the initialization of class
> {{org.xerial.snappy.Snappy}}, will [write a C
> library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312]
> in the JVM temp directory {{java.io.tmpdir}}.
> In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an
> instance reboot (thank you
> [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp
> directory was removed. The JVM was then running with a non-existing temp dir.
> The Snappy class would then be impossible to initialize and the following message
> would be silently logged:
> {code}
> ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler:
> Failed to send messages
> ! java.lang.NoClassDefFoundError: Could not initialize class
> org.xerial.snappy.Snappy
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)