The typed version of the example (PageViewTypedDemo) is the one that is difficult for someone new to Kafka (or Confluent Platform 3). *I think it would be easier and quicker to complete the documentation of that example than to answer my questions below.*
I am reusing the same JsonPOJOSerializer and JsonPOJODeserializer classes as the Kafka example PageViewTypedDemo, and that leads to exceptions in Jackson JSON serialization. Could I be missing some configuration? In the debugger, Jackson appears to go through my 11 fields without trouble and then fails at the end of my data.

My processor is similar to PageViewTypedDemo, but with data more relevant to what I have in mind (I mask the name of my Avro-generated POJO class as XXXXXXXX below). I wrote a producer in a separate app to feed the processor. That producer uses JsonPOJOSerializer, to match the JsonPOJODeserializer used in the example, and it fails with the stack trace below.

Here's what I do:

I set up Properties as usual.
On a separate object, I set up a Map<String, Object> as usual, with BootstrapServer and REGISTRY_URL.

    final Serializer<XXXXXXXX> xxxXXXXXXXXSerializer = new JsonPOJOSerializer<>();
    propMap.put("JsonPOJOClass", XXXXXXXX.class);
    // *presumably* configures my POJO class as the value type of the message?
    xxxXXXXXXXXSerializer.configure(propMap, false);

    final StringSerializer strSerializer = new StringSerializer();
    final KafkaProducer<String, XXXXXXXX> producer1 =
        new KafkaProducer<>(propMap, strSerializer, xxxXXXXXXXXSerializer);

    for (String[] user : users) {
        XXXXXXXX x = new XXXXXXXX();
        x.setPropertyA("something"); // and many other properties/fields like this
        producer1.send(new ProducerRecord<>("input-topic", user[1], x));
    }
    producer1.flush();

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record","name":"XXXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]} (through reference chain: io.confluent.examples.streams.avro.XXXXXXXXX["schema"]->org.apache.avro.RecordSchema["enumSymbols"])
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
    at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:190)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:674)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:129)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3387)
    *at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:2805)*
    *at io.confluent.examples.streams.JsonPOJOSerializer.serialize(JsonPOJOSerializer.java:50)*
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
    at io.confluent.examples.streams.XXXExampleTypedProducer.produceInputs(XXXExampleTypedProducer.java:122)
    at io.confluent.examples.streams.XXXExampleTypedProducer.main(XXXExampleTypedProducer.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.avro.AvroRuntimeException: Not an enum: {"type":"record","name":"XXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
    at org.apache.avro.Schema.getEnumSymbols(Schema.java:206)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:536)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
    ... 18 more

On Wed, Jul 6, 2016 at 8:18 AM, Philippe Derome <phder...@gmail.com> wrote:

> yes, it's a very similar example and I am interested in the Kafka one for
> the serialization aspect of it, which is a bit richer than on Confluent's...
>
> On Wed, Jul 6, 2016 at 5:35 AM, Michael Noll <mich...@confluent.io> wrote:
>
>> Correction: Just realized that I misread your message. You are indeed
>> referring to the code examples in Apache Kafka. ;-)
>>
>> On Wed, Jul 6, 2016 at 11:35 AM, Michael Noll <mich...@confluent.io> wrote:
>>
>> > Phil,
>> >
>> > I suggest to ask this question in the Confluent Platform mailing list
>> > because you're referring to code under
>> > https://github.com/confluentinc/examples (i.e. code that is not part of
>> > the Apache Kafka project).
>> >
>> > Best,
>> > Michael
>> >
>> > On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome <phder...@gmail.com> wrote:
>> >
>> >> Would anyone with a good understanding of serialization be available to
>> >> enhance documentation of the Apache Streams examples? I mean specifically:
>> >> PageViewTypedDemo, PageViewUntypedDemo in package
>> >> org.apache.kafka.streams.examples.pageview
>> >>
>> >> I'd be happy to run them with Confluent Platform 3 producer/consumer shell
>> >> scripts but would need guidance as to how to invoke them and how to specify
>> >> some input file (or stdin format).
>> >>
>> >> That would help me better understand how to get serialization and streams
>> >> to work together.
>> >>
>> >> Thanks,
>> >>
>> >> Phil Derome
>> >
>> > --
>> > Best regards,
>> > Michael Noll
>> >
>> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>> > Download Apache Kafka and Confluent Platform: www.confluent.io/download*
>>
>> --
>> Best regards,
>> Michael Noll