saiprasad mishra created KAFKA-4344:
---------------------------------------

             Summary: Exception when accessing partition, offset and timestamp in processor class
                 Key: KAFKA-4344
                 URL: https://issues.apache.org/jira/browse/KAFKA-4344
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.1.0
            Reporter: saiprasad mishra
            Assignee: Guozhang Wang
            Priority: Minor


I have a Kafka Streams pipeline like the one below:
source topic stream -> filter for null values -> map to make it keyed by id
-> custom processor to mystore -> to another topic -> KTable


I am hitting the exception below in a custom processor class when I try
to access offset(), partition(), or timestamp() from the ProcessorContext in
the process() method. I was hoping it would return the partition and offset of
the topic it is consuming from (in this case the source topic), or -1,
based on the API docs.


java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
        at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
        at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
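
To make the expectation above concrete, here is a minimal, self-contained sketch of the failure mode and of the "-1 instead of an exception" behaviour I was hoping for. Everything in it is a hypothetical stand-in: SketchContext, RecordMeta and orMinusOne are not Kafka classes or methods, and the sketch only mimics what the exception message suggests (offset() throws when no record is attached to the context). It is not the actual ProcessorContextImpl logic.

```java
import java.util.function.LongSupplier;

public class RecordMetadataSketch {

    /** Hypothetical stand-in for the metadata of the record being processed. */
    static final class RecordMeta {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        RecordMeta(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in for a processor context; NOT the Kafka class. */
    static final class SketchContext {
        private RecordMeta current; // null when no record is attached

        void setCurrent(RecordMeta meta) {
            this.current = meta;
        }

        long offset() {
            if (current == null) {
                // Same message as in the reported stack trace.
                throw new IllegalStateException(
                    "This should not happen as offset() should only be called while a record is processed");
            }
            return current.offset;
        }

        long timestamp() {
            if (current == null) {
                throw new IllegalStateException(
                    "timestamp() should only be called while a record is processed");
            }
            return current.timestamp;
        }
    }

    /** Calls the accessor and falls back to -1 when no record metadata is available. */
    static long orMinusOne(LongSupplier accessor) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        SketchContext ctx = new SketchContext();

        // No record attached: a direct ctx.offset() call would throw here.
        System.out.println(orMinusOne(ctx::offset)); // prints -1

        // Record attached (values are made up for illustration).
        ctx.setCurrent(new RecordMeta("source-topic", 3, 42L, 1477000000000L));
        System.out.println(orMinusOne(ctx::offset)); // prints 42
    }
}
```

A guard like orMinusOne is only a workaround on the caller's side; it does not explain why the context has no record attached while process() is running in the trace above.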



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
