Hi,

I am trying to stream into a BigQuery table from an on-premises Kafka cluster using the Kafka Connect BigQuery sink connector. The same data already lands in BigQuery fine when I push it with Spark Structured Streaming's foreachBatch, so the path from the cluster to GCP itself works.
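For context, the working Spark route looks roughly like the sketch below. This is not my exact job, just a minimal PySpark outline; it assumes the spark-sql-kafka and spark-bigquery connector packages are on the classpath, and the broker address, bucket and checkpoint paths are placeholders:

from pyspark.sql import SparkSession

# Minimal sketch of the foreachBatch route: read the Kafka topic as a
# stream and write each micro-batch to BigQuery with the
# spark-bigquery-connector. Assumes the connector jars are available;
# broker, bucket and checkpoint values below are placeholders.
spark = SparkSession.builder.appName("md-to-bigquery").getOrCreate()

stream_df = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
             .option("subscribe", "md")
             .load())

def write_to_bq(batch_df, batch_id):
    # Each micro-batch is written as an ordinary batch job.
    (batch_df.selectExpr("CAST(value AS STRING) AS value")
             .write.format("bigquery")
             .option("table", "test.md")                  # dataset.table
             .option("temporaryGcsBucket", "tmp-bucket")  # placeholder
             .mode("append")
             .save())

(stream_df.writeStream
          .foreachBatch(write_to_bq)
          .option("checkpointLocation", "/tmp/bq_checkpoint")  # placeholder
          .start()
          .awaitTermination())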
This is the command I use:

$KAFKA_HOME/bin/connect-standalone.sh \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

and these are the two property files. First, connect-standalone.properties:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka

and this is bigquery-sink.properties, pasted as-is (note it is JSON, and it contains a duplicate enableBatchLoad entry and missing commas after the kafkaDataFieldName and kafkaKeyFieldName lines):

{
  "name": "bigquery-connect",
  "config": {
    "connector.type": "bigquery-connector",
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "defaultDataset": "test",
    "project": "axial-glow-224522",
    "topics": "md",
    "autoCreateTables": "false",
    "gcsBucketName": "",
    "queueSize": "-1",
    "bigQueryRetry": "0",
    "bigQueryRetryWait": "1000",
    "bigQueryMessageTimePartitioning": "false",
    "bigQueryPartitionDecorator": "true",
    "timestampPartitionFieldName": "null",
    "timePartitioningType": "DAY",
    "keySource": "FILE",
    "keyfile": "/home/hduser/GCPFirstProject-d75f1b3a9817.json",
    "sanitizeTopics": "false",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "threadPoolSize": "10",
    "allBQFieldsNullable": "false",
    "avroDataCacheSize": "100",
    "batchLoadIntervalSec": "120",
    "convertDoubleSpecialValues": "false",
    "enableBatchLoad": "",
    "enableBatchLoad": "false",
    "upsertEnabled": "false",
    "deleteEnabled": "false",
    "mergeIntervalMs": "60_000L",
    "mergeRecordsThreshold": "-1",
    "autoCreateBucket": "true",
    "allowNewBigQueryFields": "false",
    "allowBigQueryRequiredFieldRelaxation": "false",
    "allowSchemaUnionization": "false",
    "kafkaDataFieldName": "null"
    "kafkaKeyFieldName": "null"
  }
}
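One thing I suspect: connect-standalone.sh expects each connector file in Java properties (key=value) format, while the layout above, a top-level "name" plus a nested "config" object, is as far as I know the JSON shape the Connect REST API takes. Read as a properties file, every key keeps its double quotes and every value keeps its trailing comma, which would explain both the doubled commas in the error below and the complaint that the config "contains no connector type" even though connector.class is present (the parsed key would literally be "connector.class", quotes included). A properties-format sketch of the same connector settings, trimmed to the main entries, with the remaining keys following the same key=value pattern, would be roughly:

name=bigquery-connect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
topics=md
project=axial-glow-224522
defaultDataset=test
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
autoCreateTables=false

Alternatively, if the JSON layout is kept, my understanding is that it is meant to be posted to a running Connect worker's REST endpoint (default port 8083) rather than handed to connect-standalone.sh, along the lines of:

curl -X POST -H "Content-Type: application/json" \
  --data @bigquery-sink.json \
  http://localhost:8083/connectors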
"threadPoolSize"="10",, "timePartitioningType"="DAY",, "enableBatchLoad"="false",, "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",, "mergeRecordsThreshold"="-1",, "queueSize"="-1",, "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=, "avroDataCacheSize"="100",, "upsertEnabled"="false",, "kafkaDataFieldName"="null", }=, "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79) at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110) Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"defaultDataset"="test",, "schemaRetriever"="com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",, "project"="axial-glow-224522",, "autoCreateTables"="false",, "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",, "bigQueryMessageTimePartitioning"="false",, "connector.type"="bigquery-connector",, "gcsBucketName"="",, "name"="bigquery-connect",, "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",, "kafkaKeyFieldName"="null", "sanitizeTopics"="false",, "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",, "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",, "keySource"="FILE",, "allowNewBigQueryFields"="false",, "bigQueryRetryWait"="1000",, "timestampPartitionFieldName"="null",, "allowSchemaUnionization"="false",, "config"={, "threadPoolSize"="10",, "timePartitioningType"="DAY",, "enableBatchLoad"="false",, "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",, "mergeRecordsThreshold"="-1",, "queueSize"="-1",, "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=, "avroDataCacheSize"="100",, "upsertEnabled"="false",, "kafkaDataFieldName"="null", }=, "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:250) at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107) Any ideas appreciated. Cheers, Mich *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.