This is what is termed fun and games.

I am trying to write a single column (for test purposes) from Kafka to
BigQuery. I am sending the schema and payload as per the docs.

This is the message as sent, which I can read back from the console:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server rhes75:9092 \
  --from-beginning --topic md --property print.key=true

Note that it also prints the Kafka key:


9485818a-e6c5-434d-9096-29c6e3f55148    {"schema": { "type": "struct",
"fields": [ { "field": "rowkey", "type": "string", "optional":
true}],"optional": false,"name": "BQ"}, "payload": {"rowkey":
"9485818a-e6c5-434d-9096-29c6e3f55148"}}

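For anyone wanting to reproduce the message shape above, here is a minimal sketch in Python (standard library only) that builds the same schema-plus-payload envelope and runs a few structural checks on it. The function names make_envelope, check_envelope and split_console_line are hypothetical helpers made up for this illustration. The tab separator between key and value is assumed to be the console consumer's default. The checks cover only the shape of the JSON value and say nothing about what the sink connector does with the record key.

```python
import json


def make_envelope(rowkey):
    """Build a schema+payload envelope for one optional string column."""
    return {
        "schema": {
            "type": "struct",
            "fields": [{"field": "rowkey", "type": "string", "optional": True}],
            "optional": False,
            "name": "BQ",
        },
        "payload": {"rowkey": rowkey},
    }


def check_envelope(value_json):
    """Return a list of structural problems in a serialized envelope (empty if none)."""
    msg = json.loads(value_json)
    if not isinstance(msg, dict):
        return ["value is not a JSON object"]
    schema = msg.get("schema")
    if not isinstance(schema, dict):
        return ["missing 'schema' object"]
    problems = []
    if schema.get("type") != "struct":
        problems.append("top-level schema type is %r, not 'struct'" % schema.get("type"))
    payload = msg.get("payload")
    if not isinstance(payload, dict):
        problems.append("missing 'payload' object")
    else:
        # Every payload key should be declared in the schema's field list.
        declared = {f.get("field") for f in schema.get("fields", [])}
        undeclared = sorted(set(payload) - declared)
        if undeclared:
            problems.append("payload fields not in schema: %s" % undeclared)
    return problems


def split_console_line(line):
    """Split one console-consumer line into (key, value), assuming a tab separator."""
    key, _, value = line.partition("\t")
    return key, value


if __name__ == "__main__":
    rowkey = "9485818a-e6c5-434d-9096-29c6e3f55148"
    line = rowkey + "\t" + json.dumps(make_envelope(rowkey))
    key, value = split_console_line(line)
    print(key)
    print(check_envelope(value))
```

Run against the message shown above, the value passes these checks, so the value envelope itself does look like a struct; the key is a plain string.
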
The error thrown is


[2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0}
Task threw an uncaught and unrecoverable exception. Task is being
killed and will not recover until manually restarted. Error: Top-level
Kafka Connect schema must be of type 'struct'
(org.apache.kafka.connect.runtime.WorkerSinkTask:612)

This is the standalone properties file


bootstrap.servers=rhes75:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
offset.flush.interval.ms=10000

and this is the sink properties file


name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=project_name
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=xxx.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60000
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

I am sure someone should be able to spot the error here.


Many thanks


