Hello everyone, I'm trying to run Kafka Connect to dump Avro messages to S3, but I'm having issues with the value converter. Our topics have a string key and an Avro value.
I'm running the Docker image confluentinc/cp-kafka-connect:5.3.2 with the following environment variables for the value converter:

CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: xxx:xxx
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://xxx.us-east-2.aws.confluent.cloud

After setting up the S3 sink with this config:

{
  "name": "raw-dump",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "locale": "en-US",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "tasks.max": "1",
    "topics": "xxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "flush.size": 1000,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

I was getting this error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic xxx to Avro:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 100023

The log showed this value converter config:

[2020-01-07 06:16:11,670] INFO AvroConverterConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig)

The KafkaAvroSerializerConfig entry at 06:16:11,694 and the KafkaAvroDeserializerConfig entry at 06:16:11,699 list the same values (plus specific.avro.reader = false for the deserializer), so both the key and value subject name strategies are the default TopicNameStrategy everywhere. So I thought the failure was because of our custom subject name strategy.
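For reference, our strategy class is structurally just a subclass of the default TopicNameStrategy; here's a simplified sketch with the actual subject-naming logic left out:

package co.myapp.serializers.subject;

import io.confluent.kafka.serializers.subject.TopicNameStrategy;

// Simplified sketch of our class: the real one adjusts the subject name
// (details not relevant here), but structurally it's nothing more than a
// subclass of the default strategy, so it should still count as a
// SubjectNameStrategy by inheritance.
public class EnvironmentTopicNameStrategy extends TopicNameStrategy {
    // subject-naming details omitted; the errors below happen regardless
}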
I've added our own jar with the custom strategy and noticed that:

- if I only add "value.converter.(key|value).subject.name.strategy" to the connector config, it's not being read by the connector
- if I add this config to the connector (so the converter is set both in the env variables and in the connector config):

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "xxx:xxx",
  "value.converter.schema.registry.url": "https://xxx.us-east-2.aws.confluent.cloud",
  "value.converter.key.subject.name.strategy": "co.myapp.serializers.subject.EnvironmentTopicNameStrategy",
  "value.converter.value.subject.name.strategy": "co.myapp.serializers.subject.EnvironmentTopicNameStrategy"

  it finally tries to use our custom EnvironmentTopicNameStrategy, but I get:

  java.lang.RuntimeException: co.myapp.serializers.subject.EnvironmentTopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy

  even though the class extends TopicNameStrategy
- just adding the value.converter (and auth / schema registry url) configs to the connector results in the same error as above, just with the default class:

  java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy

Any idea? It seems that adding value.converter to the connector config breaks even the default subject name strategy class.

Thanks in advance

--
Alessandro Tagliapietra
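P.S. One thing I still want to rule out for the original "Error retrieving Avro schema for id 100023" is whether that id is actually visible with these registry credentials; as far as I know that's just a GET against the registry, something like:

curl -u xxx:xxx https://xxx.us-east-2.aws.confluent.cloud/schemas/ids/100023

but the "is not an instance of SubjectNameStrategy" errors are the part I really can't explain.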