Thanks again Liam.

This is the error:

[2021-03-12 14:17:54,670] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
        at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
        at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
        at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString

The ConfigDef class is indeed under $KAFKA_HOME/libs:


cd $KAFKA_HOME/libs
grep -lRi org.apache.kafka.common.config.ConfigDef

kafka-clients-1.1.0.jar

So it is in the kafka-clients-1.1.0.jar file.


I added $KAFKA_HOME/libs to the CLASSPATH, but that did not work. Then I
copied kafka-clients-1.1.0.jar over to
share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib and searched it again:



grep -lRi org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString

share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar

Yet the class still cannot be found at runtime!

In my connect-standalone.properties I have specified

plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins


So I am not sure where Connect is looking.
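For what it's worth, my reading of the stack trace (the PluginClassLoader frame delegating to the plain ClassLoader frame) is that Connect loads org.apache.kafka.* classes from the parent classpath, i.e. from whatever kafka-clients jar sits under $KAFKA_HOME/libs, so copying a jar into the plugin's lib directory would not change which kafka-clients wins. A quick sketch to see which version the runtime will actually use (the helper name `runtime_kafka_clients` is mine):

```shell
# runtime_kafka_clients DIR: report the kafka-clients jar(s) in a libs
# directory. The version that matters is the one under $KAFKA_HOME/libs,
# not the copy inside the plugin's lib directory.
runtime_kafka_clients() {
    ls "$1"/kafka-clients-*.jar 2>/dev/null
}

# Example:
# runtime_kafka_clients "$KAFKA_HOME/libs"
```

If that still reports kafka-clients-1.1.0.jar, the inner class will be missing no matter what the plugin directory contains; the 2.1.0 connector presumably needs a newer Kafka runtime than 1.1.0.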




Regards,


Mich



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
> should already have the common conf lib in there).
>
>
> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <mich.talebza...@gmail.com>
> wrote:
>
> > Thanks Liam for the suggestion.
> >
> > This is the redone sink file (plain text)
> >
> > name=bigquery-sink
> > connector.type=bigquery-connector
> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> > defaultDataset=test
> > project=axial-glow-224522
> > topics=md
> > autoCreateTables=false
> > gcsBucketName=tmp_storage_bucket
> > queueSize=-1
> > bigQueryRetry=0
> > bigQueryRetryWait=1000
> > bigQueryMessageTimePartitioning=false
> > bigQueryPartitionDecorator=true
> > timePartitioningType=DAY
> > keySource=FILE
> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> > sanitizeTopics=false
> > schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> > threadPoolSize=10
> > allBQFieldsNullable=false
> > avroDataCacheSize=100
> > batchLoadIntervalSec=120
> > convertDoubleSpecialValues=false
> > enableBatchLoad=false
> > upsertEnabled=false
> > deleteEnabled=false
> > mergeIntervalMs=60_000L
> > mergeRecordsThreshold=-1
> > autoCreateBucket=true
> > allowNewBigQueryFields=false
> > allowBigQueryRequiredFieldRelaxation=false
> > allowSchemaUnionization=false
> > kafkaDataFieldName=null
> > kafkaKeyFieldName=null
> >
> > Now when I run the command
> >
> > $KAFKA_HOME/bin/connect-standalone.sh \
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> >
> > It comes back with this error:
> >
> > [2021-03-12 09:23:54,523] INFO REST server listening at http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:207)
> > [2021-03-12 09:23:54,523] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> >         at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
> >         at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> >         at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> >         at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> >         at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> >         at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> > Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >         at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >
> > I downloaded common-config-6.1.0.jar and added to lib directory in
> >
> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
> >
> > But little joy I am afraid.
> >
> > Cheers,
> >
> > Mich
> >
> >
> >
> > LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hi Mich,
> > >
> > > Your bigquery-sink.properties file is in a JSON format - which won't
> > work.
> > > It needs to follow the usual format of a Java properties file.
> > >
> > > Kind regards,
> > >
> > > Liam Clarke-Hutchinson
> > >
> > > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > > Trying to stream from Kafka to Google BigQuery.
> > > >
> > > >
> > > >  The connect-standalone.properties is as follows
> > > >
> > > >
> > > > key.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > value.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > > #
> > > >
> > > > # Converter-specific settings can be passed in by prefixing the
> > > Converter's
> > > >
> > > > # setting with the converter we want to apply it to
> > > >
> > > > key.converter.schemas.enable=true
> > > >
> > > > value.converter.schemas.enable=false
> > > >
> > > >
> > > > # The internal converter used for offsets and config data is
> > configurable
> > > > and
> > > >
> > > > # must be specified, but most users will always want to use the
> > built-in
> > > >
> > > > # default. Offset and config data is never visible outside of Kafka
> > > Connect
> > > > in
> > > >
> > > > # this format.
> > > >
> > > > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > >
> > > > internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > >
> > > > ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > internal.key.converter.schemas.enable=false
> > > >
> > > > internal.value.converter.schemas.enable=false
> > > >
> > > >
> > > > offset.storage.file.filename=/tmp/connect_bq.offsets
> > > >
> > > > # Flush much faster than normal, which is useful for testing/debugging
> > > >
> > > > offset.flush.interval.ms=10000
> > > >
> > > >
> > > > # Set to a list of filesystem paths separated by commas (,) to enable
> > > class
> > > >
> > > > # loading isolation for plugins (connectors, converters,
> > > transformations).
> > > > The
> > > >
> > > > # list should consist of top level directories that include any
> > > combination
> > > > of:
> > > >
> > > > # a) directories immediately containing jars with plugins and their
> > > > dependencies
> > > >
> > > > # b) uber-jars with plugins and their dependencies
> > > >
> > > > # c) directories immediately containing the package directory
> structure
> > > of
> > > >
> > > > # classes of plugins and their dependencies Note: symlinks will be
> > > followed
> > > > to
> > > >
> > > > # discover dependencies or plugins.
> > > >
> > > > # Examples:
> > > >
> > > >
> > > > plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
> > > >
> > > >
> > > > And bigquery-sink.properties file has this
> > > >
> > > >
> > > > {
> > > >
> > > >      "name": "bigquery-sink",
> > > >
> > > >      "connector.type": "bigquery-connector",
> > > >
> > > >      "connector.class":
> > > > "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
> > > >
> > > >      "defaultDataset": "test",
> > > >
> > > >      "project": "xyz",
> > > >
> > > >      "topics": "md",
> > > >
> > > >      "autoCreateTables": "false",
> > > >
> > > >      "gcsBucketName": "tmp_storage_bucket",
> > > >
> > > >      "queueSize": "-1",
> > > >
> > > >      "bigQueryRetry": "0",
> > > >
> > > >      "bigQueryRetryWait": "1000",
> > > >
> > > >      "bigQueryMessageTimePartitioning": "false",
> > > >
> > > >      "bigQueryPartitionDecorator": "true",
> > > >
> > > >      "timePartitioningType": "DAY",
> > > >
> > > >      "keySource": "FILE",
> > > >
> > > >      "keyfile": "/home/hduser/xyz.json",
> > > >
> > > >      "sanitizeTopics": "false",
> > > >
> > > >      "schemaRetriever":
> > > >      "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
> > > >
> > > >      "threadPoolSize": "10",
> > > >
> > > >      "allBQFieldsNullable": "false",
> > > >
> > > >      "avroDataCacheSize": "100",
> > > >
> > > >      "batchLoadIntervalSec": "120",
> > > >
> > > >      "convertDoubleSpecialValues": "false",
> > > >
> > > >      "enableBatchLoad": "false",
> > > >
> > > >      "upsertEnabled": "false",
> > > >
> > > >      "deleteEnabled": "false",
> > > >
> > > >      "mergeIntervalMs": "60_000L",
> > > >
> > > >      "mergeRecordsThreshold": "-1",
> > > >
> > > >      "autoCreateBucket": "true",
> > > >
> > > >      "allowNewBigQueryFields": "false",
> > > >
> > > >      "allowBigQueryRequiredFieldRelaxation": "false",
> > > >
> > > >      "allowSchemaUnionization": "false",
> > > >
> > > >      "kafkaDataFieldName": "null",
> > > >
> > > >      "kafkaKeyFieldName": "null"
> > > >
> > > > }
> > > >
> > > > Run as below
> > > >
> > > >
> > > > $KAFKA_HOME/bin/connect-standalone.sh \
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > > >
> > > > I get this error
> > > >
> > > > [2021-03-11 11:07:58,826] ERROR Failed to create job for
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > > > (org.apache.kafka.connect.cli.ConnectStandalone:102)
> > > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> > > > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > > > java.util.concurrent.ExecutionException:
> > > > org.apache.kafka.connect.runtime.rest.errors.BadRequestException:
> > > Connector
> > > > config {"defaultDataset"="test",,
> > > > "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> > > > "project"="axial-glow-224522",, "autoCreateTables"="false",,
> > > > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> > > > "bigQueryMessageTimePartitioning"="false",,
> > > > "connector.type"="bigquery-connector",,
> > > > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> > > > "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> > > > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> > > > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> > > > "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> > > > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> > > > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> > > > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> > > > "enableBatchLoad"="false",,
> > > > "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> > > > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> > > > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> > > > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
> > > > "kafkaDataFieldName"="null",, }=,
> > > > "allowBigQueryRequiredFieldRelaxation"="false",} contains no
> connector
> > > type
> > > >
> > > > I think the problem is the wrong entry in the
> bigquery-sink.properties
> > > > file above.
> > > >
> > > > I cannot see what it is?
> > > >
> > > >
> > > > Any ideas appreciated.
> > > >
> > > >
> > > > Thanks
> > > >
> > > >
> > >
> >
>
