[ https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-7035.
------------------------------------
    Resolution: Abandoned

> Kafka Processor's init() method sometimes is not called
> -------------------------------------------------------
>
>                 Key: KAFKA-7035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7035
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Oleksandr Konopko
>            Priority: Critical
>         Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We process a Kafka topic using the Processor API.
> 2. We want to collect metrics (let's say we just count the number of processed
> entities, for simplicity).
> 3. How we tried to organize this:
> * process data in the process() method and send it down the stream with the context
> * update the counter on each call of process()
> * schedule a punctuate function that sends a metric, built from the counter,
> to a special topic
> You can find the code in the attachment (we removed all business-sensitive
> code, so it should be easy to read).
>
> Problematic Kafka Streams behaviour that I can see by logging every step:
> 1. We have 800000 messages in the input topic.
> 2. Kafka Streams creates 4 Processor instances. Let's name them ProcessorA,
> ProcessorB, ProcessorC and ProcessorD.
> 3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed
> correctly, results are sent down the stream, and the counter is updated.
> 4. init() is not called for ProcessorA or ProcessorB.
> 5. ProcessorC and ProcessorD are created and receive the rest of the data
> (95-99%).
> 6. init() is called for both ProcessorC and ProcessorD. It schedules the
> punctuation, which periodically creates a Metrics message and sends it down
> the metrics stream.
> 7. ProcessorA and ProcessorB are closed. init() was never called for them, so
> their Metrics entity was never sent to the metrics topic.
> 8. Processing is finished.
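The loss described in steps 3 to 7 can be illustrated with a small, dependency-free Java model. This is a hypothetical sketch, not the attached TransformProcessor.java and not the Kafka Streams API: LostCountModel, CountingProcessor, runInstance, the "punctuate every 1000 records" schedule and the 10000/10000/390000/390000 split are all assumptions chosen for illustration. It only shows the arithmetic: counts held by instances whose init() never ran are never reported, so the metrics total falls short of the processed total.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the reported scenario. It does NOT use
// the Kafka Streams API; the method names only mirror the roles of init(),
// process(), punctuate() and close() described in the report.
public class LostCountModel {

    // Stand-in for the metrics topic: every emitted counter value lands here.
    static final List<Long> metricsTopic = new ArrayList<>();

    // Total records seen by all instances (stand-in for what reached the Sink).
    static long processed = 0;

    static class CountingProcessor {
        private long counter = 0;
        private boolean initCalled = false;

        // Models init(): in the report, this is where the punctuation is scheduled.
        void init() {
            initCalled = true;
        }

        // Models process(): the record is forwarded (omitted here) and counted.
        void process() {
            processed++;
            counter++;
        }

        // Models a punctuation firing. If init() never ran, no punctuation was
        // scheduled, so nothing is emitted and the counter is never reported.
        void punctuate() {
            if (initCalled && counter > 0) {
                metricsTopic.add(counter);
                counter = 0;
            }
        }

        // Models close(): nothing is emitted here, so an unreported counter is dropped.
        void close() {
            counter = 0;
        }
    }

    // Feeds `records` records to one instance, firing a punctuation every
    // `interval` records (a stand-in for the real punctuation schedule).
    static void runInstance(CountingProcessor p, long records, boolean callInit, long interval) {
        if (callInit) {
            p.init();
        }
        for (long i = 1; i <= records; i++) {
            p.process();
            if (i % interval == 0) {
                p.punctuate();
            }
        }
        p.close();
    }

    // Runs the four hypothetical instances and returns the sum of reported counts.
    static long simulate() {
        metricsTopic.clear();
        processed = 0;
        // A and B get 2.5% of 800000 records and never see init();
        // C and D get the rest and are initialized normally.
        long[] records = {10_000, 10_000, 390_000, 390_000};
        boolean[] initCalled = {false, false, true, true};
        for (int i = 0; i < records.length; i++) {
            runInstance(new CountingProcessor(), records[i], initCalled[i], 1_000);
        }
        return metricsTopic.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        long reported = simulate();
        System.out.println("processed=" + processed + " reported=" + reported);
    }
}
```

Running main prints processed=800000 reported=780000: the 20000 records handled by the two uninitialized instances are forwarded but never counted in the metrics, which matches the shape (not the exact figures) of the shortfall reported below.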
>
> In the end:
> Expected:
> * 800000 entities were processed and sent to the Sink
> * Metrics entities contain counters that sum up to 800000
> Actual results:
> * 800000 entities were processed and sent to the Sink
> * Metrics entities contain counters that sum up to some number 3-6% less
> than 800000, for example 786543
>
> Problem:
> * the init() call is not guaranteed
> * there is no way to guarantee that all work was done by the punctuate
> method before close()
>
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)