My JIRA ID is saimishra.

Regards,
Sai

On Tue, Oct 25, 2016 at 10:55 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> What is your JIRA ID? We can add you to the contributor list to give
> you permission.
>
> -Matthias
>
> On 10/25/16 10:48 AM, saiprasad mishra wrote:
> > Hi Matthias,
> >
> > Thanks for the reply. I think I don't have permission for this. If
> > you can grant me permission, I can create one (my handle is
> > saimishra). Or you can go ahead and create one.
> >
> > I may need permission to create JIRAs, as I might report more issues
> > after discussing them with you here.
> >
> > Regards,
> > Sai
> >
> > On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > Hi,
> > >
> > > sorry for the late reply. Seems like a bug to me; within
> > > Processor#process(), accessing the context should work. Can you
> > > open a JIRA for it?
> > >
> > > -Matthias
> > >
> > > On 10/23/16 10:28 PM, saiprasad mishra wrote:
> > > > Sorry for the email again.
> > > >
> > > > I was expecting it to always work when accessed from the
> > > > process() method, as this corresponds to the processing of each
> > > > Kafka message/record. I understand the IllegalStateException
> > > > when punctuate() is called, as it is already batched by time
> > > > interval.
> > > >
> > > > Regards,
> > > > Sai
> > > >
> > > > On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:
> > > > > Hi,
> > > > >
> > > > > This is with my streaming app on Kafka 0.10.1.0.
> > > > >
> > > > > My flow looks something like this:
> > > > >
> > > > > source topic stream -> filter for null value -> map to make it
> > > > > keyed by id -> custom processor to mystore -> to another topic
> > > > > -> ktable
> > > > >
> > > > > I am hitting the exception below in a custom processor class
> > > > > if I try to access offset(), partition(), or timestamp() from
> > > > > the ProcessorContext in the process() method. I was hoping it
> > > > > would return the partition and offset for the enclosing topic
> > > > > (in this case the source topic) it is consuming from, or -1,
> > > > > based on the API docs.
> > > > >
> > > > > It looks like it is accessible only in certain cases. Is it
> > > > > getting lost in the transformation phases?
> > > > >
> > > > > The same issue happens if I try to access them in the
> > > > > punctuate() method, but I saw somewhere that it might not work
> > > > > in punctuate(). Any reason for this, or any link describing
> > > > > it, would be helpful.
> > > > >
> > > > > ====================================================================
> > > > >
> > > > > java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
> > > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> > > > >     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
> > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> > > > >
> > > > > ====================================================================
> > > > >
> > > > > Regards,
> > > > > Sai