Update: There was no way to make it work with the javaapi one.
I made it work using the scala interface from java. The code looks a bit ugly as I had to create a scala set from java. For example: @Override public Seq<QueueItem<String>> afterDequeuingExistingData(QueueItem<String> queueItem) { if (queueItem == null) { return new scala.collection.mutable.ArraySeq<QueueItem<String>>(0); } MutableList<QueueItem<String>> seq = new MutableList<QueueItem<String>>(); seq.$plus$eq(queueItem); return seq; } Should I file an issue for this? On Thu, Jan 30, 2014 at 9:53 AM, Patricio Echagüe <patric...@gmail.com>wrote: > one more thing. Using the scala callback handler from java code seems to > work but I'm having a hard time creating scala Seqs from java code to make > my handler compatible with the scala signature. > > > On Thu, Jan 30, 2014 at 8:10 AM, Patricio Echagüe <patric...@gmail.com>wrote: > >> Jun, we've been using Kafka for more than two years. Both the key and the >> value are type string. That doesn't seem to be the problem. >> >> I just can't start the application when setting the callback handler >> which I tried with string and byte[]. >> >> The reason I want to use the handler is to send metrics that statsd. >> >> What else can I try to make this callback work? >> >> Sent from my Nexus 4. >> On Jan 30, 2014 8:02 AM, "Jun Rao" <jun...@gmail.com> wrote: >> >>> Yes, you should implement kafka.javaapi.producer.async package. >>> Internally, >>> we wrap that callback with a scala callback. When instantiating the >>> producer, you need to provide 2 types, the first one for key and the >>> second >>> one for value. Make sure the second one is of type byte[]. >>> >>> Thanks, >>> >>> Jun >>> >>> >>> On Wed, Jan 29, 2014 at 10:17 PM, Patricio Echagüe <patric...@gmail.com >>> >wrote: >>> >>> > It's String. I also tried with the generic type String. >>> > >>> > The CallbackHandler interface I implement is the one in >>> > kafka.javaapi.producer.async package. Is that the right one? >>> > I'm a bit confused because the exception mentions kafka.producer.async. >>> > CallbackHandler. >>> > >>> > >>> > On Wed, Jan 29, 2014 at 9:05 PM, Jun Rao <jun...@gmail.com> wrote: >>> > >>> > > Is your producer instantiated with type byte[]? >>> > > >>> > > Thanks, >>> > > >>> > > Jun >>> > > >>> > > >>> > > On Wed, Jan 29, 2014 at 7:25 PM, Patricio Echagüe < >>> patric...@gmail.com >>> > > >wrote: >>> > > >>> > > > I'm trying to set a callback handler from java. >>> > > > >>> > > > For that, I created a Callback Handler this way: >>> > > > >>> > > > public class KafkaAsyncCallbackHandler implements >>> > > CallbackHandler<byte[]> { >>> > > > } >>> > > > >>> > > > and set the property >>> > > > >>> > > > callback.handler=com.lucid.kafka.KafkaAsyncCallbackHandler >>> > > > >>> > > > >>> > > > But on runtime I get this exception coming from kafka: >>> > > > >>> > > > Caused by: java.lang.ClassCastException: >>> > > > com.lucid.kafka.KafkaAsyncCallbackHandler cannot be cast to >>> > > > kafka.producer.async.CallbackHandler >>> > > > at kafka.producer.ProducerPool.<init>(ProducerPool.scala:62) >>> > > > at kafka.javaapi.producer.Producer.<init>(Producer.scala:41) >>> > > > at >>> > > > >>> > > >>> > >>> com.lucid.dao.queue.impl.kafka.KafkaProducer.<init>(KafkaProducer.java:29) >>> > > > at >>> com.lucid.dao.guice.DAOModule.provideProducer(DAOModule.java:448) >>> > > > >>> > > > We are on kafka 0.7.1 >>> > > > >>> > > > Am I doing anything wrong here? >>> > > > >>> > > > Thanks >>> > > > Patricio >>> > > > >>> > > >>> > >>> >> >