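Chris,

Partitioning is independent of serialization, so you don't need to define a partitioning function just to use a key. The key is serialized with the class named by the "key.serializer.class" property of the Producer. If you don't specify one, the key goes through the encoder set in "serializer.class" (StringEncoder in your config, which is why the Integer key fails the cast to String); with neither property set, a no-op encoder is used. Since you want to use an integer key, you'd have to plug in an Encoder that serializes integer data to a byte array and name it in "key.serializer.class".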
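To make the suggestion concrete, here is a minimal sketch of the integer-to-bytes conversion such an Encoder would perform. The class name IntegerKeyEncoder is hypothetical. To keep the example free of Kafka dependencies it does not implement the Kafka interface; in a real 0.8 producer it would declare `implements kafka.serializer.Encoder<Integer>` and, as far as I recall, need a constructor that accepts kafka.utils.VerifiableProperties. Treat both of those as assumptions to check against your Kafka build.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical class name. In Kafka 0.8 this would also declare
// "implements kafka.serializer.Encoder<Integer>" and (assumption) provide a
// constructor taking kafka.utils.VerifiableProperties; both are left out so
// the sketch compiles without the Kafka jar.
class IntegerKeyEncoder {

    // Serialize the key as 4 bytes in big-endian order (ByteBuffer's default).
    public byte[] toBytes(Integer key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Inverse conversion, e.g. for decoding the key on the consumer side.
    public static Integer fromBytes(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        IntegerKeyEncoder encoder = new IntegerKeyEncoder();
        byte[] encoded = encoder.toBytes(7);
        System.out.println(Arrays.toString(encoded)); // prints [0, 0, 0, 7]
        System.out.println(fromBytes(encoded));       // prints 7
    }
}
```

With a class like this on the classpath, you would set "key.serializer.class" to its fully qualified name and leave "serializer.class" as kafka.serializer.StringEncoder for the message payload.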
Thanks,
Neha

On Mon, Feb 25, 2013 at 11:50 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:

> Hi,
>
> In an earlier thread about partitioning on 0.8 I read that you can provide
> a key to the KeyedMessage constructor and all the messages with the key
> would end up in the same partition, even if you don't provide a partition
> function (vs the random assignment of a message to a partition).
>
> When I do this I get an runtime error:
>
> Code:
>
>     long events = Long.parseLong(args[1]);
>     int blocks = Integer.parseInt(args[2]);
>
>     Random rnd = new Random();
>
>     Properties props = new Properties();
>     props.put("broker.list", "vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092");
>     props.put("serializer.class", "kafka.serializer.StringEncoder");
>     ProducerConfig config = new ProducerConfig(props);
>
>     Producer<Integer, String> producer = new Producer<Integer, String>(config);
>     for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
>         for (long nEvents = 0; nEvents < events; nEvents++) {
>             long runtime = new Date().getTime();
>             String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
>             KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>("test1", nBlocks, msg);
>             producer.send(data);
>         }
>     }
>     producer.close();
>
> Runtime error:
>
> 0 [main] INFO kafka.utils.VerifiableProperties - Verifying properties
> 33 [main] INFO kafka.utils.VerifiableProperties - Property broker.list is overridden to vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092
> 33 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
> Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
>     at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:126)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:123)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
>     at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:123)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
>     at kafka.producer.Producer.send(Producer.scala:76)
>     at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>     at com.silverpop.kafka.playproducer.TestProducer.main(TestProducer.java:31)
>
> Changing the logic to use my own partitioner that accepts a String vs. an
> Integer above works correctly. So do I always need to define a partitioning
> function?
>
> Thanks,
>
> Chris