Hello everyone,

I'm trying to run Kafka Connect to dump Avro messages to S3, but I'm having
issues with the value converter. Our topics have a string key and an Avro
value.

I'm running the Docker image confluentinc/cp-kafka-connect:5.3.2 with the
following environment variables for the value converter:

CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: xxx:xxx
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://xxx.us-east-2.aws.confluent.cloud
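
If I understand the cp-kafka-connect image correctly, each CONNECT_* variable is lowercased and its underscores turned into dots, so the worker should end up with roughly these properties (my reading of the image, not copied from an actual worker config):

# assumed result of the CONNECT_* -> worker property mapping
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=false
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=xxx:xxx
value.converter.schema.registry.url=https://xxx.us-east-2.aws.confluent.cloud
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false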


After setting up the S3 sink connector with this config:

{
  "name": "raw-dump",
  "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "partitioner.class":
"io.confluent.connect.storage.partitioner.HourlyPartitioner",
      "locale": "en-US",
      "timezone": "UTC",
      "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
      "tasks.max": "1",
      "topics": "xxx",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.region": "eu-west-1",
      "s3.bucket.name": "confluent-kafka-connect-s3-testing",
      "flush.size": 1000,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
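
For completeness, I'm creating the connector by POSTing that JSON to the Connect REST API with something along these lines (host/port are just my local setup):

curl -s -X POST -H "Content-Type: application/json" \
     --data @raw-dump.json \
     http://localhost:8083/connectors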


I was getting this error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic xxx to Avro:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 100023
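
As far as I know the Avro converter reads the schema id from the 5-byte Confluent wire-format header (magic byte 0 followed by a 4-byte big-endian id) and then fetches that id from the registry, so the id itself can be checked directly against Schema Registry with something like:

curl -u xxx:xxx \
     https://xxx.us-east-2.aws.confluent.cloud/schemas/ids/100023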


The log showed this value converter config:

[2020-01-07 06:16:11,670] INFO AvroConverterConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig)
[2020-01-07 06:16:11,694] INFO KafkaAvroSerializerConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig)
[2020-01-07 06:16:11,699] INFO KafkaAvroDeserializerConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [https://xxx.us-east-2.aws.confluent.cloud]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    specific.avro.reader = false
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
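
For what it's worth, the subjects that actually exist in the registry can be listed directly, which is how I'd double-check what subject names the producers registered:

curl -u xxx:xxx https://xxx.us-east-2.aws.confluent.cloud/subjects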

So I thought it was because of our custom subject name strategy. I've added
our own JAR and noticed that:

 - if I only set "value.converter.(key|value).subject.name.strategy" via the env variables, it isn't picked up by the connector
 - if I add this config to the connector as well (so it's set both in the env variables and in the connector config):

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "xxx:xxx",
"value.converter.schema.registry.url": "
https://xxx.us-east-2.aws.confluent.cloud";,
"value.converter.key.subject.name.strategy":
"co.myapp.serializers.subject.EnvironmentTopicNameStrategy",
"value.converter.value.subject.name.strategy":
"co.myapp.serializers.subject.EnvironmentTopicNameStrategy"


   it finally tries to use our custom EnvironmentTopicNameStrategy (a sketch of the class is below, after this list), but I get: "java.lang.RuntimeException: co.myapp.serializers.subject.EnvironmentTopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy", even though the class extends TopicNameStrategy
 - just adding the value.converter (plus auth and schema registry url) configs results in the same error as above, only with the default class: "java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
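
For reference, this is roughly what our strategy class looks like (trimmed down for the mailing list, and "environment.name" is just a placeholder for the property we actually read); it only prepends an environment prefix to what the default strategy produces:

package co.myapp.serializers.subject;

import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import org.apache.avro.Schema;

import java.util.Map;

// Simplified sketch: prefix the default <topic>-key / <topic>-value subject
// with an environment name taken from the converter config.
public class EnvironmentTopicNameStrategy extends TopicNameStrategy {

  private String environment = "production";

  @Override
  public void configure(Map<String, ?> config) {
    super.configure(config);
    Object env = config.get("environment.name"); // placeholder property name
    if (env != null) {
      environment = env.toString();
    }
  }

  @Override
  public String subjectName(String topic, boolean isKey, Schema schema) {
    return environment + "." + super.subjectName(topic, isKey, schema);
  }
}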

Any ideas? It seems that adding value.converter to the connector config breaks
even the default subject name strategy class.

Thanks in advance

--
Alessandro Tagliapietra
