Oleksandr Konopko created KAFKA-7035:
----------------------------------------

             Summary: Kafka Processor's init() method sometimes is not called
                 Key: KAFKA-7035
                 URL: https://issues.apache.org/jira/browse/KAFKA-7035
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Oleksandr Konopko


Scenario:

1. We process a Kafka topic using the Processor API

2. We want to collect metrics (for simplicity, let's say just a count of 
processed entities)

3. How we tried to organize this:
 * process data in the process() method and send it down the stream via the 
context
 * on each call of process(), update the counter
 * schedule a punctuate function that sends a metric to a dedicated topic. The 
metric is built from the counter
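
A minimal sketch of the three steps above, in plain Java. It is not the attached 
code and does not use the real Kafka Streams interfaces: SketchContext, 
CountingProcessor and CountingProcessorDemo are hypothetical stand-ins, and it 
assumes each metric carries the count gathered since the previous report.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the small part of a processor context this pattern
// needs. It is NOT the Kafka Streams ProcessorContext interface.
interface SketchContext {
    void forward(String key, String value);   // pass a record downstream
    void schedule(Runnable punctuator);       // register a periodic callback
}

// Counts records in process() and reports the count from a scheduled callback.
class CountingProcessor {
    private final List<Long> metricsTopic;    // stand-in for the metrics topic
    private SketchContext context;
    private long counter = 0;

    CountingProcessor(List<Long> metricsTopic) {
        this.metricsTopic = metricsTopic;
    }

    // Lifecycle hook: the only place where the periodic report is registered.
    void init(SketchContext context) {
        this.context = context;
        context.schedule(this::punctuate);
    }

    // Called once per record: update the counter, then send the record on.
    void process(String key, String value) {
        counter++;
        context.forward(key, value);
    }

    // Periodic callback: emit the count since the last report, then reset it.
    void punctuate() {
        metricsTopic.add(counter);
        counter = 0;
    }
}

class CountingProcessorDemo {
    // Runs the happy path; returns {records forwarded, sum of reported counters}.
    static long[] run(int records, int punctuateEvery) {
        List<String> sink = new ArrayList<>();
        List<Long> metricsTopic = new ArrayList<>();
        List<Runnable> scheduled = new ArrayList<>();
        SketchContext context = new SketchContext() {
            @Override public void forward(String key, String value) { sink.add(value); }
            @Override public void schedule(Runnable punctuator) { scheduled.add(punctuator); }
        };
        CountingProcessor processor = new CountingProcessor(metricsTopic);
        processor.init(context);
        for (int i = 1; i <= records; i++) {
            processor.process("key-" + i, "value-" + i);
            if (i % punctuateEvery == 0) {
                scheduled.forEach(Runnable::run);   // simulate the timer firing
            }
        }
        long reported = metricsTopic.stream().mapToLong(Long::longValue).sum();
        return new long[] { sink.size(), reported };
    }

    public static void main(String[] args) {
        long[] result = run(1000, 100);
        System.out.println("forwarded=" + result[0] + " reported=" + result[1]);
    }
}
```

In this sketch the reported counters add up to the number of forwarded records 
only because init() runs before the first record and the last timer tick comes 
after the last record.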

The code is attached (we removed all business-sensitive code from it, so it 
should be easy to read).

 

Problematic Kafka Streams behaviour that I observed by logging every step:

1. We have 800000 messages in the input topic

2. Kafka Streams creates 4 Processor instances. Let's call them ProcessorA, 
ProcessorB, ProcessorC and ProcessorD

3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed 
correctly, results are sent down the stream, and the counter is updated

4. init() method was not called for ProcessorA and ProcessorB

5. ProcessorC and ProcessorD are created and start to receive the rest of the 
data (95-99%)

6. The init() method is called for both ProcessorC and ProcessorD. It schedules 
punctuation, which causes a metrics message to be created and sent down the 
metrics stream periodically

7. ProcessorA and ProcessorB are closed. init() was never called for them, so 
no metric entity was sent to the metrics topic for the data they processed

8. Processing is finished.

 

In the end:

Expected:
 * 800000 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to 800000

Actual results:
 * 800000 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to some number 3-6% less than 
800000, for example 786543

 

Problem:
 * the init() method is not guaranteed to be called
 * there is no way to guarantee that the punctuate method has reported all 
work done before close()
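
The two gaps listed above can be illustrated with a small accounting model in 
plain Java. This is only a sketch: InstanceModel and MetricGapDemo are 
hypothetical names, nothing here is Kafka Streams API, and the record counts 
(20, 900, 80) are made-up illustrative numbers, not figures from the run 
described in this report.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one processor instance's metric accounting.
class InstanceModel {
    private final List<Long> metricsTopic;   // stand-in for the metrics topic
    private boolean scheduled = false;       // true only after init() has run
    private long counter = 0;

    InstanceModel(List<Long> metricsTopic) {
        this.metricsTopic = metricsTopic;
    }

    void init() { scheduled = true; }

    void process(long records) { counter += records; }

    // Timer tick: reports only if init() registered the periodic callback.
    void tick() {
        if (scheduled) {
            metricsTopic.add(counter);
            counter = 0;
        }
    }

    // close() with no final report: the remaining count is dropped.
    long close() {
        long lost = counter;
        counter = 0;
        return lost;
    }
}

class MetricGapDemo {
    // Returns {total processed, total reported, total lost at close}.
    static long[] run() {
        List<Long> metricsTopic = new ArrayList<>();
        InstanceModel a = new InstanceModel(metricsTopic);  // init() never called
        InstanceModel c = new InstanceModel(metricsTopic);  // init() called
        c.init();

        long processed = 0;
        a.process(20);  processed += 20;   // handled by the uninitialised instance
        a.tick();                          // no-op: nothing was scheduled
        c.process(900); processed += 900;
        c.tick();                          // reports 900
        c.process(80);  processed += 80;   // arrives after the last tick

        long lost = a.close() + c.close();
        long reported = metricsTopic.stream().mapToLong(Long::longValue).sum();
        return new long[] { processed, reported, lost };
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("processed=" + r[0] + " reported=" + r[1] + " lost=" + r[2]);
    }
}
```

In this model every record is processed, yet the reported counters fall short 
by exactly the counts still held by an instance when close() runs, either 
because init() never scheduled the report or because the last tick came before 
the last record.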

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
