The typed version of the example (PageViewTypedDemo) is the one that is
difficult for someone new to Kafka (or Confluent Platform 3). *I think it
would be easier and quicker to complete the documentation of that example
than to answer my questions below.*

I am reusing the same JsonPOJOSerializer and JsonPOJODeserializer classes
as in the Kafka example for PageViewTypedDemo, and that leads to exceptions
in Jackson JSON serialization. Could I be missing some configuration? In the
debugger, Jackson goes through my 11 fields apparently without trouble and
then fails at the end of my data.

My processor is similar to PageViewTypedDemo, but with data more relevant
to what I have in mind (I mask my Avro-generated POJO class as XXXXXXXX
below). In a separate app I wrote a producer that targets the processor.
The producer uses JsonPOJOSerializer, to match the JsonPOJODeserializer
used in the example, and it fails with the stack trace below.

Here's what I do:
I set up a Properties object in the usual way.
I set up, on a separate object, a Map<String, Object> (propMap below) with
the bootstrap servers and REGISTRY_URL.

final Serializer<XXXXXXXX> xxxXXXXXXXXSerializer = new JsonPOJOSerializer<>();

propMap.put("JsonPOJOClass", XXXXXXXX.class);

// *presumably* configures my POJO class as the value type of the message?
xxxXXXXXXXXSerializer.configure(propMap, false);

final StringSerializer strSerializer = new StringSerializer();

final KafkaProducer<String, XXXXXXXX> producer1 =
    new KafkaProducer<>(propMap, strSerializer, xxxXXXXXXXXSerializer);

for (String[] user : users) {
    XXXXXXXX x = new XXXXXXXX();
    x.setPropertyA("something"); // and many other properties/fields like this
    producer1.send(new ProducerRecord<>("input-topic", user[1], x));
}

producer1.flush();


Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing JSON message

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an enum:
{"type":"record","name":"XXXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
(through reference chain: io.confluent.examples.streams.avro.XXXXXXXXX["schema"]->org.apache.avro.RecordSchema["enumSymbols"])

    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
    at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:190)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:674)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:129)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3387)
    *at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:2805)*
    *at io.confluent.examples.streams.JsonPOJOSerializer.serialize(JsonPOJOSerializer.java:50)*
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
    at io.confluent.examples.streams.XXXExampleTypedProducer.produceInputs(XXXExampleTypedProducer.java:122)
    at io.confluent.examples.streams.XXXExampleTypedProducer.main(XXXExampleTypedProducer.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Caused by: org.apache.avro.AvroRuntimeException: Not an enum:
{"type":"record","name":"XXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}

    at org.apache.avro.Schema.getEnumSymbols(Schema.java:206)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:536)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
    ... 18 more
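
Reading the reference chain in the trace (XXXXXXXXX["schema"] ->
RecordSchema["enumSymbols"]), it looks as though the serializer treats the
Avro-generated class as a bean, follows its getSchema() getter, and then
invokes a getter on the schema that only applies to enum schemas. Below is a
minimal JDK-only sketch of that mechanism. FakeRecord, FakeSchema and toMap
are hypothetical stand-ins, not Avro or Jackson classes, and the reflection
walk only approximates what a getter-based serializer does.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class GetterWalkSketch {

    // Hypothetical stand-in for a schema class: one class covers every schema
    // kind, and the enum-only getter throws when the schema is a record.
    public static class FakeSchema {
        private final String type;
        public FakeSchema(String type) { this.type = type; }
        public String getType() { return type; }
        public List<String> getEnumSymbols() {
            if (!"enum".equals(type)) {
                throw new IllegalStateException("Not an enum: " + type);
            }
            return List.of();
        }
    }

    // Hypothetical stand-in for a generated record class: one data field plus
    // a getSchema() getter that a bean-style serializer will also pick up.
    public static class FakeRecord {
        private final String firstName;
        public FakeRecord(String firstName) { this.firstName = firstName; }
        public String getFirstName() { return firstName; }
        public FakeSchema getSchema() { return new FakeSchema("record"); }
    }

    // Walks every public zero-argument getXxx() method, recursing into
    // non-leaf values, and skips the property names listed in `ignored`.
    public static Map<String, Object> toMap(Object bean, Set<String> ignored)
            throws ReflectiveOperationException {
        Map<String, Object> out = new TreeMap<>();
        for (Method m : bean.getClass().getMethods()) {
            String n = m.getName();
            if (!n.startsWith("get") || n.length() == 3
                    || n.equals("getClass") || m.getParameterCount() != 0) {
                continue;
            }
            String prop = Character.toLowerCase(n.charAt(3)) + n.substring(4);
            if (ignored.contains(prop)) {
                continue;
            }
            Object value;
            try {
                value = m.invoke(bean);
            } catch (InvocationTargetException e) {
                // Report which property's getter blew up, as the trace above does.
                throw new IllegalStateException(
                        "property '" + prop + "': " + e.getCause().getMessage(),
                        e.getCause());
            }
            boolean leaf = value == null || value instanceof String
                    || value instanceof List;
            out.put(prop, leaf ? value : toMap(value, ignored));
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        FakeRecord record = new FakeRecord("Ann");
        try {
            toMap(record, Set.of());          // follows getSchema(), then fails
        } catch (IllegalStateException e) {
            System.out.println("walk failed: " + e.getMessage());
        }
        // Skipping the "schema" property leaves only the data fields.
        System.out.println(toMap(record, Set.of("schema")));
    }
}
```

If this reading is right, the analogous step with Jackson would presumably
be to keep the schema property out of JSON serialization (for example with a
mix-in or @JsonIgnoreProperties), or to send a plain POJO rather than the
Avro-generated class. Neither option has been verified against this setup.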


On Wed, Jul 6, 2016 at 8:18 AM, Philippe Derome <phder...@gmail.com> wrote:

> yes, it's a very similar example and I am interested in the Kafka one for
> the serialization aspect of it, which is a bit richer than on Confluent's...
>
> On Wed, Jul 6, 2016 at 5:35 AM, Michael Noll <mich...@confluent.io> wrote:
>
>> Correction:  Just realized that I misread your message.  You are indeed
>> referring to the code examples in Apache Kafka. ;-)
>>
>> On Wed, Jul 6, 2016 at 11:35 AM, Michael Noll <mich...@confluent.io>
>> wrote:
>>
>> > Phil,
>> >
>> > I suggest to ask this question in the Confluent Platform mailing list
>> > because you're referring to code under
>> > https://github.com/confluentinc/examples (i.e. code that is not part of
>> > the Apache Kafka project).
>> >
>> > Best,
>> > Michael
>> >
>> >
>> > On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome <phder...@gmail.com>
>> > wrote:
>> >
>> >> Would anyone with a good understanding of serialization be available to
>> >> enhance documentation of the Apache Streams examples? I mean
>> >> specifically: PageViewTypedDemo, PageViewUntypedDemo in package
>> >> org.apache.kafka.streams.examples.pageview
>> >>
>> >> I'd be happy to run them with Confluent Platform 3 producer/consumer
>> >> shell scripts but would need guidance as to how to invoke them and how
>> >> to specify some input file (or stdin format).
>> >>
>> >> That would help me better understand how to get serialization and
>> >> streams to work together.
>> >>
>> >> Thanks,
>> >>
>> >> Phil Derome
>> >>
>> >
>> >
>> >
>> > --
>> > Best regards,
>> > Michael Noll
>> >
>> >
>> >
>> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>> > Download Apache Kafka and Confluent Platform:
>> > www.confluent.io/download <http://www.confluent.io/download>*
>> >
>>
>
>
