We're trying to use Kafka Connect to consume data from Kafka, but we're
running into issues with Avro deserialization.

When we consume the data with the kafka-avro-console-consumer, it
deserializes correctly.  Our command is similar to the following:

./kafka-avro-console-consumer --topic my_topic \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081

However, if we attempt to use Kafka Connect to consume the data (in our
case, with an S3SinkConnector), we always get the following error from the
connector:

"state": "FAILED",

"trace": "org.apache.kafka.connect.errors.DataException:
cf4_sdp_rest_test_v2

        at
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)

        at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)

        at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)

        at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)

        at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)

        at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)

        at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)

        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

*Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id -1*

*Caused by: org.apache.kafka.common.errors.SerializationException: Unknown
magic byte!*

We've tried several different configurations, but we haven't been able to
get Kafka Connect to consume these messages properly.  For reference, the
same messages work correctly with Apache Camus, and of course with the
command-line kafka-avro-console-consumer as described above, so we're
fairly confident that the messages themselves are built correctly.
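
In case it helps, our sink connector properties look roughly like the
sketch below; the name, topic, bucket, and size values are placeholders
rather than our exact settings, and the key/value converters are not
overridden at the connector level:

# Rough sketch of the S3 sink connector properties (placeholder values)
name=s3-sink-test
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=my_topic
s3.bucket.name=my-test-bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
flush.size=1000
# key.converter / value.converter are not set here, so the worker defaults apply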

We have noticed that we get the same error if we ask the console consumer
to also print the message keys with *--property print.key=true*.  However,
we can't figure out a way to turn off key deserialization (if that is what
is causing this) on the Kafka Connect/connector side.
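
We assume a per-connector converter override along the lines of the sketch
below would take Avro out of the key path, but we haven't been able to
confirm that this is the right approach; the class names are the stock
Kafka and Confluent converters, and the Schema Registry URL is a placeholder:

# Guess at a per-connector override to skip Avro deserialization of keys
key.converter=org.apache.kafka.connect.storage.StringConverter
# or, to pass the raw key bytes through untouched:
# key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# values stay on Avro via the Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081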

We're using Kafka 1.1.1, and all the packages are from Confluent Platform 4.1.2.

Any help would be appreciated.

Thanks,

Marcos Juarez
