I resolved the problem myself before I had time to see your reply. I removed all references to JsonPOJO and the Jackson JSON dependencies, and I now use SpecificAvroSerde for VALUE_SERDE_CONFIG, which works well. Indeed, I had not planned to embed the schema info in each message, as that would be tedious and less elegant.
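
For future readers, here is a minimal sketch of the kind of configuration change I mean. It is not the exact code from my app. The keys are written as plain strings so it runs without any Kafka jars. It assumes that "value.serde" is the key behind the value-serde constant in the Kafka Streams version I used (newer releases may use "default.value.serde"). The application id and the fully qualified serde class name are hypothetical placeholders, so substitute whatever your build actually provides.

```java
import java.util.Properties;

public class AvroSerdeConfigSketch {

    // Builds Kafka Streams settings that select the value serde by class name.
    // Assumption: "value.serde" is the key for the value serde in this Kafka
    // version; newer releases may expect "default.value.serde" instead.
    public static Properties buildConfig(String bootstrapServers,
                                         String schemaRegistryUrl,
                                         String valueSerdeClassName) {
        Properties props = new Properties();
        props.put("application.id", "typed-demo-sketch"); // hypothetical application id
        props.put("bootstrap.servers", bootstrapServers);
        // Keys stay plain strings; values go through the Avro serde.
        props.put("key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("value.serde", valueSerdeClassName);
        // The Avro serde looks up schemas by id here instead of embedding them per message.
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig(
                "localhost:9092",
                "http://localhost:8081",
                // Assumed class name; use the SpecificAvroSerde available in your project.
                "io.confluent.examples.streams.utils.SpecificAvroSerde");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With this in place, neither the producer nor the streams app needs JsonPOJOSerializer or JsonPOJODeserializer; the generated Avro class is the value type on both sides.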

So you are saying that a producer and consumer driver for the AK repo example would work well if the schema info were embedded in each message? What would the VALUE_CONFIG classes be then? Matching ones that depend on JsonPOJO, or GenericAvroSerde with the schema specified along the way? I think a little more detail in the AK repo example would be helpful for future readers.

On Thu, Jul 7, 2016 at 1:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> The examples code in AK repo uses JSON as the serde, whereas the examples
> in CP repo used Avro, with the embedded schema. These two are different
> serde schemes.
>
> Using a schema registry for Avro / etc would be a better approach to you,
> since it does not require each message to embed the schema info, but just a
> schema id.
>
> Guozhang
>
>
> On Wed, Jul 6, 2016 at 8:12 PM, Philippe Derome <phder...@gmail.com>
> wrote:
>
> > I think I should simply follow Kafka The Definitive Guide Chapter 3 for a
> > good Avro producer example instead. It does not introduce some Jackson JSON
> > layer and still provides the type safety using POJO generated classes from
> > Avro.
> >
> > On Wed, Jul 6, 2016 at 9:20 PM, Philippe Derome <phder...@gmail.com>
> > wrote:
> >
> > > The typed version of the example (PageViewTypedDemo) is what represents
> > > some difficulty for someone new to Kafka (or CP3). *I think it would be
> > > easier/quicker to complete the documentation of that example than to answer
> > > my questions below....*
> > >
> > > I am reusing the same JsonPOJOSerializer and JsonPOJODeserializer classes
> > > as in the Kafka example for PageViewTypedDemo and that leads me into
> > > exceptions in Jackson JSON serialization. Could I be missing some
> > > configuration? In debugger, I see that Jackson does not seem to like the
> > > end of my data as it goes through my 11 fields apparently ok.
> > >
> > > On a processor similar to PageViewTypedDemo (mine with data more relevant
> > > to what I have in mind and I mask my AVRO generated POJO class with
> > > XXXXXXXX below), I write a producer in a separate app to target the
> > > processor. I get following stack trace for a producer that would match the
> > > producer (meaning using JsonPOJOSerializer to match the example that uses a
> > > JsonPOJODeserializer).
> > >
> > > Here's what I do:
> > > I set up normally Properties
> > > I set up normally on a different object Map<String, Object> with
> > > BootstrapServer and REGISTRY_URL
> > >
> > > final Serializer<XXXXXXXX> xxxXXXXXXXXSerializer = new JsonPOJOSerializer<>();
> > >
> > > propMap.put("JsonPOJOClass", XXXXXXXX.class);
> > >
> > > xxxXXXXXXXXSerializer.configure(propMap, false); // *presumably* configures my POJO class with value of message?
> > >
> > > final StringSerializer strSerializer = new StringSerializer();
> > >
> > > final KafkaProducer<String, XXXXXXXX> producer1 =
> > >     new KafkaProducer<>(propMap, strSerializer, xxxXXXXXXXXSerializer);
> > >
> > > for (String[] user : users) {
> > >     XXXXXXXX x = new XXXXXXXX();
> > >     x.setPropertyA("something"); // and many other properties/fields like this
> > >     producer1.send(new ProducerRecord<>("input-topic", user[1], x));
> > > }
> > >
> > > producer1.flush();
> > >
> > >
> > > Exception in thread "main"
> > > org.apache.kafka.common.errors.SerializationException: Error serializing
> > > JSON message
> > >
> > > Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an enum:
> > > {"type":"record","name":"XXXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
> > > (through reference chain:
> > > io.confluent.examples.streams.avro.XXXXXXXXX["schema"]->org.apache.avro.RecordSchema["enumSymbols"])
> > >     at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
> > >     at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
> > >     at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:190)
> > >     at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:674)
> > >     at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
> > >     at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
> > >     at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
> > >     at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
> > >     at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:129)
> > >     at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3387)
> > > *   at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:2805)*
> > > *   at io.confluent.examples.streams.JsonPOJOSerializer.serialize(JsonPOJOSerializer.java:50)*
> > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
> > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
> > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
> > >     at io.confluent.examples.streams.XXXExampleTypedProducer.produceInputs(XXXExampleTypedProducer.java:122)
> > >     at io.confluent.examples.streams.XXXExampleTypedProducer.main(XXXExampleTypedProducer.java:51)
> > >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >     at java.lang.reflect.Method.invoke(Method.java:498)
> > >     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> > >
> > > Caused by: org.apache.avro.AvroRuntimeException: Not an enum:
> > > {"type":"record","name":"XXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
> > >     at org.apache.avro.Schema.getEnumSymbols(Schema.java:206)
> > >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >     at java.lang.reflect.Method.invoke(Method.java:498)
> > >     at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:536)
> > >     at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
> > >     ... 18 more
> > >
> > >
> > > On Wed, Jul 6, 2016 at 8:18 AM, Philippe Derome <phder...@gmail.com>
> > > wrote:
> > >
> > >> yes, it's a very similar example and I am interested in the Kafka one for
> > >> the serialization aspect of it, which is a bit richer than on Confluent's...
> > >>
> > >> On Wed, Jul 6, 2016 at 5:35 AM, Michael Noll <mich...@confluent.io>
> > >> wrote:
> > >>
> > >>> Correction: Just realized that I misread your message. You are indeed
> > >>> referring to the code examples in Apache Kafka. ;-)
> > >>>
> > >>> On Wed, Jul 6, 2016 at 11:35 AM, Michael Noll <mich...@confluent.io>
> > >>> wrote:
> > >>>
> > >>> > Phil,
> > >>> >
> > >>> > I suggest to ask this question in the Confluent Platform mailing list
> > >>> > because you're referring to code under
> > >>> > https://github.com/confluentinc/examples (i.e. code that is not part of
> > >>> > the Apache Kafka project).
> > >>> >
> > >>> > Best,
> > >>> > Michael
> > >>> >
> > >>> >
> > >>> > On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome <phder...@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> >> Would anyone with a good understanding of serialization be available to
> > >>> >> enhance documentation of the Apache Streams examples? I mean specifically:
> > >>> >> PageViewTypedDemo, PageViewUntypedDemo in package
> > >>> >> org.apache.kafka.streams.examples.pageview
> > >>> >>
> > >>> >> I'd be happy to run them with Confluent Platform 3 producer/consumer shell
> > >>> >> scripts but would need guidance as to how to invoke them and how to specify
> > >>> >> some input file (or stdin format).
> > >>> >>
> > >>> >> That would help me better understand how to get serialization and streams
> > >>> >> to work together.
> > >>> >>
> > >>> >> Thanks,
> > >>> >>
> > >>> >> Phil Derome
> > >>> >>
> > >>> >
> > >>> >
> > >>> > --
> > >>> > Best regards,
> > >>> > Michael Noll
> > >>> >
> > >>> >
> > >>> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > >>> > Download Apache Kafka and Confluent Platform:
> > >>> > www.confluent.io/download <http://www.confluent.io/download>*
> > >>> >
> > >>>
> > >>>
> > >>> --
> > >>> Best regards,
> > >>> Michael Noll
> > >>>
> > >>>
> > >>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > >>> Download Apache Kafka and Confluent Platform:
> > >>> www.confluent.io/download <http://www.confluent.io/download>*
> > >>>
> > >>
> > >>
> > >
> > >
> >
> --
> -- Guozhang