Sorry for emailing again. I was expecting this to always work when accessed from the process() method, since each process() call corresponds to the processing of a single Kafka message/record. I can understand the IllegalStateException when punctuate() is called, since by then the records have already been batched by time interval.
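
In the meantime, a defensive guard along the following lines might keep the processor running while the cause is unclear. This is only a minimal sketch: RecordMetadataGuard and valueOrDefault are hypothetical names, not part of Kafka Streams, and the only assumption about the library is the one shown in the stack trace below, namely that the accessor throws IllegalStateException when no record is being processed.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of Kafka Streams): wraps a metadata accessor
 * that may throw IllegalStateException when no record is being processed.
 */
final class RecordMetadataGuard {

    private RecordMetadataGuard() {
        // static utility, no instances
    }

    /**
     * Returns the accessor's value, or the fallback if the accessor
     * throws IllegalStateException.
     */
    static long valueOrDefault(LongSupplier accessor, long fallback) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            // No current record (assumption: this is the only case we want to mask).
            return fallback;
        }
    }
}
```

Inside process(), assuming the ProcessorContext passed to init() was saved in a field named context, the call would look like `long offset = RecordMetadataGuard.valueOrDefault(context::offset, -1L);`. This only hides the exception; it does not explain why the metadata is unavailable.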

Regards
Sai

On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:

> Hi
>
> This is with my streaming app on Kafka 0.10.1.0.
>
> My flow looks something like this:
>
> source topic stream -> filter for null value -> map to make it keyed by id
> -> custom processor to mystore -> to another topic -> ktable
>
> I am hitting the exception below in a custom processor class if I try to
> access offset(), partition() or timestamp() from the ProcessorContext in
> the process() method. Based on the API docs, I was hoping it would return
> the partition and offset for the enclosing topic (in this case the source
> topic) it is consuming from, or -1.
>
> It looks like it is accessible only in certain cases. Is it getting lost
> in the transformation phases?
>
> The same issue happens if I try to access them in the punctuate() method,
> but I saw somewhere that it might not work in punctuate(). Any reason for
> this, or any link describing it, would be helpful.
>
> ====================================================================
>
> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
>
> =====================================================================
>
> Regards
> Sai