Hello,

I'm trying to use a custom converter with Kafka Connect and I cannot seem
to get it right. I'm hoping someone has experience with this and could help
me figure it out !


Initial situation
================

- my custom converter's class path is 'custom.CustomStringConverter'.

- to avoid any mistakes, my custom converter is currently just a copy/paste
of the pre-existing StringConverter (of course, this will change when I'll
get it to work).
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java

- I have a kafka connect cluster of 3 nodes, The nodes are running
confluent's official docker images ( confluentinc/cp-kafka-connect:3.3.0 ).

- Each node is configured to load a jar with my converter in it (using a
docker volume).



What happens ?
================

When the connectors start, they correctly load the jars and find the custom
converter. Indeed, this is what I see in the logs :

[2017-10-10 13:06:46,274] INFO Registered loader:
PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar}
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter'
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and
'CustomString' to plugin 'custom.CustomStringConverter'
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)

I then POST a JSON config to one of the connector nodes to create my
connector :

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}

And receive the following reply :

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains the
following 1 error(s):\nInvalid value custom.CustomStringConverter for
configuration value.converter: Class custom.CustomStringConverter could not
be found.\nYou can also find the above list of errors at the endpoint
`/{connectorType}/config/validate`"
}

================

If I try running Kafka Connect stadnalone, the error message is the same.

Has anybody faced this already ? What am I missing ?

Many thanks to anybody reading this !

Jehan

Reply via email to