Hi,
I'm trying to use the Confluent JDBC Sink as Sri is doing, but without a schema. I do not want to write "schema" + "payload" for each record, as my records are all for the same table and the schema is not going to change (this is a very simple project). Is there a way to configure a fixed schema to be attached to every record?
Thanks
Enrico

2016-09-20 8:46 GMT+02:00 Enrico Olivelli - Diennea <enrico.olive...@diennea.com>:

> Hi,
> I'm trying to use the Confluent JDBC Sink as Sri is doing but without a
> schema.
> I do not want to write "schema" + "payload" for each record as my records
> are all for the same table and the schema is not going to change (this is a
> very simple project)
>
> Thanks
> Enrico
>
> On Mon, 19/09/2016 at 14:41 -0500, Srikrishna Alla wrote:
>
> > Thanks Shikhar. I made this change and it's working now.
> >
> > Thanks,
> > Sri
> >
> > On Sep 19, 2016, at 2:25 PM, Shikhar Bhushan <shik...@confluent.io> wrote:
> >
> > > Hi Srikrishna,
> > >
> > > The issue is that you are using "name" to specify the field name for the
> > > struct's fields. The correct key to use is "field".
> > >
> > > Best,
> > >
> > > Shikhar
> > >
> > > On Thu, Sep 15, 2016 at 4:23 PM Gwen Shapira <g...@confluent.io> wrote:
> > >
> > > > ah, never mind - I just noticed you do use a schema... Maybe you are
> > > > running into this? https://issues.apache.org/jira/browse/KAFKA-3055
> > > >
> > > > On Thu, Sep 15, 2016 at 4:20 PM, Gwen Shapira <g...@confluent.io> wrote:
> > > >
> > > > > Most people use JSON without schema, so you should probably change
> > > > > your configuration to:
> > > > >
> > > > > key.converter.schemas.enable=false
> > > > > value.converter.schemas.enable=false
> > > > >
> > > > > On Thu, Sep 15, 2016 at 4:04 PM, Srikrishna Alla <allasrikrish...@gmail.com> wrote:
> > > > >
> > > > > > I am trying to use jdbc connector to send records from Kafka 0.9 to DB. I
> > > > > > am using JsonConverter to convert the records. My connector is failing when
> > > > > > it's checking the schema I am using. Please let me know what is the issue
> > > > > > with my json schema.
> > > > > >
> > > > > > Configuration used:
> > > > > > key.converter=org.apache.kafka.connect.storage.StringConverter
> > > > > > value.converter=org.apache.kafka.connect.json.JsonConverter
> > > > > > # Converter-specific settings can be passed in by prefixing the Converter's
> > > > > > # setting with the converter we want to apply it to
> > > > > > key.converter.schemas.enable=true
> > > > > > value.converter.schemas.enable=true
> > > > > >
> > > > > > Record that has been sent to the topic -
> > > > > > {"schema":{"type":"struct","fields":[{"name":"error_code","type":"string","optional":"false"},{"name":"error_time","type":"string","optional":"false"},{"name":"error_msg","type":"string","optional":"false"},{"name":"source","type":"string","optional":"false"},{"name":"criticality","type":"string","optional":"false"}]},"payload":{"error_code":"RAW104","error_time":"09/15/2016@18:00:32","error_msg":"Not accepting","source":"APPLICATION","criticality":"WARN"}}
> > > > > >
> > > > > > Error I am seeing:
> > > > > > [2016-09-15 18:01:07,513] ERROR Thread WorkerSinkTask-jdbc-sink-test-0 exiting with uncaught exception: (org.apache.kafka.connect.util.ShutdownableThread:84)
> > > > > > *org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly*
> > > > > >     at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:493)
> > > > > >     at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:344)
> > > > > >     at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
> > > > > >     at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
> > > > > > Exception in thread "WorkerSinkTask-jdbc-sink-test-0" *org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly*
> > > > > >     at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:493)
> > > > > >     at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:344)
> > > > > >     at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
> > > > > >     at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
> > > > > >     at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
> > > > > >
> > > > > > Thanks,
> > > > > > Sri
> > > > >
> > > > > --
> > > > > Gwen Shapira
> > > > > Product Manager | Confluent
> > > > > 650.450.2760 | @gwenshap
>
> --
> Enrico Olivelli
> Software Development Manager @Diennea
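
For readers following the thread, here is a minimal sketch of what a corrected record could look like, and of one possible producer-side workaround for the question at the top: build the schema once and wrap every payload in the same envelope before sending. It follows Shikhar's fix (each struct field is keyed by "field", not "name"). The helper name wrap_with_schema and the constants are hypothetical, not part of Kafka Connect or the JDBC sink, and using JSON booleans for "optional" (instead of the strings in Sri's record) is an assumption about what the converter expects.

```python
import json

# Field names taken from the record quoted in the thread.
FIELD_NAMES = ["error_code", "error_time", "error_msg", "source", "criticality"]

# Fixed schema, built once. Each struct field is identified by the "field" key
# (the fix from the thread). "optional" as a JSON boolean is an assumption.
ERROR_SCHEMA = {
    "type": "struct",
    "fields": [
        {"field": name, "type": "string", "optional": False}
        for name in FIELD_NAMES
    ],
}


def wrap_with_schema(payload: dict) -> str:
    """Hypothetical helper: return the {"schema", "payload"} envelope as JSON."""
    missing = [name for name in FIELD_NAMES if name not in payload]
    if missing:
        raise ValueError(f"payload is missing required fields: {missing}")
    return json.dumps({"schema": ERROR_SCHEMA, "payload": payload})


if __name__ == "__main__":
    record = wrap_with_schema({
        "error_code": "RAW104",
        "error_time": "09/15/2016@18:00:32",
        "error_msg": "Not accepting",
        "source": "APPLICATION",
        "criticality": "WARN",
    })
    # This string is what would be sent as the message value to the topic.
    print(record)
```

This keeps value.converter.schemas.enable=true on the Connect side and only moves the repetition into one helper on the producer side; whether the converter itself can be configured with a fixed schema is not answered in this thread.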