Since the data that goes through your Streams app is the one with the bad magic byte, I suspect your streams Serde is not serializing Avro correctly (i.e. in the format that the Connect converter requires).
Can you share your Serde code? Gwen On Tue, Feb 7, 2017 at 10:49 AM, Nick DeCoursin <n.decour...@foodpanda.com> wrote: > Hello, > > I'm experiencing a problem using Kafka Connect's JdbcSinkConnector. I'm > creating two connectors using the following script: `./create-connector.sh > test` and `./create-connector.sh test2`. > > The first one `test` works, the second one `test2` doesn't work. Meaning, > the first one successfully copies data into postgres, the other one fails > with the error message below. The only difference between `test` and > `test2` is that the second is the result of: `test.map(...).through(.., .., > "test2").` > > *Error* > { > "name": "test2", > "connector": { > "state": "RUNNING", > "worker_id": "localhost:8083" > }, > "tasks": [ > { > "state": "FAILED", > "trace": "org.apache.kafka.connect.errors.DataException: Failed to > deserialize data to Avro: \n\tat > io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)\n\tat > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)\n\tat > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)\n\tat > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)\n\tat > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)\n\tat > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)\n\tat > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)\n\tat > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat > java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat > java.lang.Thread.run(Thread.java:745)\nCaused by: > org.apache.kafka.common.errors.SerializationException: Error deserializing > Avro message for id -1\nCaused by: > org.apache.kafka.common.errors.SerializationException: Unknown magic > byte!\n", > "id": 0, > "worker_id": "localhost:8083" > } > ] > } > > *create-connector.sh* > #! /bin/bash > NAME=$1 > TOPICS=$1 > > # create the JDBC sink connector. > curl -X POST \ > -H "Content-Type: application/json" \ > --data '{"name": "'$NAME'", "config": { "connector.class": > "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": 1, > "connection.url": > "jdbc:postgresql://localhost:7999/kafka?user=postgres&password=somepassword33", > "topics": "'$TOPICS'", "poll.interval.ms": 1000, "auto.create": true, > "auto.evolve": true } }' \ > http://localhost:8083/connector > > *connect server* > docker run -d \ > --name=kafka-connect \ > --net=host \ > -e CONNECT_BOOTSTRAP_SERVERS=localhost:9092 \ > -e CONNECT_REST_PORT=8083 \ > -e CONNECT_GROUP_ID="logistics" \ > -e CONNECT_CONFIG_STORAGE_TOPIC="logistics-config" \ > -e CONNECT_OFFSET_STORAGE_TOPIC="logistics-offsets" \ > -e CONNECT_STATUS_STORAGE_TOPIC="logistics-status" \ > -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \ > -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \ > -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \ > -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \ > -e > CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" > \ > -e > CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" > \ > -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \ > -e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \ > confluentinc/cp-kafka-connect:latest > > *Producer* > public static void main(String[] args) throws InterruptedException { > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("acks", "all"); > props.put("retries", 0); > props.put("key.serializer", > "io.confluent.kafka.serializers.KafkaAvroSerializer"); > props.put("value.serializer", > "io.confluent.kafka.serializers.KafkaAvroSerializer"); > props.put("schema.registry.url", "http://localhost:8081"); > > String topic = "test"; > > Producer<String, test> producer = new KafkaProducer<>(props); > > while (true) { > test command = CommandGenerator.getNextTest(); > System.out.println("Generated event " + command.toString()); > > ProducerRecord<String, test> record = new ProducerRecord<>(topic, > UUID.randomUUID().toString(), command); > producer.send(record); > Thread.sleep(500); > } > } > > *Consumer* > streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, > "create-order"); > streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092"); > streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, > "localhost:2181"); > streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, > "http://localhost:8081"); > > final KStream<String, test> tests = builder.stream(Serdes.String(), > testSpecificAvroSerde, "test"); > > tests.map((id, command) -> { > System.out.println("test id=" + id + " command=" + command); > command.setId(9); > > return new KeyValue<>(UUID.randomUUID().toString(), command); > }) > .through(Serdes.String(), testSpecificAvroSerde, "test2"); > > > *test.avsc* > { > "type": "record", > "namespace": "com.foodpanda.command.avro", > "name": "test", > "fields": [{ > "name": "id", > "type": "int" > }, { > "name": "product", > "type": "string" > }, { > "name": "quantity", > "type": "int" > }, { > "name": "price", > "type": "float" > }] > } > > -- > > Nick DeCoursin > Software Engineer > foodpanda > > Tel | +1 920 450 5434 > > Mail | n.decour...@foodpanda.com > > Skype | nick.foodpanda > > Foodpanda GmbH | Schreiberhauer Str. 30 | 10317 Berlin | Germany > Sitz der Gesellschaft | Berlin, AG Charlottenburg | HRB 138224 B | > USt-ID-Nr | DE 283789080 > Geschäftsführer | Benjamin Bauer, Felix Plog, Ralf Wenzel > > CONFIDENTIALITY NOTICE: This message (including any attachments) is > confidential and may be privileged. It may be read, copied and used only by > the intended recipient. If you have received it in error please contact the > sender (by return e-mail) immediately and delete this message. Any > unauthorized use or dissemination of this message in whole or in parts is > strictly prohibited. -- Gwen Shapira Product Manager | Confluent 650.450.2760 | @gwenshap Follow us: Twitter | blog