Hi Mich,

Your bigquery-sink.properties file is in JSON format, which won't work - connect-standalone.sh expects connector configs in the usual Java properties file format (plain key=value pairs), not JSON.
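As a rough sketch (reusing the values from your JSON verbatim - I haven't checked the individual settings), bigquery-sink.properties would look something like this:

# same settings as the JSON version, expressed as key=value pairs
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
topics=md
project=xyz
defaultDataset=test
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
keySource=FILE
keyfile=/home/hduser/xyz.json
sanitizeTopics=false
threadPoolSize=10
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
timePartitioningType=DAY

...and so on for the remaining settings: one key=value per line, with no quotes, commas or surrounding braces. That punctuation is in fact why Connect reports "contains no connector type" - the braces, quotes and commas end up inside the parsed keys and values (you can see entries like {= and "connector.class"= in the error output), so connector.class is never recognised. Also, settings you want left unset (e.g. kafkaDataFieldName, kafkaKeyFieldName) are probably better omitted entirely than set to the literal string "null".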
Kind regards,

Liam Clarke-Hutchinson

On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> Trying to stream from Kafka to Google BigQuery.
>
> The connect-standalone.properties is as follows:
>
> key.converter=org.apache.kafka.connect.storage.StringConverter
> ##value.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> #
> # Converter-specific settings can be passed in by prefixing the Converter's
> # setting with the converter we want to apply it to
> key.converter.schemas.enable=true
> value.converter.schemas.enable=false
>
> # The internal converter used for offsets and config data is configurable and
> # must be specified, but most users will always want to use the built-in
> # default. Offset and config data is never visible outside of Kafka Connect in
> # this format.
> ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
>
> offset.storage.file.filename=/tmp/connect_bq.offsets
> # Flush much faster than normal, which is useful for testing/debugging
> offset.flush.interval.ms=10000
>
> # Set to a list of filesystem paths separated by commas (,) to enable class
> # loading isolation for plugins (connectors, converters, transformations). The
> # list should consist of top level directories that include any combination of:
> # a) directories immediately containing jars with plugins and their dependencies
> # b) uber-jars with plugins and their dependencies
> # c) directories immediately containing the package directory structure of
> # classes of plugins and their dependencies. Note: symlinks will be followed to
> # discover dependencies or plugins.
> # Examples:
> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>
> And the bigquery-sink.properties file has this:
>
> {
>   "name": "bigquery-sink",
>   "connector.type": "bigquery-connector",
>   "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
>   "defaultDataset": "test",
>   "project": "xyz",
>   "topics": "md",
>   "autoCreateTables": "false",
>   "gcsBucketName": "tmp_storage_bucket",
>   "queueSize": "-1",
>   "bigQueryRetry": "0",
>   "bigQueryRetryWait": "1000",
>   "bigQueryMessageTimePartitioning": "false",
>   "bigQueryPartitionDecorator": "true",
>   "timePartitioningType": "DAY",
>   "keySource": "FILE",
>   "keyfile": "/home/hduser/xyz.json",
>   "sanitizeTopics": "false",
>   "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
>   "threadPoolSize": "10",
>   "allBQFieldsNullable": "false",
>   "avroDataCacheSize": "100",
>   "batchLoadIntervalSec": "120",
>   "convertDoubleSpecialValues": "false",
>   "enableBatchLoad": "false",
>   "upsertEnabled": "false",
>   "deleteEnabled": "false",
>   "mergeIntervalMs": "60_000L",
>   "mergeRecordsThreshold": "-1",
>   "autoCreateBucket": "true",
>   "allowNewBigQueryFields": "false",
>   "allowBigQueryRequiredFieldRelaxation": "false",
>   "allowSchemaUnionization": "false",
>   "kafkaDataFieldName": "null",
>   "kafkaKeyFieldName": "null"
> }
>
> Run as below:
>
> $KAFKA_HOME/bin/connect-standalone.sh \
>   /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>   /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>
> I get this error:
>
> [2021-03-11 11:07:58,826] ERROR Failed to create job for
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> (org.apache.kafka.connect.cli.ConnectStandalone:102)
> [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
> config {"defaultDataset"="test",,
> "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> "project"="axial-glow-224522",, "autoCreateTables"="false",,
> "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> "bigQueryMessageTimePartitioning"="false",,
> "connector.type"="bigquery-connector",,
> "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> "enableBatchLoad"="false",,
> "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> "avroDataCacheSize"="100",, "upsertEnabled"="false",,
> "kafkaDataFieldName"="null",, }=,
> "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
>
> I think the problem is a wrong entry in the bigquery-sink.properties file
> above, but I cannot see what it is.
>
> Any ideas appreciated.
>
> Thanks