I found the fix I needed before having time to see your reply. I removed all
references to JsonPOJO and the Jackson JSON dependencies and am now using
SpecificAvroSerde for VALUE_SERDE_CONFIG, and it works well. Indeed, I had
not planned to embed the schema info in each message, as that would be
tedious and less elegant.
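For future readers, here is roughly what the working setup looks like. This is
a sketch only, under assumptions: SpecificAvroSerde is the schema-registry-backed
serde from the Confluent examples (its package has moved between Confluent
versions), and VALUE_SERDE_CLASS_CONFIG is the Streams config key of that era.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
// Assumed package; SpecificAvroSerde has lived in different packages
// across Confluent releases.
import io.confluent.examples.streams.utils.SpecificAvroSerde;

public class TypedAvroStreamsConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "typed-pageview-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Messages then carry only a schema id; the serde resolves the full
        // schema through the registry instead of embedding it per message.
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }
}
```

The registry URL and bootstrap servers above are placeholders for a local setup.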

So you are saying that producer and consumer drivers for the AK repo example
would work well if the schema info were embedded in each message? What would
the VALUE_CONFIG classes be then? Matching ones based on JsonPOJO or
GenericAvroSerde, with the schema specified along the way? I think a little
more detail in the AK repo example would be helpful for future readers.
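For reference, the JSON-serde wiring in the AK pageview example looks roughly
like the sketch below. JsonPOJOSerializer, JsonPOJODeserializer, and the
"JsonPOJOClass" config key are the AK example's helpers; PageViewTyped is a
stand-in name for whatever POJO the topic carries.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class JsonPojoSerdeWiring {
    // Builds a value serde for a hypothetical PageViewTyped POJO.
    public static Serde<PageViewTyped> pageViewSerde() {
        Serializer<PageViewTyped> ser = new JsonPOJOSerializer<>();
        Deserializer<PageViewTyped> de = new JsonPOJODeserializer<>();
        Map<String, Object> serdeProps = new HashMap<>();
        // "JsonPOJOClass" tells the AK example's (de)serializer which
        // class to bind the JSON to.
        serdeProps.put("JsonPOJOClass", PageViewTyped.class);
        ser.configure(serdeProps, false);  // false = value side, not key
        de.configure(serdeProps, false);
        return Serdes.serdeFrom(ser, de);
    }
}
```

Note that this scheme embeds no schema at all; the POJO class name on both
sides is the only contract.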

On Thu, Jul 7, 2016 at 1:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> The example code in the AK repo uses JSON as the serde, whereas the examples
> in the CP repo use Avro with an embedded schema. These are two different
> serde schemes.
>
> Using a schema registry for Avro etc. would be a better approach for you,
> since it does not require each message to embed the schema info, just a
> schema id.
>
> Guozhang
>
>
> On Wed, Jul 6, 2016 at 8:12 PM, Philippe Derome <phder...@gmail.com>
> wrote:
>
> > I think I should simply follow Kafka: The Definitive Guide, Chapter 3, for
> > a good Avro producer example instead. It does not introduce a Jackson JSON
> > layer and still provides type safety via POJO classes generated from Avro.
> >
> > On Wed, Jul 6, 2016 at 9:20 PM, Philippe Derome <phder...@gmail.com>
> > wrote:
> >
> > > The typed version of the example (PageViewTypedDemo) is what presents
> > > some difficulty for someone new to Kafka (or CP3). *I think it would be
> > > easier/quicker to complete the documentation of that example than to
> > > answer my questions below....*
> > >
> > > I am reusing the same JsonPOJOSerializer and JsonPOJODeserializer
> > > classes as in the Kafka example for PageViewTypedDemo, and that leads me
> > > into exceptions in Jackson JSON serialization. Could I be missing some
> > > configuration? In the debugger, I see that Jackson does not seem to like
> > > the end of my data, as it goes through my 11 fields apparently OK.
> > >
> > > On a processor similar to PageViewTypedDemo (mine has data more relevant
> > > to what I have in mind, and I mask my Avro-generated POJO class as
> > > XXXXXXXX below), I wrote a producer in a separate app to target the
> > > processor. I get the following stack trace for a producer that should
> > > match the processor (meaning it uses JsonPOJOSerializer to match the
> > > example's use of JsonPOJODeserializer).
> > >
> > > Here's what I do:
> > > I set up the Properties normally.
> > > I set up, on a different object, a Map<String, Object> with
> > > BootstrapServer and REGISTRY_URL.
> > >
> > > final Serializer<XXXXXXXX> xxxXXXXXXXXSerializer = new JsonPOJOSerializer<>();
> > >
> > > propMap.put("JsonPOJOClass", XXXXXXXX.class);
> > >
> > > // *presumably* configures the serializer for my POJO class as message value?
> > > xxxXXXXXXXXSerializer.configure(propMap, false);
> > >
> > > final StringSerializer strSerializer = new StringSerializer();
> > >
> > > final KafkaProducer<String, XXXXXXXX> producer1 =
> > >     new KafkaProducer<>(propMap, strSerializer, xxxXXXXXXXXSerializer);
> > >
> > > for (String[] user : users) {
> > >     XXXXXXXX x = new XXXXXXXX();
> > >     x.setPropertyA("something"); // and many other properties/fields like this
> > >     producer1.send(new ProducerRecord<>("input-topic", user[1], x));
> > > }
> > >
> > > producer1.flush();
> > >
> > >
> > > Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
> > >
> > > Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an enum:
> {"type":"record","name":"XXXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
> > > (through reference chain: io.confluent.examples.streams.avro.XXXXXXXXX["schema"]->org.apache.avro.RecordSchema["enumSymbols"])
> > >
> > > at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
> > > at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
> > > at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:190)
> > > at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:674)
> > > at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
> > > at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
> > > at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
> > > at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
> > > at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:129)
> > > at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3387)
> > > *at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:2805)*
> > > *at io.confluent.examples.streams.JsonPOJOSerializer.serialize(JsonPOJOSerializer.java:50)*
> > > at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
> > > at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
> > > at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
> > > at io.confluent.examples.streams.XXXExampleTypedProducer.produceInputs(XXXExampleTypedProducer.java:122)
> > > at io.confluent.examples.streams.XXXExampleTypedProducer.main(XXXExampleTypedProducer.java:51)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> > >
> > > Caused by: org.apache.avro.AvroRuntimeException: Not an enum:
> {"type":"record","name":"XXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
> > >
> > > at org.apache.avro.Schema.getEnumSymbols(Schema.java:206)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:536)
> > > at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
> > > ... 18 more
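The reference chain in the trace above (XXXXXXXXX["schema"] ->
RecordSchema["enumSymbols"]) suggests Jackson is walking into the getSchema()
accessor that every Avro-generated class exposes, rather than stopping at the
data fields. One way to keep Jackson off that property is an ObjectMapper
configured with a mixin; the mixin class below is hypothetical, offered as a
sketch only.

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.specific.SpecificRecordBase;

// Hypothetical mixin: tells Jackson to skip the "schema" bean property
// that Avro-generated specific records expose via getSchema().
@JsonIgnoreProperties({"schema"})
abstract class AvroIgnoreSchemaMixin {}

public class AvroJacksonConfig {
    // Returns an ObjectMapper whose writeValueAsBytes(pojo) serializes
    // only the record's data fields, not the attached Avro Schema object.
    public static ObjectMapper mapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.addMixIn(SpecificRecordBase.class, AvroIgnoreSchemaMixin.class);
        return mapper;
    }
}
```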
> > >
> > >
> > > On Wed, Jul 6, 2016 at 8:18 AM, Philippe Derome <phder...@gmail.com>
> > > wrote:
> > >
> > >> yes, it's a very similar example, and I am interested in the Kafka one
> > >> for its serialization aspect, which is a bit richer than Confluent's...
> > >>
> > >> On Wed, Jul 6, 2016 at 5:35 AM, Michael Noll <mich...@confluent.io>
> > >> wrote:
> > >>
> > >>> Correction:  Just realized that I misread your message.  You are
> indeed
> > >>> referring to the code examples in Apache Kafka. ;-)
> > >>>
> > >>> On Wed, Jul 6, 2016 at 11:35 AM, Michael Noll <mich...@confluent.io>
> > >>> wrote:
> > >>>
> > >>> > Phil,
> > >>> >
> > >>> > I suggest to ask this question in the Confluent Platform mailing
> list
> > >>> > because you're referring to code under
> > >>> > https://github.com/confluentinc/examples (i.e. code that is not
> part
> > >>> of
> > >>> > the Apache Kafka project).
> > >>> >
> > >>> > Best,
> > >>> > Michael
> > >>> >
> > >>> >
> > >>> > On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome <
> phder...@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> >> Would anyone with a good understanding of serialization be
> > >>> >> available to enhance the documentation of the Apache Kafka Streams
> > >>> >> examples? I mean specifically PageViewTypedDemo and
> > >>> >> PageViewUntypedDemo in package
> > >>> >> org.apache.kafka.streams.examples.pageview
> > >>> >>
> > >>> >> I'd be happy to run them with Confluent Platform 3
> producer/consumer
> > >>> shell
> > >>> >> scripts but would need guidance as to how to invoke them and how
> to
> > >>> >> specify
> > >>> >> some input file (or stdin format).
> > >>> >>
> > >>> >> That would help me better understand how to get serialization and
> > >>> streams
> > >>> >> to work together.
> > >>> >>
> > >>> >> Thanks,
> > >>> >>
> > >>> >> Phil Derome
> > >>> >>
> > >>> >
> > >>> >
> > >>> >
> > >>> > --
> > >>> > Best regards,
> > >>> > Michael Noll
> > >>> >
> > >>> >
> > >>> >
> > >>> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > >>> > Download Apache Kafka and Confluent Platform:
> > >>> > www.confluent.io/download <http://www.confluent.io/download>*
> > >>> >
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> Best regards,
> > >>> Michael Noll
> > >>>
> > >>>
> > >>>
> > >>>
> > >>
> > >>
> > >
> >
>
>
>
> --
> -- Guozhang
>
