Thanks Liam for the suggestion.

Here is the redone sink file, now as a plain-text Java properties file:

name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60000
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
# Left commented out; the literal string "null" would otherwise be used as the value
#kafkaDataFieldName=
#kafkaKeyFieldName=
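
As a quick sanity check that the file now parses as plain key=value pairs
rather than JSON, listing the keys (file path assumed from the run command
below) should print one property name per line:

grep -vE '^(#|$)' \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties \
  | cut -d= -f1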

Now when I run the command

$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

It comes back with this error:

[2021-03-12 09:23:54,523] INFO REST server listening at http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:207)
[2021-03-12 09:23:54,523] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2021-03-12 09:23:54,534] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
        at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
        at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
        at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
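
The class it cannot find is
org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString, which as
far as I can tell ships inside the kafka-clients jar, and only in fairly
recent Kafka releases, so presumably the runtime Kafka version matters here.
A rough way to read the client version off the jar name, assuming the
standard $KAFKA_HOME layout:

ls $KAFKA_HOME/libs/kafka-clients-*.jar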

I downloaded common-config-6.1.0.jar and added it to the lib directory in

..../wepay-kafka-connect-bigquery-2.1.0/lib

but little joy, I am afraid.
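
For reference, a rough way to check whether any jar on the classpath
actually contains the missing class (a sketch only; the plugin.path is taken
from my connect-standalone.properties quoted below):

for j in $KAFKA_HOME/libs/*.jar \
         /d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/*/lib/*.jar
do
  # unzip -l lists the class files inside each jar
  unzip -l "$j" 2>/dev/null | grep -q 'ConfigDef\$CaseInsensitiveValidString' \
    && echo "found in $j"
done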

Cheers,

Mich



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:

> Hi Mich,
>
> Your bigquery-sink.properties file is in a JSON format - which won't work.
> It needs to follow the usual format of a Java properties file.
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> > Hi,
> >
> > Trying to stream from Kafka to Google BigQuery.
> >
> > The connect-standalone.properties is as follows:
> >
> > key.converter=org.apache.kafka.connect.storage.StringConverter
> > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > value.converter=org.apache.kafka.connect.json.JsonConverter
> > #
> > # Converter-specific settings can be passed in by prefixing the
> > # Converter's setting with the converter we want to apply it to
> > key.converter.schemas.enable=true
> > value.converter.schemas.enable=false
> >
> > # The internal converter used for offsets and config data is
> > # configurable and must be specified, but most users will always want
> > # to use the built-in default. Offset and config data is never visible
> > # outside of Kafka Connect in this format.
> > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> > ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> > internal.key.converter.schemas.enable=false
> > internal.value.converter.schemas.enable=false
> >
> > offset.storage.file.filename=/tmp/connect_bq.offsets
> > # Flush much faster than normal, which is useful for testing/debugging
> > offset.flush.interval.ms=10000
> >
> > # Set to a list of filesystem paths separated by commas (,) to enable
> > # class loading isolation for plugins (connectors, converters,
> > # transformations). The list should consist of top level directories
> > # that include any combination of:
> > # a) directories immediately containing jars with plugins and their
> > #    dependencies
> > # b) uber-jars with plugins and their dependencies
> > # c) directories immediately containing the package directory structure
> > #    of classes of plugins and their dependencies. Note: symlinks will
> > #    be followed to discover dependencies or plugins.
> > # Examples:
> > plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
> >
> > And bigquery-sink.properties file has this:
> >
> > {
> >      "name": "bigquery-sink",
> >      "connector.type": "bigquery-connector",
> >      "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
> >      "defaultDataset": "test",
> >      "project": "xyz",
> >      "topics": "md",
> >      "autoCreateTables": "false",
> >      "gcsBucketName": "tmp_storage_bucket",
> >      "queueSize": "-1",
> >      "bigQueryRetry": "0",
> >      "bigQueryRetryWait": "1000",
> >      "bigQueryMessageTimePartitioning": "false",
> >      "bigQueryPartitionDecorator": "true",
> >      "timePartitioningType": "DAY",
> >      "keySource": "FILE",
> >      "keyfile": "/home/hduser/xyz.json",
> >      "sanitizeTopics": "false",
> >      "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
> >      "threadPoolSize": "10",
> >      "allBQFieldsNullable": "false",
> >      "avroDataCacheSize": "100",
> >      "batchLoadIntervalSec": "120",
> >      "convertDoubleSpecialValues": "false",
> >      "enableBatchLoad": "false",
> >      "upsertEnabled": "false",
> >      "deleteEnabled": "false",
> >      "mergeIntervalMs": "60_000L",
> >      "mergeRecordsThreshold": "-1",
> >      "autoCreateBucket": "true",
> >      "allowNewBigQueryFields": "false",
> >      "allowBigQueryRequiredFieldRelaxation": "false",
> >      "allowSchemaUnionization": "false",
> >      "kafkaDataFieldName": "null",
> >      "kafkaKeyFieldName": "null"
> > }
> >
> > Run as below:
> >
> > $KAFKA_HOME/bin/connect-standalone.sh \
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> >
> > I get this error:
> >
> > [2021-03-11 11:07:58,826] ERROR Failed to create job for
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > (org.apache.kafka.connect.cli.ConnectStandalone:102)
> > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > java.util.concurrent.ExecutionException:
> > org.apache.kafka.connect.runtime.rest.errors.BadRequestException:
> > Connector config {"defaultDataset"="test",,
> > "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> > "project"="axial-glow-224522",, "autoCreateTables"="false",,
> > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> > "bigQueryMessageTimePartitioning"="false",,
> > "connector.type"="bigquery-connector",,
> > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> > "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> > "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> > "enableBatchLoad"="false",,
> > "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
> > "kafkaDataFieldName"="null",, }=,
> > "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
> >
> > I think the problem is a wrong entry in the bigquery-sink.properties
> > file above, but I cannot see what it is.
> >
> > Any ideas appreciated.
> >
> > Thanks
> >
> > Disclaimer: Use it at your own risk. Any and all responsibility for
> > any loss, damage or destruction of data or any other property which
> > may arise from relying on this email's technical content is explicitly
> > disclaimed. The author will in no case be liable for any monetary
> > damages arising from such loss, damage or destruction.
> >
>
