Hello,

I’m trying to send a Kafka record with headers to RabbitMQ. I have added two
headers to the Kafka record; see the function below.

public Boolean writeToTopic(ArrayList<BundleRecord> listOfKafkaRecords,
                            String bootstrapServer, String topicName) {
    Producer producer = createKafkaProducer(bootstrapServer);
    listOfKafkaRecords.forEach((record) -> {
        ProducerRecord recordKafka = new ProducerRecord(topicName, null, record);
        recordKafka.headers().add(new RecordHeader("type", "MyMessage".getBytes()));
        recordKafka.headers().add(new RecordHeader("content_type", "text/plain".getBytes()));
        producer.send(recordKafka);
    });
    producer.close();
    return true;
}
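
A side note on the header values above: `getBytes()` without an argument uses the JVM's default charset, which can differ between machines on older Java versions. The following is only a minimal sketch, assuming UTF-8 is acceptable to whatever reads the headers. The class `HeaderBytes` and its methods are hypothetical helpers for illustration and not part of the Kafka API; the map merely stands in for the record headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka): encodes and decodes header
// values with an explicit charset instead of the platform default.
final class HeaderBytes {

    // Encode a header value as UTF-8 bytes.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode UTF-8 header bytes back into a String.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the record headers: key -> raw bytes.
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("type", encode("MyMessage"));
        headers.put("content_type", encode("text/plain"));

        // Print each header after decoding it with the same charset.
        headers.forEach((key, raw) -> System.out.println(key + " = " + decode(raw)));
    }
}
```

In the function above, this would correspond to passing `"MyMessage".getBytes(StandardCharsets.UTF_8)` to `RecordHeader`. It does not by itself explain why the headers are missing in RabbitMQ.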

The headers are visible in the Kafka topic; see the image below.

[cid:image001.png@01D87B4D.F8D77E40]

When I add a CAMEL-RABBITMQ-KAFKA-CONNECTOR, the headers are lost in RabbitMQ;
see the image below.

[cid:image002.png@01D87B4E.40B10C40]

My Camel connector setup is:

{
  "name": "rabbitmq-test-connector",
  "config": {
    "connector.class" : "org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector",
    "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "topics": "topicname",
    "camel.component.rabbitmq.hostname" : "rabbitmq-test",
    "camel.component.rabbitmq.portnumber" : 5672,
    "camel.component.rabbitmq.username" : "username",
    "camel.component.rabbitmq.password": "password",
    "camel.sink.path.exchangeName": "sinkExchange",
    "camel.sink.endpoint.exchangeType" :"topic",
    "camel.sink.endpoint.autoDelete" : "false",
    "camel.sink.endpoint.queue" : "endpointqueue",
    "camel.sink.endpoint.routingKey" : "key",
    "camel.sink.endpoint.vhost": "vhost"
  }
}

My question is: why are the headers not available in RabbitMQ, and what do I
have to add to get them there?

Kind regards,

Tundzaj

