Hi,

I am trying to stream into a BigQuery table from an on-premises cluster.
This already works with Spark Structured Streaming's foreachBatch, and I
am now trying to do the same with the Kafka Connect BigQuery sink
connector.
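
For context, the working Spark path has roughly the following shape (a
minimal sketch, not my exact job: the broker address, the temporary GCS
bucket and the payload handling are placeholders, and it assumes the
Google spark-bigquery connector jars are on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-bigquery").getOrCreate()

# Stream the Kafka topic (broker address is a placeholder)
lines = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("subscribe", "md")
         .load())

def write_to_bq(batch_df, batch_id):
    # Each micro-batch is written to BigQuery as an ordinary batch job
    (batch_df.selectExpr("CAST(value AS STRING) AS value")
     .write
     .format("bigquery")
     .option("table", "axial-glow-224522.test.md")
     .option("temporaryGcsBucket", "tmp-bucket-placeholder")
     .mode("append")
     .save())

lines.writeStream.foreachBatch(write_to_bq).start().awaitTermination()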

This is the command I use:

$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

and these are the two property files. First, connect-standalone.properties:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka
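
(One aside on this file: there is no bootstrap.servers entry, so the
worker falls back to Connect's built-in default of localhost:9092. If
the brokers lived elsewhere, the file would also need a line like the
following, with a placeholder host:)

bootstrap.servers=broker-host:9092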

and this is bigquery-sink.properties:

{
   "name": "bigquery-connect",
   "config": {
     "connector.type": "bigquery-connector",
     "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
     "defaultDataset": "test",
     "project": "axial-glow-224522",
     "topics": "md",
     "autoCreateTables": "false",
     "gcsBucketName": "",
     "queueSize": "-1",
     "bigQueryRetry": "0",
     "bigQueryRetryWait": "1000",
     "bigQueryMessageTimePartitioning": "false",
     "bigQueryPartitionDecorator": "true",
     "timestampPartitionFieldName": "null",
     "timePartitioningType": "DAY",
     "keySource": "FILE",
     "keyfile": "/home/hduser/GCPFirstProject-d75f1b3a9817.json",
     "sanitizeTopics": "false",
     "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
     "threadPoolSize": "10",
     "allBQFieldsNullable": "false",
     "avroDataCacheSize": "100",
     "batchLoadIntervalSec": "120",
     "convertDoubleSpecialValues": "false",
     "enableBatchLoad": "false",
     "upsertEnabled": "false",
     "deleteEnabled": "false",
     "mergeIntervalMs": "60000",
     "mergeRecordsThreshold": "-1",
     "autoCreateBucket": "true",
     "allowNewBigQueryFields": "false",
     "allowBigQueryRequiredFieldRelaxation": "false",
     "allowSchemaUnionization": "false",
     "kafkaDataFieldName": "null",
     "kafkaKeyFieldName": "null"
   }
}

It comes back with the following error:

[2021-03-08 15:16:40,725] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:55)
[2021-03-08 15:16:40,726] ERROR Failed to create job for
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
(org.apache.kafka.connect.cli.ConnectStandalone:102)
[2021-03-08 15:16:40,726] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException:
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
config {"defaultDataset"="test",,
"schemaRetriever"="com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",,
"project"="axial-glow-224522",, "autoCreateTables"="false",,
"deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
"bigQueryMessageTimePartitioning"="false",,
"connector.type"="bigquery-connector",, "gcsBucketName"="",,
"name"="bigquery-connect",, "mergeIntervalMs"="60_000L",,
"convertDoubleSpecialValues"="false",, "kafkaKeyFieldName"="null",
"sanitizeTopics"="false",,
"keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
"topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
"keySource"="FILE",, "allowNewBigQueryFields"="false",,
"bigQueryRetryWait"="1000",, "timestampPartitionFieldName"="null",,
"allowSchemaUnionization"="false",, "config"={, "threadPoolSize"="10",,
"timePartitioningType"="DAY",, "enableBatchLoad"="false",,
"connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
"mergeRecordsThreshold"="-1",, "queueSize"="-1",,
"batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
"avroDataCacheSize"="100",, "upsertEnabled"="false",,
"kafkaDataFieldName"="null",
}=, "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector
type
        at
org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
        at
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
        at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)
Caused by:
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
config {"defaultDataset"="test",,
"schemaRetriever"="com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",,
"project"="axial-glow-224522",, "autoCreateTables"="false",,
"deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
"bigQueryMessageTimePartitioning"="false",,
"connector.type"="bigquery-connector",, "gcsBucketName"="",,
"name"="bigquery-connect",, "mergeIntervalMs"="60_000L",,
"convertDoubleSpecialValues"="false",, "kafkaKeyFieldName"="null",
"sanitizeTopics"="false",,
"keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
"topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
"keySource"="FILE",, "allowNewBigQueryFields"="false",,
"bigQueryRetryWait"="1000",, "timestampPartitionFieldName"="null",,
"allowSchemaUnionization"="false",, "config"={, "threadPoolSize"="10",,
"timePartitioningType"="DAY",, "enableBatchLoad"="false",,
"connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
"mergeRecordsThreshold"="-1",, "queueSize"="-1",,
"batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
"avroDataCacheSize"="100",, "upsertEnabled"="false",,
"kafkaDataFieldName"="null", }=,
"allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
        at
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:250)
        at
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
        at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
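
Looking at the "contains no connector type" message and the mangled
key/value dump (keys like {= and }=), I wonder whether
connect-standalone.sh is parsing bigquery-sink.properties as a Java
properties file rather than as JSON; as far as I know the JSON form is
meant for the Connect REST API, and standalone mode expects key=value
pairs. For reference, this is the same config rendered as key=value
pairs, trimmed to the essentials (tasks.max added because properties
files normally carry it):

name=bigquery-connect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=md
project=axial-glow-224522
defaultDataset=test
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
autoCreateTables=false
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever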

Any ideas appreciated.


Cheers,


Mich


