> However, we can't figure out a way to turn off key deserialization (if that is what is causing this) on the kafka connect/connector side.
Set *key.converter* to the correct value for the source message.

https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

--
Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff

On Thu, 6 Dec 2018 at 18:30, Marcos Juarez <mjua...@gmail.com> wrote:

> We're trying to use Kafka Connect to pull down data from Kafka, but we're
> having issues with the Avro deserialization.
>
> When we attempt to consume data using the kafka-avro-console-consumer, we
> can consume it, and deserialize it correctly. Our command is similar to
> the following:
>
> *./kafka-avro-console-consumer --topic my_topic --bootstrap-server
> localhost:9092 --property schema.registry.url=http://localhost:8081*
>
> However, if we attempt to use Kafka Connect to consume the data (in our
> case, with an S3SinkConnector), we always get the following error from the
> connector:
>
> "state": "FAILED",
> "trace": "org.apache.kafka.connect.errors.DataException: cf4_sdp_rest_test_v2
>     at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> *Caused by: org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id -1*
>
> *Caused by: org.apache.kafka.common.errors.SerializationException: Unknown
> magic byte!*
>
> We've tried several different configurations, but we are not able to get
> Kafka Connect to properly consume these messages. For reference, these
> same messages work correctly with Apache Camus, and of course with the
> command-line kafka-avro-console-consumer as described above. So we're
> pretty confident that the messages are built correctly.
>
> We have noticed that we get the same error if we attempt to print the
> messages in the console consumer with the option *--property
> print.key=false*. However, we can't figure out a way to turn off key
> deserialization (if that is what is causing this) on the kafka
> connect/connector side.
>
> We're using Kafka 1.1.1, and all the packages are Confluent platform 4.1.2.
>
> Any help would be appreciated.
>
> Thanks,
>
> Marcos Juarez
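
To make the key.converter suggestion at the top of this reply concrete: "Unknown magic byte!" is what the Avro deserializer reports when the bytes it receives do not begin with the Schema Registry framing byte, which is what you would see if the keys were not written with the Avro serializer. Below is a minimal sketch of the converter settings for the sink, assuming the record keys are plain strings and only the values are Avro. Neither assumption is confirmed in this thread, and the registry URL is simply the one from the console-consumer command quoted above.

```properties
# Assumption: record keys are plain strings, not Avro
key.converter=org.apache.kafka.connect.storage.StringConverter

# Values are Avro, serialized with a Schema Registry schema ID
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

These can go in the worker properties or be overridden in the individual connector's configuration. If the keys turn out to be arbitrary bytes rather than strings, org.apache.kafka.connect.converters.ByteArrayConverter is probably the safer choice for key.converter, since it passes the bytes through without interpreting them.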