Hey y'all, just wanted to follow up on this: does anybody have a working example of Kafka's ProducerInterceptor interface running under YARN that I could take a look at, or could anyone offer some guidance?
More generally, we're trying to keep an eye on Samza SSP (SystemStreamPartition) instances that occasionally seem to go "zombie"; i.e., the YARN AM doesn't see any problems and no errors are surfaced, but they've just stopped listening for messages. My idea here is to record the topic/partition/offset on each message delivery, and then record the corresponding topic/partition/offset when Samza picks up the message for processing. Are there any ideas out there about how to do this?

Cheers,
Malcolm McFarland
Cavulus

This correspondence is from HealthPlanCRM, LLC, d/b/a Cavulus. Any unauthorized or improper disclosure, copying, distribution, or use of the contents of this message is prohibited. The information contained in this message is intended only for the personal and confidential use of the recipient(s) named above. If you have received this message in error, please notify the sender immediately and delete the original message.

On Tue, Sep 10, 2019 at 3:13 PM Malcolm McFarland <mmcfarl...@cavulus.com> wrote:
> Hey folks,
>
> I'm trying to intercept outgoing Kafka messages before they're sent to
> do some basic tracking (i.e., get the actual assigned partition number,
> which isn't available in the StreamTask or MessageCollector classes).
> I've written a class that conforms to Kafka's ProducerInterceptor
> interface [0] and have added the following line to my StreamTask
> properties file:
>
> systems.kafka.producer.interceptor.classes=com.cavulus.kafka.CavulusKafkaProducerInterceptor
>
> Here's that class's package and signature:
>
> package com.cavulus.kafka;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> ...
> public class CavulusKafkaProducerInterceptor implements ProducerInterceptor { .. }
>
> The class compiles fine, so I assume that I'm adhering correctly to
> the interface, but when I try to start up the task in YARN, it stalls:
> no logging, no errors, just nothing. I have no problem starting
> this StreamTask when I omit the above line from my properties file.
>
> KIP-42 [1] explicitly references Samza as an inspiration for adding
> the ProducerInterceptor to Kafka 0.10, so I've been assuming some sort
> of compatibility (but I could be mistaken).
>
> Has anybody attempted this sort of combination?
>
> Cheers,
> Malcolm McFarland
> Cavulus
>
> [0] https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
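The "record offsets on delivery, record them again on pickup" idea from the follow-up could be sketched roughly as below. OffsetProgressTracker is a hypothetical helper (not part of Kafka or Samza), and the 60-second stall threshold is an arbitrary assumption. The assumed wiring: on the producer side, an interceptor's onAcknowledgement(RecordMetadata, Exception) would pass metadata.topic(), metadata.partition() and metadata.offset() to recordProduced (skipping calls where the exception is non-null); on the Samza side, the StreamTask's process() would pass the stream and partition id from envelope.getSystemStreamPartition() plus Long.parseLong(envelope.getOffset()) to recordProcessed (for the Kafka system the offset string should be the numeric Kafka offset). Since the producer and the Samza containers normally run in separate JVMs, both sides would really need to report into some shared store, such as a metrics system; this in-memory class only illustrates the comparison logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper, not part of Kafka or Samza. Per topic-partition, it compares
 * the highest offset acknowledged to the producer with the highest offset the
 * consumer has processed, and reports partitions that are behind and not advancing.
 */
public class OffsetProgressTracker {

    /** Per-partition progress; -1 means "nothing seen yet". */
    private static final class Progress {
        long lastProduced = -1L;
        long lastProcessed = -1L;
        long lastProgressMillis; // start of the current "waiting for the consumer" window

        Progress(long nowMillis) {
            this.lastProgressMillis = nowMillis;
        }
    }

    private final Map<String, Progress> byPartition = new HashMap<>();
    private final long stallThresholdMillis;

    public OffsetProgressTracker(long stallThresholdMillis) {
        this.stallThresholdMillis = stallThresholdMillis;
    }

    private Progress progressFor(String topic, int partition, long nowMillis) {
        // Same "topic-partition" format as Kafka's TopicPartition.toString().
        return byPartition.computeIfAbsent(topic + "-" + partition, k -> new Progress(nowMillis));
    }

    /** Hypothetically called from the interceptor's onAcknowledgement. */
    public synchronized void recordProduced(String topic, int partition, long offset, long nowMillis) {
        Progress p = progressFor(topic, partition, nowMillis);
        if (offset > p.lastProduced) {
            // If the consumer was caught up, start the stall clock now, so time spent
            // idle with nothing to consume is not counted against it.
            if (p.lastProcessed >= p.lastProduced) {
                p.lastProgressMillis = nowMillis;
            }
            p.lastProduced = offset;
        }
    }

    /** Hypothetically called from StreamTask.process for each envelope. */
    public synchronized void recordProcessed(String topic, int partition, long offset, long nowMillis) {
        Progress p = progressFor(topic, partition, nowMillis);
        if (offset > p.lastProcessed) {
            p.lastProcessed = offset;
            p.lastProgressMillis = nowMillis;
        }
    }

    /** Partitions behind the producer whose consumer hasn't advanced within the threshold. */
    public synchronized List<String> stalledPartitions(long nowMillis) {
        List<String> stalled = new ArrayList<>();
        for (Map.Entry<String, Progress> e : byPartition.entrySet()) {
            Progress p = e.getValue();
            boolean behind = p.lastProcessed < p.lastProduced;
            boolean idleTooLong = nowMillis - p.lastProgressMillis > stallThresholdMillis;
            if (behind && idleTooLong) {
                stalled.add(e.getKey());
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        OffsetProgressTracker tracker = new OffsetProgressTracker(60_000L);
        tracker.recordProduced("events", 0, 41L, 0L);
        tracker.recordProcessed("events", 0, 41L, 1_000L); // partition 0 caught up
        tracker.recordProduced("events", 1, 17L, 0L);
        tracker.recordProcessed("events", 1, 9L, 1_000L);  // partition 1 left behind
        System.out.println(tracker.stalledPartitions(120_000L));
    }
}
```

Running main prints [events-1]: partition 0 has caught up with the producer, while partition 1 is eight offsets behind and hasn't advanced for roughly two minutes. Starting the stall clock only when a partition falls behind is a deliberate choice, so that a consumer that was simply idle (because nothing new had been produced) isn't flagged the moment a new message arrives.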