Hi Dave, This change was introduced in https://issues.apache.org/jira/browse/KAFKA-1755 for compacted topics.
> > Interestingly, none of the messages currently going to the topic use > message > compaction (i.e. they all have empty keys), although at some time in the > past > I may have sent a few messages with keys. Message compaction is being > used for other topics. So, the 0.9.0.0 version of the broker seems to > think the > topic is compacted while the 0.8.2.1 broker apparently doesn't think so. > Does > this shed any light on things? > > Also I notice the error message says "Compacted topic", which suggests that > compaction is a property of the topic, and not individual messages as > Yes - compaction is a topic-level property. You can use --describe to verify that the topic is compacted or not and if you didn't intend it to be compacted you can alter the configuration. I thought it was ok to send messages > both > with and without a key to the same topic, thus having compaction enabled > for > only a subset of the messages. Is this incorrect? > In 0.9 you cannot send unkeyed messages to compacted topics. In 0.8.x this would actually cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics). We did consider the possibility of allowing producers to send both keyed and unkeyed but after discussion we felt it would be better to fail fast and prevent unkeyed messages from getting in. This was on the premise that supporting mixed messages and only compacting a subset that have keys may not work very well since the non-keyed messages would stick around indefinitely; however let me know if you think differently on this and we can revisit. Joel > Thanks, > Dave > > > [2016-01-20 19:21:44,923] ERROR [Replica Manager on Broker 172341926]: > Error processing append operation on partition [shown_news_stories,7] > (kafka.server.ReplicaManager) > kafka.message.InvalidMessageException: Compacted topic cannot accept > message without key. > at > > kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:250) > at kafka.log.Log.liftedTree1$1(Log.scala:327) > at kafka.log.Log.append(Log.scala:326) > at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442) > at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268) > at > kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428) > at > > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401) > at > > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386) > at > kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322) > at > kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366) > at kafka.server.KafkaApis.handle(KafkaApis.scala:68) > at > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > > > > On Tue, Jan 19, 2016 at 2:50 AM, Ismael Juma <ism...@juma.me.uk> wrote: > > > Hi Dave, > > > > Do you get any errors logged in the broker when you get ACK error 2 > > (InvalidMessage) while producing requests to a mixed version cluster? It > > would be helpful to see them. > > > > With regards to the kafka-console-producer.sh error, did you use the > > 0.9.0.0 console producer with a mixed version cluster (ie some brokers > were > > on 0.8.2.1 while others were on 0.9.0.0)? If so, it is expected that it > > won't work correctly. All the brokers should be upgraded before the > clients > > are upgraded (otherwise the 0.8.2.1 broker will send a response that the > > newer clients cannot handle). > > > > Ismael > > > > On Fri, Jan 15, 2016 at 7:52 PM, Dave Peterson <d...@academia.edu> > wrote: > > > > > Hi Ismael, > > > > > > I'm using bruce (https://github.com/ifwe/bruce) to send the produce > > > requests, with a RequiredAcks value of 1. Everything works fine when > > > all brokers are running 0.8.2.1. Also if I set up a new 0.9.0.0 > > > cluster from scratch rather than trying to upgrade, everything works > > > fine. The problem only occurs after upgrading one broker in the > > > 3-broker cluster. > > > > > > The topic I am sending to has 8 partitions numbered 0-7. Doing > > > further experimentation I see that the ACK error 2 occurs only when > > > I send to partition 7. No problems occur when sending to partitions > > > 0-6. If it helps I can send output from "kafka-topics.sh --describe" > > > as well as tcpdump output showing the produce requests and responses. > > > > > > For comparison I tried using the 0.9.0.0 version of > > > kafka-console-producer.sh to send messages. With the default > > > RequiredAcks value of 0, it worked although I don't know which > > > partition it sent to. With a RequiredAcks value of 1 I get the > > > output shown below. > > > > > > Thanks, > > > Dave > > > > > > > > > > > > [2016-01-15 10:33:28,843] ERROR Uncaught error in kafka producer I/O > > > thread: (org.apache.kafka.clients.producer.internals.Sender) > > > org.apache.kafka.common.protocol.types.SchemaException: Error reading > > field > > > 'throttle_time_ms': java.nio.BufferUnderflowException > > > at > > > org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) > > > at > > > > > > > > > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464) > > > at > > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) > > > at > > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) > > > at > > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) > > > at java.lang.Thread.run(Thread.java:745) > > > > > > > > > On Fri, Jan 15, 2016 at 1:06 AM, Ismael Juma <ism...@juma.me.uk> > wrote: > > > > > > > Hi Dave, > > > > > > > > On Fri, Jan 15, 2016 at 2:04 AM, Dave Peterson <d...@academia.edu> > > > wrote: > > > > > > > > > I was trying to upgrade an 0.8.2.1 broker cluster to 0.9.0.0 by > > > following > > > > > the instructions here: > > > > > > > > > > http://kafka.apache.org/documentation.html#upgrade > > > > > > > > > > After upgrading one broker, with > > inter.broker.protocol.version=0.8.2.X > > > > > set, I get ACK error 2 (InvalidMessage) when I try to send produce > > > > > requests. > > > > > > > > > > > > I haven't seen other reports of this issue yet. Also, we have a > system > > > test > > > > that covers this scenario: > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/upgrade_test.py > > > > > > > > Just to double-check, what is the version of the producer that you > are > > > > using to send produce requests to the 0.9.0.0 broker when you get the > > > > error? > > > > > > > > Ismael > > > > > > > > > >