Hi Matthias Thanks for the reply. I think I don't have permission for this. If you can grant me permission I can create one (my handle is saimishra). Or you can go ahead and create one
I may need permission to create JIRA as I might report more issues after discussing with you over here. Regards Sai On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax <matth...@confluent.io> wrote: > -----BEGIN PGP SIGNED MESSAGE----- > Hash: SHA512 > > Hi, > > sorry for late reply. Seems like a bug to me; within > Processor#process() accessing the context should work. Can you open a > JIRA for it? > > - -Matthias > > On 10/23/16 10:28 PM, saiprasad mishra wrote: > > Sorry for the email again > > > > I was expecting it to work always when accessed from process() > > method as this corresponds to each kafka message/record processing. > > I understand illegalstate by the time punctuate() is called as its > > already batched by time interval > > > > Regards Sai > > > > On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra > > <saiprasadmis...@gmail.com > >> wrote: > > > >> Hi > >> > >> his is with my streaming app kafka 10.1.0. > >> > >> My flow looks something like below > >> > >> source topic stream -> filter for null value ->map to make it > >> keyed by id ->custom processor to mystore -> to another topic -> > >> ktable > >> > >> I am hitting the below type of exception in a custom processor > >> class if I try to access offset() or partition() or timestamp() > >> from the ProcessorContext in the process() method. I was hoping > >> it would return the partition and offset for the enclosing > >> topic(in this case source topic) where its consuming from or -1 > >> based on the api docs. > >> > >> Looks like only in certain cases it is accessible. is it getting > >> lost in transformation phases. > >> > >> Same issue happens on if i try to access them in punctuate() > >> method but some where I saw that it might not work in > >> punctuate(). Any reason for this or any link describing this > >> will be helpful > >> > >> > >> ==================================================================== > >> > >> > >> > >> > java.lang.IllegalStateException: This should not happen as offset() > >> should only be called while a record is processed at > >> org.apache.kafka.streams.processor.internals. > >> ProcessorContextImpl.offset(ProcessorContextImpl.java:181) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?] at > >> com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?] at > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr > ocessorNode.java:82) > >> > >> > >> > ~[kafka-streams-0.10.1.0.jar!/:?] > >> at org.apache.kafka.streams.processor.internals. > >> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> org.apache.kafka.streams.kstream.internals.KStreamMap$ > >> KStreamMapProcessor.process(KStreamMap.java:43) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr > ocessorNode.java:82) > >> > >> > >> > ~[kafka-streams-0.10.1.0.jar!/:?] > >> at org.apache.kafka.streams.processor.internals. > >> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> org.apache.kafka.streams.kstream.internals.KStreamFilter$ > >> KStreamFilterProcessor.process(KStreamFilter.java:44) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr > ocessorNode.java:82) > >> > >> > >> > ~[kafka-streams-0.10.1.0.jar!/:?] > >> at org.apache.kafka.streams.processor.internals. > >> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> org.apache.kafka.streams.processor.internals. > >> SourceNode.process(SourceNode.java:66) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> org.apache.kafka.streams.processor.internals. > >> StreamTask.process(StreamTask.java:181) > >> ~[kafka-streams-0.10.1.0.jar!/:?] at > >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(Str > eamThread.java:436) > >> > >> > >> > ~[kafka-streams-0.10.1.0.jar!/:?] > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:242) > >> [kafka-streams-0.10.1.0.jar!/:?] > >> ===================================================================== > >> > >> > >> > >> > >> > Regards > >> Sai > >> > > > -----BEGIN PGP SIGNATURE----- > Comment: GPGTools - https://gpgtools.org > > iQIcBAEBCgAGBQJYD5XWAAoJECnhiMLycopPUogQAJ6qawqVVmUORrGugiAC3/YM > ge0bvBSLbwCbys1wkm8vi17iRcMcZgYV1kUbspCBa8Ax7sA7YgmeqEYJpuCt6rRG > AXOepZ7WCF+q9NK8aLGTr94ymKMT4t5KlaBMmR9AMR0jAK8iGZJIYcwWHdzYQZz8 > DjY2lYmkkzAQSorx2s9v4AEU2LiCsug3jJY/3/uQYAQnEPmHG5IoOmHnQoWQqT8S > udLAtbzCRTcA3Fua5UE1P8KCQG2Pjw8DuDE5qxi0DWVmiuB+ASzp2V7+yVxLVotw > Okg2q1V0T9L0QorbwZ1nG6fys+OeOSIX3vg1KM8nUOCC2YbeGtueYqRte5ThE/Xp > 5rVXNIHXGzpcO1BeZT8BdDHcFc/4AR6fHZy0XFv6gHDRn4nsemwGOiNRADjhNaNp > cM9w2Bo8Wxo9qPz0fAnaYTTNt/J4h2RkycIcFTY2xvBVfmjJZwq9XVVwIXkIDnxN > sTxM6Czy4L7bcP+y6B/tqOG96cIJ5czKZwD7qwEOM9D0KIns2iM2wuQSgqU/vweY > bWiwqEkodg+X+CuJ/5nch5z6xw+6d2MNC/mkYik5pFL4Na4O7eNjoIclVlq7bgcu > hAJMx1B4flAoGqcRjUCRq39/fzKDEp1cJ1G4FjM8wWdPagumKQcgH51GkVn+6+HU > a36xOHjdT3P3j+wcu512 > =cUp7 > -----END PGP SIGNATURE----- >