I was able to make some progress: setting the CONNECT_VALUE_CONVERTER_KEY_SUBJECT_NAME_STRATEGY and CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY env variables fixed the issue.
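For reference, these are the two variables, set alongside the other CONNECT_* ones quoted below. The value shown is our own strategy class, just as an example; the default would be io.confluent.kafka.serializers.subject.TopicNameStrategy:

CONNECT_VALUE_CONVERTER_KEY_SUBJECT_NAME_STRATEGY: co.myapp.serializers.subject.EnvironmentTopicNameStrategy
CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY: co.myapp.serializers.subject.EnvironmentTopicNameStrategy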
However, this makes it impossible to have a custom name strategy per connector. To sum up:

- setting the value converter via env variables, without a custom name strategy, works (as long as you don't need one)
- setting the value converter globally and trying to override the name strategy in a connector config doesn't work; the override is silently ignored
- setting the value converter both globally and in a connector config (even without a custom name strategy) generates: "java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"

Is the exception in the last case expected, or is the configuration wrong?
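In case it helps, the strategy class in question is shaped roughly like this. This is a trimmed sketch, not our exact code: the APP_ENV variable and the prefix format are illustrative, and it assumes the generic SubjectNameStrategy<Schema> interface that the 5.3.x Avro serializer uses:

package co.myapp.serializers.subject;

import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import org.apache.avro.Schema;

/**
 * Prefixes the subject produced by the default TopicNameStrategy with the
 * current environment, e.g. "staging_mytopic-value" instead of "mytopic-value".
 */
public class EnvironmentTopicNameStrategy extends TopicNameStrategy {

    @Override
    public String subjectName(String topic, boolean isKey, Schema schema) {
        // APP_ENV is illustrative here; the real class reads our own setting
        String env = System.getenv().getOrDefault("APP_ENV", "dev");
        return env + "_" + super.subjectName(topic, isKey, schema);
    }
}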
--
Alessandro Tagliapietra

On Mon, Jan 6, 2020 at 10:31 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> Hello everyone,
>
> I'm trying to run Kafka Connect to dump Avro messages to S3, and I'm
> having issues with the value converter. Our topics have a string key and
> an Avro value.
>
> I'm running the docker image confluentinc/cp-kafka-connect:5.3.2 with the
> following env variables for the value converter:
>
> CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
> CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
> CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
> CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: xxx:xxx
> CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://xxx.us-east-2.aws.confluent.cloud
>
> After setting up the S3 sink with this config:
>
> {
>   "name": "raw-dump",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
>     "locale": "en-US",
>     "timezone": "UTC",
>     "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
>     "tasks.max": "1",
>     "topics": "xxx",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "s3.region": "eu-west-1",
>     "s3.bucket.name": "confluent-kafka-connect-s3-testing",
>     "flush.size": 1000,
>     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
>   }
> }
>
> I was getting this error:
>
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
> Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic xxx to Avro:
> Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 100023
>
> The log showed this value converter config:
>
> [2020-01-07 06:16:11,670] INFO AvroConverterConfig values:
>     bearer.auth.token = [hidden]
>     schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
>     basic.auth.user.info = [hidden]
>     auto.register.schemas = true
>     max.schemas.per.subject = 1000
>     basic.auth.credentials.source = USER_INFO
>     schema.registry.basic.auth.user.info = [hidden]
>     bearer.auth.credentials.source = STATIC_TOKEN
>     value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
>     key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
>  (io.confluent.connect.avro.AvroConverterConfig)
> [2020-01-07 06:16:11,694] INFO KafkaAvroSerializerConfig values:
>     bearer.auth.token = [hidden]
>     schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
>     basic.auth.user.info = [hidden]
>     auto.register.schemas = true
>     max.schemas.per.subject = 1000
>     basic.auth.credentials.source = USER_INFO
>     schema.registry.basic.auth.user.info = [hidden]
>     bearer.auth.credentials.source = STATIC_TOKEN
>     value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
>     key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
>  (io.confluent.kafka.serializers.KafkaAvroSerializerConfig)
> [2020-01-07 06:16:11,699] INFO KafkaAvroDeserializerConfig values:
>     bearer.auth.token = [hidden]
>     schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
>     basic.auth.user.info = [hidden]
>     auto.register.schemas = true
>     max.schemas.per.subject = 1000
>     basic.auth.credentials.source = USER_INFO
>     schema.registry.basic.auth.user.info = [hidden]
>     bearer.auth.credentials.source = STATIC_TOKEN
>     specific.avro.reader = false
>     value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
>     key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
>
> So I thought it was because of our custom name strategy. I added our own
> jar and noticed that:
>
> - if I only add "value.converter.(key|value).subject.name.strategy", it's
> not read by the connector
> - if I add this config to the connector (so on both env variables and
> connector config):
>
> "value.converter": "io.confluent.connect.avro.AvroConverter",
> "value.converter.basic.auth.credentials.source": "USER_INFO",
> "value.converter.schema.registry.basic.auth.user.info": "xxx:xxx",
> "value.converter.schema.registry.url": "https://xxx.us-east-2.aws.confluent.cloud",
> "value.converter.key.subject.name.strategy": "co.myapp.serializers.subject.EnvironmentTopicNameStrategy",
> "value.converter.value.subject.name.strategy": "co.myapp.serializers.subject.EnvironmentTopicNameStrategy"
>
> it finally tries to use our custom EnvironmentTopicNameStrategy, but I
> get: "java.lang.RuntimeException:
> co.myapp.serializers.subject.EnvironmentTopicNameStrategy is not an
> instance of
> io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy",
> even though the class extends TopicNameStrategy
> - just adding the value.converter (and auth / schema registry url)
> configs results in the same error as above, just with the default class:
> "java.lang.RuntimeException:
> io.confluent.kafka.serializers.subject.TopicNameStrategy is not an
> instance of
> io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
>
> Any idea? It seems that adding value.converter to the connector breaks
> even the default serialization class.
>
> Thanks in advance
>
> --
> Alessandro Tagliapietra