It seems like a bug.
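
For context on the "Unknown magic byte!" failure quoted below: Confluent's AvroConverter expects every record to begin with a one-byte magic 0x00 followed by a 4-byte big-endian schema id; bytes written without that header (for example, a key serialized with Serdes.String()) trigger exactly this error. Here is a minimal, self-contained sketch of that framing check — the class and method names are illustrative, not part of the Confluent API:

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent Schema Registry wire format that AvroConverter
// expects: magic byte 0x00, then a 4-byte big-endian schema id, then the
// Avro-encoded payload. Names here are illustrative, not a Confluent API.
public class WireFormatCheck {

    private static final byte MAGIC_BYTE = 0x0;

    // True when the raw record bytes carry the magic-byte + schema-id header.
    public static boolean hasConfluentFraming(byte[] payload) {
        return payload != null && payload.length >= 5 && payload[0] == MAGIC_BYTE;
    }

    // Extracts the 4-byte schema id that follows the magic byte.
    public static int schemaId(byte[] payload) {
        if (!hasConfluentFraming(payload)) {
            throw new IllegalArgumentException("not Confluent-framed Avro");
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = {0x0, 0, 0, 0, 42, 0x6};        // schema id 42, 1-byte body
        byte[] plain = "a-plain-string-key".getBytes(); // e.g. Serdes.String() output
        System.out.println(hasConfluentFraming(framed)); // true
        System.out.println(schemaId(framed));            // 42
        System.out.println(hasConfluentFraming(plain));  // false -> "Unknown magic byte!"
    }
}
```

Feeding raw record bytes from `test2` (consumed with ByteArrayDeserializer for both key and value) through a check like this would show whether it is the key or the value that lacks the header.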

Thanks,
Nick

On 9 February 2017 at 14:57, Nick DeCoursin <n.decour...@foodpanda.com>
wrote:

> Hello,
>
> Here is a GitHub repo with the failing case:
> https://github.com/decoursin/kafka-connect-test
>
> I've tried other similar things and nothing seems to work.
>
> Thanks,
> Nick
>
> On 9 February 2017 at 04:40, Nick DeCoursin <n.decour...@foodpanda.com>
> wrote:
>
>> Any help here? I can create a Git repo with the code if somebody assures
>> me they'll have a look.
>>
>> Thank you,
>> Nick
>>
>> On 8 February 2017 at 10:39, Nick DeCoursin <n.decour...@foodpanda.com>
>> wrote:
>>
>>> Below is the rest of my consumer, which includes the serde code. It's
>>> worth noting that when I run the following command, it properly prints
>>> the records of the topic to the terminal.
>>>
>>> sudo kafka-avro-console-consumer --bootstrap-server localhost:9092 \
>>>                             --from-beginning \
>>>                             --topic test2
>>>
>>> {"id":9,"product":"bar","quantity":3,"price":1.0}
>>> {"id":9,"product":"bar","quantity":3,"price":1.0}
>>> {"id":9,"product":"bar","quantity":3,"price":1.0}
>>> {"id":9,"product":"bar","quantity":3,"price":1.0}
>>>
>>> *Rest of the consumer*
>>> public class CreateOrderProcessor {
>>>     public static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
>>>
>>>     public static void main(final String[] args) throws Exception {
>>>         final Properties streamsConfiguration = new Properties();
>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "create-order");
>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>>>         streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
>>>
>>>         final CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 100);
>>>         final Map<String, String> serdeProps = Collections.singletonMap("schema.registry.url", SCHEMA_REGISTRY_URL);
>>>
>>>         final SpecificAvroSerde<test> testSpecificAvroSerde = new SpecificAvroSerde<>(schemaRegistry, serdeProps);
>>>         testSpecificAvroSerde.configure(serdeProps, false);
>>>
>>>         final KStreamBuilder builder = new KStreamBuilder();
>>>
>>>         final KStream<String, test> tests = builder.stream(Serdes.String(), testSpecificAvroSerde, "test");
>>>
>>>         tests.map((id, command) -> {
>>>             System.out.println("test id=" + id + " command=" + command);
>>>             command.setId(9);
>>>
>>>             return new KeyValue<>(UUID.randomUUID().toString(), command);
>>>         })
>>>             .through(Serdes.String(), testSpecificAvroSerde, "test2");
>>>
>>>         System.out.println("starting stream...");
>>>
>>>         final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>>>         streams.start();
>>>
>>>         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>>             try {
>>>                 streams.close();
>>>             } catch (Exception e) {
>>>                 System.out.println(e);
>>>             }
>>>         }));
>>>     }
>>> }
>>>
>>> Thank you very much,
>>> Nick
>>>
>>>
>>> On 8 February 2017 at 01:26, Gwen Shapira <g...@confluent.io> wrote:
>>>
>>>> Since the data that goes through your Streams app is the one with the
>>>> bad magic byte, I suspect your streams Serde is not serializing Avro
>>>> correctly (i.e. in the format that the Connect converter requires).
>>>>
>>>> Can you share your Serde code?
>>>>
>>>> Gwen
>>>>
>>>> On Tue, Feb 7, 2017 at 10:49 AM, Nick DeCoursin
>>>> <n.decour...@foodpanda.com> wrote:
>>>> > Hello,
>>>> >
>>>> > I'm experiencing a problem using Kafka Connect's JdbcSinkConnector.
>>>> > I'm creating two connectors using the following script:
>>>> > `./create-connector.sh test` and `./create-connector.sh test2`.
>>>> >
>>>> > The first one, `test`, works; the second one, `test2`, doesn't.
>>>> > That is, the first one successfully copies data into Postgres, while
>>>> > the other fails with the error message below. The only difference
>>>> > between `test` and `test2` is that the second is the result of
>>>> > `test.map(...).through(.., .., "test2")`.
>>>> >
>>>> > *Error*
>>>> > {
>>>> >   "name": "test2",
>>>> >   "connector": {
>>>> >     "state": "RUNNING",
>>>> >     "worker_id": "localhost:8083"
>>>> >   },
>>>> >   "tasks": [
>>>> >     {
>>>> >       "state": "FAILED",
>>>> >       "trace": "org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
>>>> >           at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
>>>> >           at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
>>>> >           at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
>>>> >           at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
>>>> >           at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
>>>> >           at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>>>> >           at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>>>> >           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> >           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> >           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> >           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> >           at java.lang.Thread.run(Thread.java:745)
>>>> >         Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
>>>> >         Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!",
>>>> >       "id": 0,
>>>> >       "worker_id": "localhost:8083"
>>>> >     }
>>>> >   ]
>>>> > }
>>>> >
>>>> >
>>>> > *create-connector.sh*
>>>> > #!/bin/bash
>>>> > NAME=$1
>>>> > TOPICS=$1
>>>> >
>>>> > # Create the JDBC sink connector.
>>>> > curl -X POST \
>>>> >   -H "Content-Type: application/json" \
>>>> >   --data '{"name": "'$NAME'", "config": { "connector.class":
>>>> > "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": 1,
>>>> > "connection.url": "jdbc:postgresql://localhost:7999/kafka?user=postgres&password=somepassword33",
>>>> > "topics": "'$TOPICS'", "poll.interval.ms": 1000, "auto.create": true,
>>>> > "auto.evolve": true } }' \
>>>> >   http://localhost:8083/connectors
>>>> >
>>>> > *connect server*
>>>> > docker run -d \
>>>> >   --name=kafka-connect \
>>>> >   --net=host \
>>>> >   -e CONNECT_BOOTSTRAP_SERVERS=localhost:9092 \
>>>> >   -e CONNECT_REST_PORT=8083 \
>>>> >   -e CONNECT_GROUP_ID="logistics" \
>>>> >   -e CONNECT_CONFIG_STORAGE_TOPIC="logistics-config" \
>>>> >   -e CONNECT_OFFSET_STORAGE_TOPIC="logistics-offsets" \
>>>> >   -e CONNECT_STATUS_STORAGE_TOPIC="logistics-status" \
>>>> >   -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
>>>> >   -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
>>>> >   -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
>>>> >   -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
>>>> >   -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>>>> >   -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>>>> >   -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
>>>> >   -e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
>>>> >   confluentinc/cp-kafka-connect:latest
>>>> >
>>>> > *Producer*
>>>> > public static void main(String[] args) throws InterruptedException {
>>>> >     Properties props = new Properties();
>>>> >     props.put("bootstrap.servers", "localhost:9092");
>>>> >     props.put("acks", "all");
>>>> >     props.put("retries", 0);
>>>> >     props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
>>>> >     props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
>>>> >     props.put("schema.registry.url", "http://localhost:8081");
>>>> >
>>>> >     String topic = "test";
>>>> >
>>>> >     Producer<String, test> producer = new KafkaProducer<>(props);
>>>> >
>>>> >     while (true) {
>>>> >         test command = CommandGenerator.getNextTest();
>>>> >         System.out.println("Generated event " + command.toString());
>>>> >
>>>> >         ProducerRecord<String, test> record = new ProducerRecord<>(topic, UUID.randomUUID().toString(), command);
>>>> >         producer.send(record);
>>>> >         Thread.sleep(500);
>>>> >     }
>>>> > }
>>>> >
>>>> > *Consumer*
>>>> > streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "create-order");
>>>> > streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>> > streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>>>> > streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
>>>> >
>>>> > final KStream<String, test> tests = builder.stream(Serdes.String(), testSpecificAvroSerde, "test");
>>>> >
>>>> > tests.map((id, command) -> {
>>>> >     System.out.println("test id=" + id + " command=" + command);
>>>> >     command.setId(9);
>>>> >
>>>> >     return new KeyValue<>(UUID.randomUUID().toString(), command);
>>>> > })
>>>> >     .through(Serdes.String(), testSpecificAvroSerde, "test2");
>>>> >
>>>> >
>>>> > *test.avsc*
>>>> > {
>>>> >     "type": "record",
>>>> >     "namespace": "com.foodpanda.command.avro",
>>>> >     "name": "test",
>>>> >     "fields": [
>>>> >         { "name": "id", "type": "int" },
>>>> >         { "name": "product", "type": "string" },
>>>> >         { "name": "quantity", "type": "int" },
>>>> >         { "name": "price", "type": "float" }
>>>> >     ]
>>>> > }
>>>> >
>>>> > --
>>>> >
>>>> > Nick DeCoursin
>>>> > Software Engineer
>>>> > foodpanda
>>>> >
>>>> > Tel | +1 920 450 5434
>>>> >
>>>> > Mail | n.decour...@foodpanda.com
>>>> >
>>>> > Skype | nick.foodpanda
>>>> >
>>>> > Foodpanda GmbH | Schreiberhauer Str. 30 | 10317 Berlin | Germany
>>>> > Sitz der Gesellschaft | Berlin, AG Charlottenburg | HRB 138224 B |
>>>> > USt-ID-Nr | DE 283789080
>>>> > Geschäftsführer | Benjamin Bauer, Felix Plog, Ralf Wenzel
>>>> >
>>>> > CONFIDENTIALITY NOTICE: This message (including any attachments) is
>>>> > confidential and may be privileged. It may be read, copied and used
>>>> only by
>>>> > the intended recipient. If you have received it in error please
>>>> contact the
>>>> > sender (by return e-mail) immediately and delete this message. Any
>>>> > unauthorized use or dissemination of this message in whole or in
>>>> parts is
>>>> > strictly prohibited.
>>>>
>>>>
>>>>
>>>> --
>>>> Gwen Shapira
>>>> Product Manager | Confluent
>>>> 650.450.2760 | @gwenshap
>>>> Follow us: Twitter | blog
>>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>
>



