Hi Marcos,

We've seen this error when the message itself was not serialized in the
Schema Registry format. In our case we had to refactor our producer
slightly, because it wasn't including the magic byte at the start of each
message.
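
In case it helps with debugging, here is a rough sketch of the check we
ended up doing on raw message bytes. It assumes the Confluent wire format
(one magic byte with value 0, then a 4-byte big-endian schema ID, then the
Avro payload). The function name is made up for illustration, and how you
obtain the raw bytes (for example from a plain byte-array consumer) is up to
you.

```python
import struct
from typing import Optional

MAGIC_BYTE = 0   # Confluent wire format: the first byte is always 0
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def confluent_schema_id(payload: Optional[bytes]) -> Optional[int]:
    """Return the schema ID if the payload starts with a Confluent-style
    header, or None if it does not (hypothetical helper, not a library API).
    """
    if payload is None or len(payload) < HEADER_SIZE:
        return None
    # ">bi" = big-endian, one signed byte followed by one 4-byte signed int
    magic, schema_id = struct.unpack(">bi", payload[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        return None
    return schema_id


if __name__ == "__main__":
    framed = b"\x00" + struct.pack(">i", 42) + b"\x06foo"
    bare = b"\x06foobar"  # Avro bytes written without the header
    print(confluent_schema_id(framed))
    print(confluent_schema_id(bare))
```

It may be worth running the same check against the record keys as well as
the values, since Connect applies key.converter and value.converter
separately.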

Cheers,
Patrick


On Thu, 6 Dec 2018, 19:30 Marcos Juarez, <mjua...@gmail.com> wrote:

> We're trying to use Kafka Connect to pull down data from Kafka, but we're
> having issues with the Avro deserialization.
>
> When we attempt to consume data using the kafka-avro-console-consumer, we
> can consume it, and deserialize it correctly.  Our command is similar to
> the following:
>
> ./kafka-avro-console-consumer --topic my_topic --bootstrap-server
> localhost:9092 --property schema.registry.url=http://localhost:8081
>
> However, if we attempt to use Kafka Connect to consume the data (in our
> case, with an S3SinkConnector), we always get the following error from the
> connector:
>
> "state": "FAILED",
> "trace": "org.apache.kafka.connect.errors.DataException: cf4_sdp_rest_test_v2
>         at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
> Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
>
> We've tried several different configurations, but we are not able to get
> Kafka Connect to properly consume these messages.  For reference, these
> same messages work correctly with Apache Camus, and of course with the
> command-line kafka-avro-console-consumer as described above. So we're
> pretty confident that the messages are built correctly.
>
> We have noticed that we get the same error if we attempt to print the
> messages in the console consumer with the option *--property
> print.key=false*.  However, we can't figure out a way to turn off key
> deserialization (if that is what is causing this) on the kafka
> connect/connector side.
>
> We're using Kafka 1.1.1, and all the packages are Confluent platform 4.1.2.
>
> Any help would be appreciated.
>
> Thanks,
>
> Marcos Juarez
>
