Update:

There was no way to make it work with the javaapi one.

I made it work using the scala interface from java. The code looks a bit
ugly as I had to create a scala set from java.

For example:

@Override
public Seq<QueueItem<String>> afterDequeuingExistingData(QueueItem<String>
queueItem) {
 if (queueItem == null) {
return new scala.collection.mutable.ArraySeq<QueueItem<String>>(0);
 }

MutableList<QueueItem<String>> seq = new MutableList<QueueItem<String>>();
 seq.$plus$eq(queueItem);
return seq;
}

Should I file an issue for this?


On Thu, Jan 30, 2014 at 9:53 AM, Patricio Echagüe <patric...@gmail.com>wrote:

> one more thing. Using the scala callback handler from java code seems to
> work but I'm having a hard time creating scala Seqs from java code to make
> my handler compatible with the scala signature.
>
>
> On Thu, Jan 30, 2014 at 8:10 AM, Patricio Echagüe <patric...@gmail.com>wrote:
>
>> Jun, we've been using Kafka for more than two years. Both the key and the
>> value are type string. That doesn't seem to be the problem.
>>
>> I just can't start the application when setting the callback handler
>> which I tried with string and byte[].
>>
>> The reason I want to use the handler is to send metrics that statsd.
>>
>> What else can I try to make this callback work?
>>
>> Sent from my Nexus 4.
>> On Jan 30, 2014 8:02 AM, "Jun Rao" <jun...@gmail.com> wrote:
>>
>>> Yes, you should implement kafka.javaapi.producer.async package.
>>> Internally,
>>> we wrap that callback with a scala callback. When instantiating the
>>> producer, you need to provide 2 types, the first one for key and the
>>> second
>>> one for value. Make sure the second one is of type byte[].
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Wed, Jan 29, 2014 at 10:17 PM, Patricio Echagüe <patric...@gmail.com
>>> >wrote:
>>>
>>> > It's String. I also tried with the generic type String.
>>> >
>>> > The CallbackHandler interface I implement is the one in
>>> > kafka.javaapi.producer.async package. Is that the right one?
>>> > I'm a bit confused because the exception mentions kafka.producer.async.
>>> > CallbackHandler.
>>> >
>>> >
>>> > On Wed, Jan 29, 2014 at 9:05 PM, Jun Rao <jun...@gmail.com> wrote:
>>> >
>>> > > Is your producer instantiated with type byte[]?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jun
>>> > >
>>> > >
>>> > > On Wed, Jan 29, 2014 at 7:25 PM, Patricio Echagüe <
>>> patric...@gmail.com
>>> > > >wrote:
>>> > >
>>> > > > I'm trying to set a callback handler from java.
>>> > > >
>>> > > > For that, I created a Callback Handler this way:
>>> > > >
>>> > > > public class KafkaAsyncCallbackHandler implements
>>> > > CallbackHandler<byte[]> {
>>> > > > }
>>> > > >
>>> > > > and set the property
>>> > > >
>>> > > > callback.handler=com.lucid.kafka.KafkaAsyncCallbackHandler
>>> > > >
>>> > > >
>>> > > > But on runtime I get this exception coming from kafka:
>>> > > >
>>> > > > Caused by: java.lang.ClassCastException:
>>> > > > com.lucid.kafka.KafkaAsyncCallbackHandler cannot be cast to
>>> > > > kafka.producer.async.CallbackHandler
>>> > > >  at kafka.producer.ProducerPool.<init>(ProducerPool.scala:62)
>>> > > > at kafka.javaapi.producer.Producer.<init>(Producer.scala:41)
>>> > > >  at
>>> > > >
>>> > >
>>> >
>>> com.lucid.dao.queue.impl.kafka.KafkaProducer.<init>(KafkaProducer.java:29)
>>> > > > at
>>> com.lucid.dao.guice.DAOModule.provideProducer(DAOModule.java:448)
>>> > > >
>>> > > > We are on kafka 0.7.1
>>> > > >
>>> > > > Am I doing anything wrong here?
>>> > > >
>>> > > > Thanks
>>> > > > Patricio
>>> > > >
>>> > >
>>> >
>>>
>>
>

Reply via email to