Pradeep,

If you are writing a POC, I'd suggest you do that using the new producer
APIs<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/Producer.html>.
They are much easier to use and expose more functionality, and the new
producer is faster than the older one. It is currently in beta and slated for
release in 0.8.2 or 0.9. We are still working on stabilizing it, but it should
work great for your POC. We'd love to hear feedback on the APIs.

Thanks,
Neha


On Tue, May 20, 2014 at 10:51 AM, Kumar Pradeep <kprad...@novell.com> wrote:

> Thanks Pushkar for your response.
>
> I tried to send my own byte array; however, the Kafka Producer class does
> not take byte[] as an input type. Do you have an example of this? Please
> share if you do; I'd really appreciate it.
>
> Here is my code:
>
>
> public class TestEventProducer {
>     public static void main(String[] args) {
>
>      String topic = "test-topic";
>      long eventsNum = 10;
>
>         Properties props = new Properties();
>         props.put("metadata.broker.list", "localhost:9092");
>         props.put("serializer.class", "kafka.serializer.DefaultEncoder ");
>         props.put("request.required.acks", "0");
>         ProducerConfig config = new ProducerConfig(props);
>
>         byte [] rawData;
>         Producer<String, rawData> producer = new Producer<String,
> rawData>(config); // compilation error: rawData cannot be resolved to a type
>
>         long start = System.currentTimeMillis();
>
>         for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
>
>              SimulateEvent event = new SimulateEvent();
>              try {
>                 rawData = Serializer.serialize(event);
>               } catch (IOException e) {
>                 e.printStackTrace();
>               }
>             KeyedMessage<String, rawData> data = new KeyedMessage<String,
> rawData>(topic, event);
>             producer.send(data);
>             System.out.println("produced event#:" + nEvents + " "+ data);
>         }
>         System.out.println("Took " + (System.currentTimeMillis() - start)
> + " ms to produce " + eventsNum + " messages");
>         producer.close();
>     }
> }
>
> public class Serializer {
>     public static byte[] serialize(Object obj) throws IOException {
>         ByteArrayOutputStream b = new ByteArrayOutputStream();
>         ObjectOutputStream o = new ObjectOutputStream(b);
>         o.writeObject(obj);
>         o.flush(); // make sure buffered bytes reach b before it is read
>         return b.toByteArray();
>     }
>
>     public static Object deserialize(byte[] bytes) throws IOException,
> ClassNotFoundException {
>         ByteArrayInputStream b = new ByteArrayInputStream(bytes);
>         ObjectInputStream o = new ObjectInputStream(b);
>         return o.readObject();
>     }
> }
>
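The compile error in the code above comes from using the variable name
rawData where a type is expected. The type argument would need to be byte[],
so with the 0.8.1 producer the declarations would read
Producer<String, byte[]> and KeyedMessage<String, byte[]>(topic, rawData),
passing the serialized bytes rather than the event object. Here is a minimal,
Kafka-free sketch of that shape. Envelope is a hypothetical stand-in for
KeyedMessage, and the fields of SimulateEvent are assumed because the thread
does not show them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Sketch only: shows byte[] used as a generic type argument and a
// serialize/deserialize round trip. No Kafka classes are involved.
class ByteArrayPayloadSketch {

    // Assumed shape of the event; the real SimulateEvent is not shown in the thread.
    static class SimulateEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        final String source;
        final String detail;

        SimulateEvent(String source, String detail) {
            this.source = source;
            this.detail = detail;
        }
    }

    // Hypothetical stand-in for KeyedMessage<K, V>: a topic plus a typed payload.
    static class Envelope<K, V> {
        final String topic;
        final V payload;

        Envelope(String topic, V payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the object stream
        // before the byte array is read.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] rawData = serialize(new SimulateEvent("sensor-1", "started"));
        // The type argument is byte[], and the payload is the serialized bytes,
        // not the event object itself.
        Envelope<String, byte[]> data = new Envelope<String, byte[]>("test-topic", rawData);
        SimulateEvent back = (SimulateEvent) deserialize(data.payload);
        System.out.println(back.source + " " + back.detail);
    }
}
```

The consumer side would call deserialize on the byte[] it receives to get the
event back, as Pushkar describes below.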
> >>> pushkar priyadarshi <priyadarshi.push...@gmail.com> 5/20/2014 5:11 PM >>>
> You can send the byte[] that you get from your own serializer through
> Kafka. On the receiving side you can deserialize from the byte[] and read
> back your object. To use this you will have to supply
> serializer.class=kafka.serializer.DefaultEncoder in the properties.
>
> On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep <kprad...@novell.com>
> wrote:
>
> > I am trying to build a POC with Kafka 0.8.1. I am using my own java class
> > as a Kafka message which has a bunch of String data types. For
> > serializer.class property in my producer, I cannot use the default
> > serializer class or the String serializer class that comes with Kafka
> > library. I guess I need to write my own serializer and feed it to the
> > producer properties. If you are aware of writing an example custom
> > serializer in Kafka (in java), please do share. Appreciate a lot, thanks
> > much.
> >
> > I tried to use something like below, but I get the exception: Exception
> > in thread "main" java.lang.NoSuchMethodException:
> > test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
> >  at java.lang.Class.getConstructor0(Class.java:2971)
> >
> > package test;
> >
> > import java.io.IOException;
> >
> > import com.fasterxml.jackson.core.JsonFactory;
> > import com.fasterxml.jackson.databind.ObjectMapper;
> >
> > import kafka.message.Message;
> > import kafka.serializer.Decoder;
> > import kafka.serializer.Encoder;
> >
> > public class EventsDataSerializer implements Encoder<SimulateEvent>,
> >         Decoder<SimulateEvent> {
> >
> >     public Message toMessage(SimulateEvent eventDetails) {
> >         try {
> >             ObjectMapper mapper = new ObjectMapper(new JsonFactory());
> >             byte[] serialized = mapper.writeValueAsBytes(eventDetails);
> >             return new Message(serialized);
> >         } catch (IOException e) {
> >             e.printStackTrace();
> >             return null;   // TODO
> >         }
> >     }
> >
> >     public SimulateEvent toEvent(Message message) {
> >         SimulateEvent event = new SimulateEvent();
> >
> >         ObjectMapper mapper = new ObjectMapper(new JsonFactory());
> >         try {
> >             //TODO handle error
> >             return mapper.readValue(message.payload().array(),
> >                     SimulateEvent.class);
> >         } catch (IOException e) {
> >             e.printStackTrace();
> >             return null;
> >         }
> >     }
> >
> >     public byte[] toBytes(SimulateEvent arg0) {
> >         // TODO Auto-generated method stub
> >         return null;
> >     }
> >
> >     public SimulateEvent fromBytes(byte[] arg0) {
> >         // TODO Auto-generated method stub
> >         return null;
> >     }
> > }
>
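On the NoSuchMethodException in the original question: as far as I know, the
0.8 producer creates the configured serializer.class reflectively and looks
for a public constructor that takes kafka.utils.VerifiableProperties, and it
then calls toBytes rather than toMessage. If that is right, adding such a
constructor and putting the Jackson call inside toBytes should get past the
error. The sketch below reproduces only the constructor lookup with plain
reflection. Props is a hypothetical stand-in for VerifiableProperties, and
both encoder classes are made up for illustration.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;

// Sketch only: reproduces a reflective constructor lookup without Kafka.
class EncoderConstructorSketch {

    // Hypothetical stand-in for kafka.utils.VerifiableProperties.
    public static class Props { }

    // Mirrors the class in the question: only the implicit no-arg constructor.
    public static class EncoderWithoutPropsCtor {
        public byte[] toBytes(String event) {
            return event.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Same encoder plus the one-argument constructor the lookup expects.
    public static class EncoderWithPropsCtor {
        public EncoderWithPropsCtor(Props props) {
            // props may be ignored if the encoder needs no configuration
        }

        public byte[] toBytes(String event) {
            return event.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Returns true if cls has a public constructor taking a single Props
    // argument and an instance can be created through it.
    static boolean canInstantiate(Class<?> cls) {
        try {
            Constructor<?> ctor = cls.getConstructor(Props.class);
            ctor.newInstance(new Props());
            return true;
        } catch (NoSuchMethodException e) {
            // Same failure mode as the stack trace in the question.
            return false;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(EncoderWithoutPropsCtor.class));
        System.out.println(canInstantiate(EncoderWithPropsCtor.class));
    }
}
```

The first lookup fails and the second succeeds, which matches the difference
between the class as posted and the same class with the extra constructor.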
