I was able to make some progress: setting the
CONNECT_VALUE_CONVERTER_KEY_SUBJECT_NAME_STRATEGY
and CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY env variables fixed
the issue.
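
In case it's useful, the relevant container environment now looks roughly
like this (the strategy class is our own, described below):

CONNECT_VALUE_CONVERTER_KEY_SUBJECT_NAME_STRATEGY: co.myapp.serializers.subject.EnvironmentTopicNameStrategy
CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY: co.myapp.serializers.subject.EnvironmentTopicNameStrategy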

However, this makes it impossible to have a custom name strategy per
connector. To sum up:
 - setting the value converter as an env variable without a custom name
strategy works, as long as you don't need a custom name strategy
 - setting the value converter globally and trying to override the name
strategy in a connector config doesn't work; the override is just ignored
 - setting the value converter globally and also in a connector config
(even without a custom name strategy, see the example below) generates:
    "java.lang.RuntimeException:
io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance
of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
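
For reference, in that last case the connector config only adds the converter
and its schema registry/auth settings (same values as in my original message
below), e.g.:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "xxx:xxx",
"value.converter.schema.registry.url": "https://xxx.us-east-2.aws.confluent.cloud"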

Is such an exception expected in the last case, or is the configuration
wrong?

--
Alessandro Tagliapietra



On Mon, Jan 6, 2020 at 10:31 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hello everyone,
>
> I'm trying to run Kafka Connect to dump Avro messages to S3, and I'm having
> issues with the value converter. Our topics have a string key and an Avro
> value.
>
> I'm running the Docker image confluentinc/cp-kafka-connect:5.3.2 with the
> following env variables for the value converter:
>
> CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
> CONNECT_INTERNAL_VALUE_CONVERTER:
> org.apache.kafka.connect.json.JsonConverter
> CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
> CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
> CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: xxx:xxx
> CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL:
> https://xxx.us-east-2.aws.confluent.cloud
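>
> (As far as I understand the image's startup script, those CONNECT_* variables
> should end up in the worker config roughly as:
>
> value.converter=io.confluent.connect.avro.AvroConverter
> value.converter.schemas.enable=false
> value.converter.basic.auth.credentials.source=USER_INFO
> value.converter.schema.registry.basic.auth.user.info=xxx:xxx
> value.converter.schema.registry.url=https://xxx.us-east-2.aws.confluent.cloud
>
> so the converter itself is configured only at the worker level.)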
>
>
> After setting up the S3 sink with this config
>
> {
>   "name": "raw-dump",
>   "config": {
>       "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>       "partitioner.class":
> "io.confluent.connect.storage.partitioner.HourlyPartitioner",
>       "locale": "en-US",
>       "timezone": "UTC",
>       "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
>       "tasks.max": "1",
>       "topics": "xxx",
>       "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>       "s3.region": "eu-west-1",
>       "s3.bucket.name": "confluent-kafka-connect-s3-testing",
>       "flush.size": 1000,
>       "key.converter": "org.apache.kafka.connect.storage.StringConverter"
>   }
> }
>
>
> I was getting this error:
>
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in
> error handler
> Caused by: org.apache.kafka.connect.errors.DataException: Failed to
> deserialize data for topic xxx to Avro:
> Caused by: org.apache.kafka.common.errors.SerializationException: Error
> retrieving Avro schema for id 100023
>
>
> The log showed this value converter config:
>
> [2020-01-07 06:16:11,670] INFO AvroConverterConfig values:
>     bearer.auth.token = [hidden]
>     schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
>     basic.auth.user.info = [hidden]
>     auto.register.schemas = true
>     max.schemas.per.subject = 1000
>     basic.auth.credentials.source = USER_INFO
>     schema.registry.basic.auth.user.info = [hidden]
>     bearer.auth.credentials.source = STATIC_TOKEN
>     value.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.TopicNameStrategy
>     key.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.TopicNameStrategy
>  (io.confluent.connect.avro.AvroConverterConfig)
> [2020-01-07 06:16:11,694] INFO KafkaAvroSerializerConfig values:
>     bearer.auth.token = [hidden]
>     schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
>     basic.auth.user.info = [hidden]
>     auto.register.schemas = true
>     max.schemas.per.subject = 1000
>     basic.auth.credentials.source = USER_INFO
>     schema.registry.basic.auth.user.info = [hidden]
>     bearer.auth.credentials.source = STATIC_TOKEN
>     value.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.TopicNameStrategy
>     key.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.TopicNameStrategy
>  (io.confluent.kafka.serializers.KafkaAvroSerializerConfig)
> [2020-01-07 06:16:11,699] INFO KafkaAvroDeserializerConfig values:
>     bearer.auth.token = [hidden]
>     schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
>     basic.auth.user.info = [hidden]
>     auto.register.schemas = true
>     max.schemas.per.subject = 1000
>     basic.auth.credentials.source = USER_INFO
>     schema.registry.basic.auth.user.info = [hidden]
>     bearer.auth.credentials.source = STATIC_TOKEN
>     specific.avro.reader = false
>     value.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.TopicNameStrategy
>     key.subject.name.strategy = class
> io.confluent.kafka.serializers.subject.TopicNameStrategy
>
> So I thought it was because of our custom name strategy. I added our own
> JAR and noticed that:
>
>  - if I only add "value.converter.(key|value).subject.name.strategy" to the
> connector config, it's not read by the connector
>  - if I add this config to the connector (so the value converter is set both
> in the env variables and in the connector config):
>
> "value.converter": "io.confluent.connect.avro.AvroConverter",
> "value.converter.basic.auth.credentials.source": "USER_INFO",
> "value.converter.schema.registry.basic.auth.user.info": "xxx:xxx",
> "value.converter.schema.registry.url": "
> https://xxx.us-east-2.aws.confluent.cloud";,
> "value.converter.key.subject.name.strategy":
> "co.myapp.serializers.subject.EnvironmentTopicNameStrategy",
> "value.converter.value.subject.name.strategy":
> "co.myapp.serializers.subject.EnvironmentTopicNameStrategy"
>
>
>    it finally tries to use our custom EnvironmentTopicNameStrategy, but I
> get: "java.lang.RuntimeException:
> co.myapp.serializers.subject.EnvironmentTopicNameStrategy is not an
> instance of
> io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy" even
> though the class extends TopicNameStrategy (see the sketch after this list)
>  - just adding the value.converter (and auth/schema registry URL) configs
> results in the same error as above, just with the default class:
> "java.lang.RuntimeException:
> io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance
> of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
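>
> In case it helps, our strategy class is roughly the sketch below (simplified;
> the real prefix logic is a bit more involved and the APP_ENV variable name is
> just illustrative), compiled against the 5.3.x schema-registry classes:
>
> package co.myapp.serializers.subject;
>
> import io.confluent.kafka.serializers.subject.TopicNameStrategy;
>
> // Prefixes the default "<topic>-key" / "<topic>-value" subject names with
> // an environment name taken from the process environment.
> public class EnvironmentTopicNameStrategy extends TopicNameStrategy {
>
>     // Illustrative: falls back to "dev" when no environment is configured.
>     private static final String ENV =
>         System.getenv().getOrDefault("APP_ENV", "dev");
>
>     @Override
>     public String subjectName(String topic, boolean isKey, Object schema) {
>         // TopicNameStrategy resolves "<topic>-key" / "<topic>-value";
>         // we only prepend the environment name.
>         return ENV + "." + super.subjectName(topic, isKey, schema);
>     }
> }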
>
> Any ideas? It seems that adding value.converter to the connector config
> breaks even the default subject name strategy class.
>
> Thanks in advance
>
> --
> Alessandro Tagliapietra
>
>
