Thanks, Liam, for the suggestion. This is the redone sink file (plain-text Java properties format):
name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60_000L
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

Now when I run the command

$KAFKA_HOME/bin/connect-standalone.sh \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

it comes back with this error:

[2021-03-12 09:23:54,523] INFO REST server listening at http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:207)
[2021-03-12 09:23:54,523] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2021-03-12 09:23:54,534] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
        at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
        at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
        at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I downloaded common-config-6.1.0.jar and added it to the lib directory in ..../wepay-kafka-connect-bigquery-2.1.0/lib, but little joy I am afraid.
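For what it's worth, my working theory (unconfirmed) is a version mismatch rather than a missing jar: the class the stack trace cannot find, org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString, ships in kafka-clients, not in Confluent's common-config jar (which holds io.confluent.common.config classes), so adding common-config-6.1.0.jar was never going to supply it. That would mean connector 2.1.0 was built against a newer kafka-clients than the one my Connect runtime loads. Here is a minimal probe one can point at any kafka-clients jar to check; the jar path below is only an assumed example, to be replaced with whatever kafka-clients-x.y.z.jar sits under your own $KAFKA_HOME/libs:

// ClassProbe.java -- a minimal sketch to check whether a given kafka-clients
// jar contains the inner class the connector expects. The jar path is an
// assumption; substitute the jar your Connect worker actually loads.
import java.net.URL;
import java.net.URLClassLoader;

public class ClassProbe {
    public static void main(String[] args) throws Exception {
        // Assumed location -- adjust to your installation.
        URL jar = new URL("file:///opt/kafka/libs/kafka-clients-2.1.0.jar");
        // Null parent: resolve only against this jar (plus bootstrap classes),
        // so the result reflects the jar itself, not the application classpath.
        try (URLClassLoader loader = new URLClassLoader(new URL[] { jar }, null)) {
            try {
                loader.loadClass(
                    "org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString");
                System.out.println("Class present: this kafka-clients should satisfy the connector.");
            } catch (ClassNotFoundException e) {
                System.out.println("Class missing: this kafka-clients predates the connector's build.");
            }
        }
    }
}

If the probe reports the class missing, the fix would presumably be to upgrade Kafka, or to pick a connector release built against the installed Kafka version, rather than to add Confluent jars to the plugin's lib directory. Happy to be corrected on that reading.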
Cheers,

Mich

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:

> Hi Mich,
>
> Your bigquery-sink.properties file is in a JSON format - which won't work.
> It needs to follow the usual format of a Java properties file.
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> > Hi,
> >
> > Trying to stream from Kafka to Google BigQuery.
> >
> > The connect-standalone.properties is as follows:
> >
> > key.converter=org.apache.kafka.connect.storage.StringConverter
> > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > value.converter=org.apache.kafka.connect.json.JsonConverter
> > #
> > # Converter-specific settings can be passed in by prefixing the Converter's
> > # setting with the converter we want to apply it to
> > key.converter.schemas.enable=true
> > value.converter.schemas.enable=false
> >
> > # The internal converter used for offsets and config data is configurable and
> > # must be specified, but most users will always want to use the built-in
> > # default. Offset and config data is never visible outside of Kafka Connect in
> > # this format.
> > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> > ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> > internal.key.converter.schemas.enable=false
> > internal.value.converter.schemas.enable=false
> >
> > offset.storage.file.filename=/tmp/connect_bq.offsets
> > # Flush much faster than normal, which is useful for testing/debugging
> > offset.flush.interval.ms=10000
> >
> > # Set to a list of filesystem paths separated by commas (,) to enable class
> > # loading isolation for plugins (connectors, converters, transformations). The
> > # list should consist of top level directories that include any combination of:
> > # a) directories immediately containing jars with plugins and their dependencies
> > # b) uber-jars with plugins and their dependencies
> > # c) directories immediately containing the package directory structure of
> > # classes of plugins and their dependencies Note: symlinks will be followed to
> > # discover dependencies or plugins.
> > # Examples:
> > plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
> >
> > And bigquery-sink.properties file has this:
> >
> > {
> >   "name": "bigquery-sink",
> >   "connector.type": "bigquery-connector",
> >   "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
> >   "defaultDataset": "test",
> >   "project": "xyz",
> >   "topics": "md",
> >   "autoCreateTables": "false",
> >   "gcsBucketName": "tmp_storage_bucket",
> >   "queueSize": "-1",
> >   "bigQueryRetry": "0",
> >   "bigQueryRetryWait": "1000",
> >   "bigQueryMessageTimePartitioning": "false",
> >   "bigQueryPartitionDecorator": "true",
> >   "timePartitioningType": "DAY",
> >   "keySource": "FILE",
> >   "keyfile": "/home/hduser/xyz.json",
> >   "sanitizeTopics": "false",
> >   "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
> >   "threadPoolSize": "10",
> >   "allBQFieldsNullable": "false",
> >   "avroDataCacheSize": "100",
> >   "batchLoadIntervalSec": "120",
> >   "convertDoubleSpecialValues": "false",
> >   "enableBatchLoad": "false",
> >   "upsertEnabled": "false",
> >   "deleteEnabled": "false",
> >   "mergeIntervalMs": "60_000L",
> >   "mergeRecordsThreshold": "-1",
> >   "autoCreateBucket": "true",
> >   "allowNewBigQueryFields": "false",
> >   "allowBigQueryRequiredFieldRelaxation": "false",
> >   "allowSchemaUnionization": "false",
> >   "kafkaDataFieldName": "null",
> >   "kafkaKeyFieldName": "null"
> > }
> >
> > Run as below:
> >
> > $KAFKA_HOME/bin/connect-standalone.sh \
> >   /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> >   /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> >
> > I get this error:
> >
> > [2021-03-11 11:07:58,826] ERROR Failed to create job for
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > (org.apache.kafka.connect.cli.ConnectStandalone:102)
> > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > java.util.concurrent.ExecutionException:
> > org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
> > config {"defaultDataset"="test",,
> > "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> > "project"="axial-glow-224522",, "autoCreateTables"="false",,
> > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> > "bigQueryMessageTimePartitioning"="false",,
> > "connector.type"="bigquery-connector",,
> > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> > "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> > "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> > "enableBatchLoad"="false",,
> > "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
"kafkaDataFieldName"="null",, }=, > > "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector > type > > > > I think the problem is the wrong entry in the bigquery-sink.properties > > file above. > > > > I cannot see what it is? > > > > > > Any ideas appreciated. > > > > > > Thanks > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > > loss, damage or destruction of data or any other property which may arise > > from relying on this email's technical content is explicitly disclaimed. > > The author will in no case be liable for any monetary damages arising > from > > such loss, damage or destruction. > > >