Hi Mich,

Your bigquery-sink.properties file is in JSON format, which won't work here.
connect-standalone.sh reads plain Java properties files (key=value pairs);
the JSON form of a connector config is only used when submitting it to the
Connect REST API in distributed mode. That is also why the error says
"contains no connector type": the JSON text is being parsed as properties,
so every key ends up quoted (e.g. "connector.class" instead of
connector.class) and the runtime never finds connector.class.
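Converted to properties form it would look something like this (a sketch
based on your JSON above, trimmed for brevity; double-check the values
against the connector's docs before use):

    name=bigquery-sink
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    topics=md
    project=xyz
    defaultDataset=test
    autoCreateTables=false
    gcsBucketName=tmp_storage_bucket
    keySource=FILE
    keyfile=/home/hduser/xyz.json
    schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
    threadPoolSize=10
    # 60_000L is Java source syntax, not a value the properties parser will
    # accept as a long - use plain digits
    mergeIntervalMs=60000
    # omit kafkaDataFieldName / kafkaKeyFieldName entirely rather than
    # setting them to the string "null"

The remaining settings translate the same way, one key=value per line. I
also dropped "connector.type" - that isn't a Kafka Connect setting;
connector.class is what selects the connector. If you later run Connect in
distributed mode, that's where JSON comes in: you wrap the config and POST
it to the REST API, for example

    curl -X POST -H "Content-Type: application/json" \
         http://localhost:8083/connectors \
         -d '{"name": "bigquery-sink", "config": {"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", "topics": "md", ...}}'

but standalone mode only takes properties files on the command line.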

Kind regards,

Liam Clarke-Hutchinson

On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
>
> Trying to stream from Kafka to Google BigQuery.
>
>
>  The connect-standalone.properties is as follows:
>
> key.converter=org.apache.kafka.connect.storage.StringConverter
> ##value.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> #
> # Converter-specific settings can be passed in by prefixing the Converter's
> # setting with the converter we want to apply it to
> key.converter.schemas.enable=true
> value.converter.schemas.enable=false
>
> # The internal converter used for offsets and config data is configurable and
> # must be specified, but most users will always want to use the built-in
> # default. Offset and config data is never visible outside of Kafka Connect in
> # this format.
> ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
>
> offset.storage.file.filename=/tmp/connect_bq.offsets
> # Flush much faster than normal, which is useful for testing/debugging
> offset.flush.interval.ms=10000
>
> # Set to a list of filesystem paths separated by commas (,) to enable class
> # loading isolation for plugins (connectors, converters, transformations). The
> # list should consist of top level directories that include any combination of:
> # a) directories immediately containing jars with plugins and their dependencies
> # b) uber-jars with plugins and their dependencies
> # c) directories immediately containing the package directory structure of
> # classes of plugins and their dependencies. Note: symlinks will be followed to
> # discover dependencies or plugins.
> # Examples:
> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>
>
> And the bigquery-sink.properties file has this:
>
> {
>      "name": "bigquery-sink",
>      "connector.type": "bigquery-connector",
>      "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
>      "defaultDataset": "test",
>      "project": "xyz",
>      "topics": "md",
>      "autoCreateTables": "false",
>      "gcsBucketName": "tmp_storage_bucket",
>      "queueSize": "-1",
>      "bigQueryRetry": "0",
>      "bigQueryRetryWait": "1000",
>      "bigQueryMessageTimePartitioning": "false",
>      "bigQueryPartitionDecorator": "true",
>      "timePartitioningType": "DAY",
>      "keySource": "FILE",
>      "keyfile": "/home/hduser/xyz.json",
>      "sanitizeTopics": "false",
>      "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
>      "threadPoolSize": "10",
>      "allBQFieldsNullable": "false",
>      "avroDataCacheSize": "100",
>      "batchLoadIntervalSec": "120",
>      "convertDoubleSpecialValues": "false",
>      "enableBatchLoad": "false",
>      "upsertEnabled": "false",
>      "deleteEnabled": "false",
>      "mergeIntervalMs": "60_000L",
>      "mergeRecordsThreshold": "-1",
>      "autoCreateBucket": "true",
>      "allowNewBigQueryFields": "false",
>      "allowBigQueryRequiredFieldRelaxation": "false",
>      "allowSchemaUnionization": "false",
>      "kafkaDataFieldName": "null",
>      "kafkaKeyFieldName": "null"
> }
>
> I run it as below:
>
> $KAFKA_HOME/bin/connect-standalone.sh \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>
> I get this error:
>
> [2021-03-11 11:07:58,826] ERROR Failed to create job for
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> (org.apache.kafka.connect.cli.ConnectStandalone:102)
> [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
> config {"defaultDataset"="test",,
>
> "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> "project"="axial-glow-224522",, "autoCreateTables"="false",,
> "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> "bigQueryMessageTimePartitioning"="false",,
> "connector.type"="bigquery-connector",,
> "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> "enableBatchLoad"="false",,
>
> "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> "avroDataCacheSize"="100",, "upsertEnabled"="false",,
> "kafkaDataFieldName"="null",, }=,
> "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
>
> I think the problem is a wrong entry in the bigquery-sink.properties file
> above, but I cannot see what it is.
>
>
> Any ideas appreciated.
>
>
> Thanks
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
