This is turning into fun and games. I am trying to write a single column (just as a test) from Kafka to BigQuery. I am sending the schema and payload as per the docs.
This is the message sent, which I can read back with the console consumer:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server rhes75:9092 --from-beginning --topic md --property print.key=true

Note that it also prints the Kafka key (first line below), followed by the value, pretty-printed here for readability:

9485818a-e6c5-434d-9096-29c6e3f55148
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "rowkey", "type": "string", "optional": true}
    ],
    "optional": false,
    "name": "BQ"
  },
  "payload": {
    "rowkey": "9485818a-e6c5-434d-9096-29c6e3f55148"
  }
}

The error thrown is:

[2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Top-level Kafka Connect schema must be of type 'struct' (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

This is the standalone properties file:

bootstrap.servers=rhes75:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
offset.flush.interval.ms=10000

and this is the sink properties file:

name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=project_name
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=xxx.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60000
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

I am sure someone will be able to spot the error here. Many thanks.
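PS. In case anyone wants to reproduce this, the record can be re-published with the stock console producer along these lines. This is only a sketch of how I produce it: the '|' key separator and the one-line JSON are my choices here, everything else (broker, topic, key, envelope) is exactly as shown above.

# re-publish the same record; key and value separated by '|'
echo '9485818a-e6c5-434d-9096-29c6e3f55148|{"schema": {"type": "struct", "fields": [{"field": "rowkey", "type": "string", "optional": true}], "optional": false, "name": "BQ"}, "payload": {"rowkey": "9485818a-e6c5-434d-9096-29c6e3f55148"}}' | \
$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server rhes75:9092 --topic md \
  --property parse.key=true --property 'key.separator=|'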
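For completeness, the worker is started in standalone mode in the usual way, with the worker properties first and the sink properties second; the file names below are placeholders for my actual paths:

# standalone worker: worker config, then connector config
$KAFKA_HOME/bin/connect-standalone.sh connect-bq-standalone.properties bigquery-sink.properties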