Hi,

I am trying to stream from Kafka to Google BigQuery.


The connect-standalone.properties file is as follows:


key.converter=org.apache.kafka.connect.storage.StringConverter
##value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#
# Converter-specific settings can be passed in by prefixing the Converter's
# setting with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and
# must be specified, but most users will always want to use the built-in
# default. Offset and config data is never visible outside of Kafka Connect in
# this format.
##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


offset.storage.file.filename=/tmp/connect_bq.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


# Set to a list of filesystem paths separated by commas (,) to enable class
# loading isolation for plugins (connectors, converters, transformations). The
# list should consist of top level directories that include any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of
# classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
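
With value.converter set to JsonConverter and value.converter.schemas.enable=false,
my understanding is that the record values on the topic are expected to be plain
JSON objects, without the {"schema": ..., "payload": ...} envelope that
schemas.enable=true would require. For example (the broker address and field
names below are made up for illustration):

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic md
>{"symbol": "ABC", "price": 10.5}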


And the bigquery-sink.properties file has this:


{
     "name": "bigquery-sink",
     "connector.type": "bigquery-connector",
     "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
     "defaultDataset": "test",
     "project": "xyz",
     "topics": "md",
     "autoCreateTables": "false",
     "gcsBucketName": "tmp_storage_bucket",
     "queueSize": "-1",
     "bigQueryRetry": "0",
     "bigQueryRetryWait": "1000",
     "bigQueryMessageTimePartitioning": "false",
     "bigQueryPartitionDecorator": "true",
     "timePartitioningType": "DAY",
     "keySource": "FILE",
     "keyfile": "/home/hduser/xyz.json",
     "sanitizeTopics": "false",
     "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
     "threadPoolSize": "10",
     "allBQFieldsNullable": "false",
     "avroDataCacheSize": "100",
     "batchLoadIntervalSec": "120",
     "convertDoubleSpecialValues": "false",
     "enableBatchLoad": "false",
     "upsertEnabled": "false",
     "deleteEnabled": "false",
     "mergeIntervalMs": "60_000L",
     "mergeRecordsThreshold": "-1",
     "autoCreateBucket": "true",
     "allowNewBigQueryFields": "false",
     "allowBigQueryRequiredFieldRelaxation": "false",
     "allowSchemaUnionization": "false",
     "kafkaDataFieldName": "null",
     "kafkaKeyFieldName": "null"
}

I run it as below:


$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

I get this error:

[2021-03-11 11:07:58,826] ERROR Failed to create job for
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
(org.apache.kafka.connect.cli.ConnectStandalone:102)
[2021-03-11 11:07:58,826] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException:
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
config {"defaultDataset"="test",,
"schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
"project"="axial-glow-224522",, "autoCreateTables"="false",,
"deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
"bigQueryMessageTimePartitioning"="false",,
"connector.type"="bigquery-connector",,
"gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
"mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
"kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
"keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
"topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
"keySource"="FILE",, "allowNewBigQueryFields"="false",,
"bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
"threadPoolSize"="10",, "timePartitioningType"="DAY",,
"enableBatchLoad"="false",,
"connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
"mergeRecordsThreshold"="-1",, "queueSize"="-1",,
"batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
"avroDataCacheSize"="100",, "upsertEnabled"="false",,
"kafkaDataFieldName"="null",, }=,
"allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type

I think the problem is a wrong entry in the bigquery-sink.properties file
above, but I cannot see what it is.
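
Looking at the error text again, the opening and closing braces show up as
{= and }=, and every key keeps its double quotes, as if the whole JSON body
were being parsed line by line as key=value pairs, so Connect never finds a
usable connector.class. Could it be that connect-standalone.sh expects the
connector config as a plain Java properties file, the JSON form being what
the REST API takes in distributed mode? If so, a properties-format sketch of
the same config (keys and values copied from the file above, untested) would
look like:

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
topics=md
project=xyz
defaultDataset=test
keySource=FILE
keyfile=/home/hduser/xyz.json
autoCreateTables=false
gcsBucketName=tmp_storage_bucket

with the remaining settings following the same key=value pattern. (For the
REST API the settings would instead need to be nested under a "config" field,
with only "name" at the top level.) A couple of values might also need a
second look either way: mergeIntervalMs=60_000L looks like a Java long
literal rather than a plain number, and kafkaDataFieldName=null would set the
field name to the literal string "null" rather than leaving it unset.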


Any ideas appreciated.


Thanks
