The examples code in AK repo uses JSON as the serde, whereas the examples
in CP repo used Avro, with the embedded schema. These two are different
serde schemes.

Using a schema registry for Avro / etc would be a better approach to you,
since it does not require each message to embed the schema info, but just a
schema id.

Guozhang


On Wed, Jul 6, 2016 at 8:12 PM, Philippe Derome <phder...@gmail.com> wrote:

> I think I should simply follow Kafka The Definitive Guide Chapter 3 for a
> good Avro producer example instead. It does not introduce some Jackson JSON
> layer and still provides the type safety using POJO generated classes from
> Avro.
>
> On Wed, Jul 6, 2016 at 9:20 PM, Philippe Derome <phder...@gmail.com>
> wrote:
>
> > The typed version of the example (PageViewTypedDemo) is what represents
> > some difficulty for someone new to Kafka (or CP3). *I think it would be
> > easier/quicker to complete the documentation of that example than to
> answer
> > my questions below....*
> >
> > I am reusing the same JsonPOJOSerializer and JsonPOJODeserializer classes
> > as in the Kafka example for PageViewTypedDemo and that leads me into
> > exceptions in Jackson JSON serialization. Could I be missing some
> > configuration? In debugger, I see that Jackson does not seem to like the
> > end of my data as it goes through my 11 fields apparently ok.
> >
> > On a processor similar to PageViewTypedDemo (mine with data more relevant
> > to what I have in mind and I mask my AVRO generated POJO class with
> > XXXXXXXX below), I write a producer in a separate app to target the
> > processor. I get following stack trace for a producer that would match
> the
> > producer (meaning using JsonPOJOSerializer to match the example that
> uses a
> > JsonPOJODeserializer).
> >
> > Here's what I do:
> > I set up normally Properties
> > I set up normally on a different object Map<String, Object> with
> > BootstrapServer and REGISTRY_URL
> >
> > final Serializer<XXXXXXXX> xxxXXXXXXXXSerializer = new
> JsonPOJOSerializer<>();
> >
> > propMap.put("JsonPOJOClass", XXXXXXXX.class);
> >
> > xxxXXXXXXXXSerializer.configure(propMap, false);   // *presumably*
> configures my POJO class with value of message?
> >
> > final StringSerializer strSerializer = new StringSerializer();
> >
> > final KafkaProducer<String, XXXXXXXX > producer1 =
> >
> >     new KafkaProducer<>(propMap, strSerializer, xxxXXXXXXXXSerializer);
> >
> > for (String[] user : users) {
> >
> >     XXXXXXXX x = new XXXXXXXX();
> >
> >     x.setPropertyA("something); // and many other properties/fields like
> this
> >
> >     producer1.send(new ProducerRecord<>("input-topic", user[1], x));
> >
> > }
> >
> > producer1.flush();
> >
> >
> > Exception in thread "main"
> > org.apache.kafka.common.errors.SerializationException: Error serializing
> > JSON message
> >
> > Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an
> > enum:
> >
> {"type":"record","name":"XXXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
> > (through reference chain:
> >
> io.confluent.examples.streams.avro.XXXXXXXXX["schema"]->org.apache.avro.RecordSchema["enumSymbols"])
> >
> > at
> >
> com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
> >
> > at
> >
> com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:190)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:674)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:129)
> >
> > at
> >
> com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3387)
> >
> > * at
> >
> com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:2805)*
> >
> > * at
> >
> io.confluent.examples.streams.JsonPOJOSerializer.serialize(JsonPOJOSerializer.java:50)*
> >
> > at
> >
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
> >
> > at
> >
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
> >
> > at
> >
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
> >
> > at
> >
> io.confluent.examples.streams.XXXExampleTypedProducer.produceInputs(XXXExampleTypedProducer.java:122)
> >
> > at
> >
> io.confluent.examples.streams.XXXExampleTypedProducer.main(XXXExampleTypedProducer.java:51)
> >
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >
> > at java.lang.reflect.Method.invoke(Method.java:498)
> >
> > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> >
> > Caused by: org.apache.avro.AvroRuntimeException: Not an enum:
> >
> {"type":"record","name":"XXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
> >
> > at org.apache.avro.Schema.getEnumSymbols(Schema.java:206)
> >
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >
> > at java.lang.reflect.Method.invoke(Method.java:498)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:536)
> >
> > at
> >
> com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
> >
> > ... 18 more
> >
> >
> > On Wed, Jul 6, 2016 at 8:18 AM, Philippe Derome <phder...@gmail.com>
> > wrote:
> >
> >> yes, it's a very similar example and I am interested in the Kafka one
> for
> >> the serialization aspect of it, which is a bit richer than on
> Confluent's...
> >>
> >> On Wed, Jul 6, 2016 at 5:35 AM, Michael Noll <mich...@confluent.io>
> >> wrote:
> >>
> >>> Correction:  Just realized that I misread your message.  You are indeed
> >>> referring to the code examples in Apache Kafka. ;-)
> >>>
> >>> On Wed, Jul 6, 2016 at 11:35 AM, Michael Noll <mich...@confluent.io>
> >>> wrote:
> >>>
> >>> > Phil,
> >>> >
> >>> > I suggest to ask this question in the Confluent Platform mailing list
> >>> > because you're referring to code under
> >>> > https://github.com/confluentinc/examples (i.e. code that is not part
> >>> of
> >>> > the Apache Kafka project).
> >>> >
> >>> > Best,
> >>> > Michael
> >>> >
> >>> >
> >>> > On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome <phder...@gmail.com>
> >>> > wrote:
> >>> >
> >>> >> Would anyone with a good understanding of serialization be available
> >>> to
> >>> >> enhance documentation of the Apache Streams examples? I mean
> >>> specifically:
> >>> >> PageViewTypedDemo, PageViewUntypedDemo in package org
> >>> >> .apache.kafka.streams.examples.pageview
> >>> >>
> >>> >> I'd be happy to run them with Confluent Platform 3 producer/consumer
> >>> shell
> >>> >> scripts but would need guidance as to how to invoke them and how to
> >>> >> specify
> >>> >> some input file (or stdin format).
> >>> >>
> >>> >> That would help me better understand how to get serialization and
> >>> streams
> >>> >> to work together.
> >>> >>
> >>> >> Thanks,
> >>> >>
> >>> >> Phil Derome
> >>> >>
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > Best regards,
> >>> > Michael Noll
> >>> >
> >>> >
> >>> >
> >>> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> >>> > <%2B1%20650.453.5860>Download Apache Kafka and Confluent Platform:
> >>> > www.confluent.io/download <http://www.confluent.io/download>*
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> Best regards,
> >>> Michael Noll
> >>>
> >>>
> >>>
> >>> *Michael G. Noll | Product Manager | Confluent | +1
> 650.453.5860Download
> >>> Apache Kafka and Confluent Platform: www.confluent.io/download
> >>> <http://www.confluent.io/download>*
> >>>
> >>
> >>
> >
>



-- 
-- Guozhang

Reply via email to