Hi,
I am trying to stream from Kafka to Google BigQuery. The connect-standalone.properties file is as follows:

key.converter=org.apache.kafka.connect.storage.StringConverter
##value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#
# Converter-specific settings can be passed in by prefixing the Converter's
# setting with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and
# must be specified, but most users will always want to use the built-in
# default. Offset and config data is never visible outside of Kafka Connect in
# this format.
##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect_bq.offsets

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class
# loading isolation for plugins (connectors, converters, transformations). The
# list should consist of top level directories that include any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of
#    classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
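For context: with value.converter=org.apache.kafka.connect.json.JsonConverter and value.converter.schemas.enable=false, my understanding is that the worker expects plain JSON records on the topic, with no "schema"/"payload" envelope, so a record like the one below (field names are made-up placeholders, not my actual data) should be acceptable as-is:

{"ticker": "ABC", "price": 123.45, "timecreated": "2021-03-11T11:07:58"}

With value.converter.schemas.enable=true the same record would instead have to be wrapped as {"schema": {...}, "payload": {...}}.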
And the bigquery-sink.properties file has this:

{
  "name": "bigquery-sink",
  "connector.type": "bigquery-connector",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "defaultDataset": "test",
  "project": "xyz",
  "topics": "md",
  "autoCreateTables": "false",
  "gcsBucketName": "tmp_storage_bucket",
  "queueSize": "-1",
  "bigQueryRetry": "0",
  "bigQueryRetryWait": "1000",
  "bigQueryMessageTimePartitioning": "false",
  "bigQueryPartitionDecorator": "true",
  "timePartitioningType": "DAY",
  "keySource": "FILE",
  "keyfile": "/home/hduser/xyz.json",
  "sanitizeTopics": "false",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
  "threadPoolSize": "10",
  "allBQFieldsNullable": "false",
  "avroDataCacheSize": "100",
  "batchLoadIntervalSec": "120",
  "convertDoubleSpecialValues": "false",
  "enableBatchLoad": "false",
  "upsertEnabled": "false",
  "deleteEnabled": "false",
  "mergeIntervalMs": "60_000L",
  "mergeRecordsThreshold": "-1",
  "autoCreateBucket": "true",
  "allowNewBigQueryFields": "false",
  "allowBigQueryRequiredFieldRelaxation": "false",
  "allowSchemaUnionization": "false",
  "kafkaDataFieldName": "null",
  "kafkaKeyFieldName": "null"
}

I run it as follows:

$KAFKA_HOME/bin/connect-standalone.sh \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

and I get this error:

[2021-03-11 11:07:58,826] ERROR Failed to create job for /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2021-03-11 11:07:58,826] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"defaultDataset"="test",, "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",, "project"="axial-glow-224522",, "autoCreateTables"="false",, "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",, "bigQueryMessageTimePartitioning"="false",, "connector.type"="bigquery-connector",, "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",, "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",, "kafkaKeyFieldName"="null", "sanitizeTopics"="false",, "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",, "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",, "keySource"="FILE",, "allowNewBigQueryFields"="false",, "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",, "threadPoolSize"="10",, "timePartitioningType"="DAY",, "enableBatchLoad"="false",, "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",, "mergeRecordsThreshold"="-1",, "queueSize"="-1",, "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=, "avroDataCacheSize"="100",, "upsertEnabled"="false",, "kafkaDataFieldName"="null",, }=, "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type

I think the problem is a wrong entry in the bigquery-sink.properties file above, but I cannot see what it is. Any ideas appreciated.

Thanks
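PS. For comparison, the sample sink config that ships with Kafka (config/connect-file-sink.properties) is in plain key=value form; from memory it looks roughly like this:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test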