Hey folks,

I'm trying to intercept outgoing Kafka messages before they're sent so
that I can do some basic tracking (i.e., get the actual assigned
partition number, which isn't available in the StreamTask or
MessageCollector classes). I've written a class that implements Kafka's
ProducerInterceptor interface [0] and have added the following line to
my StreamTask properties file:

systems.kafka.producer.interceptor.classes=com.cavulus.kafka.CavulusKafkaProducerInterceptor

Here's that class' package and signature:

package com.cavulus.kafka;
import org.apache.kafka.clients.producer.ProducerInterceptor;
...
public class CavulusKafkaProducerInterceptor implements ProducerInterceptor { .. }
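
For reference, here is a minimal sketch of the tracking side, kept free of Kafka types so it runs on its own. It assumes the interceptor would feed it from onAcknowledgement(RecordMetadata, Exception), since per the ProducerInterceptor javadoc onSend is called before the partition is assigned (unless the record specifies one). PartitionTracker and its methods are hypothetical names for illustration, not part of Kafka or Samza.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper (not a Kafka or Samza class): counts acknowledged
// records per topic-partition. The intent is that an interceptor would call
//   tracker.record(metadata.topic(), metadata.partition());
// from onAcknowledgement, after checking that metadata is non-null.
final class PartitionTracker {
    // onAcknowledgement generally runs on the producer's background I/O
    // thread, so the map and counters are thread-safe and cheap to update.
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Record one acknowledged message for the given topic and partition.
    void record(String topic, int partition) {
        counts.computeIfAbsent(key(topic, partition), k -> new LongAdder()).increment();
    }

    // Number of messages recorded so far; 0 if the partition was never seen.
    long count(String topic, int partition) {
        LongAdder adder = counts.get(key(topic, partition));
        return adder == null ? 0L : adder.sum();
    }
}
```

Keeping this logic outside the interceptor class also makes it possible to exercise it without starting a producer or a YARN container.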

The class compiles fine, so I assume that I'm adhering correctly to
the interface, but when I try to start up the task in YARN, it stalls
-- no logging, no errors, just nothing. I have no problem starting
this streamtask when omitting the above line from my properties file.

KIP-42 [1] explicitly references Samza as an inspiration for adding
the ProducerInterceptor to Kafka 0.10, so I've been assuming some sort
of compatibility (but I could be mistaken).

Has anybody attempted this sort of combination?

Cheers,
Malcolm McFarland
Cavulus

[0] https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html
[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
