Just created the JIRA https://issues.apache.org/jira/browse/KAFKA-4344
Regards Sai On Tue, Oct 25, 2016 at 11:59 AM, saiprasad mishra < saiprasadmis...@gmail.com> wrote: > My JIRA id is saimishra > > Regards > Sai > > On Tue, Oct 25, 2016 at 10:55 AM, Matthias J. Sax <matth...@confluent.io> > wrote: > >> -----BEGIN PGP SIGNED MESSAGE----- >> Hash: SHA512 >> >> What is your JIRA ID? We can add you to the contributor list to give >> you permission. >> >> - -Matthias >> >> >> On 10/25/16 10:48 AM, saiprasad mishra wrote: >> > Hi Matthias Thanks for the reply. I think I don't have permission >> > for this. If you can grant me permission I can create one (my >> > handle is saimishra). Or you can go ahead and create one >> > >> > I may need permission to create JIRA as I might report more issues >> > after discussing with you over here. >> > >> > Regards Sai >> > >> > On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax >> > <matth...@confluent.io> wrote: >> > >> > Hi, >> > >> > sorry for late reply. Seems like a bug to me; within >> > Processor#process() accessing the context should work. Can you open >> > a JIRA for it? >> > >> > -Matthias >> > >> > On 10/23/16 10:28 PM, saiprasad mishra wrote: >> >>>> Sorry for the email again >> >>>> >> >>>> I was expecting it to work always when accessed from >> >>>> process() method as this corresponds to each kafka >> >>>> message/record processing. I understand illegalstate by the >> >>>> time punctuate() is called as its already batched by time >> >>>> interval >> >>>> >> >>>> Regards Sai >> >>>> >> >>>> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra >> >>>> <saiprasadmis...@gmail.com >> >>>>> wrote: >> >>>> >> >>>>> Hi >> >>>>> >> >>>>> his is with my streaming app kafka 10.1.0. >> >>>>> >> >>>>> My flow looks something like below >> >>>>> >> >>>>> source topic stream -> filter for null value ->map to make >> >>>>> it keyed by id ->custom processor to mystore -> to another >> >>>>> topic -> ktable >> >>>>> >> >>>>> I am hitting the below type of exception in a custom >> >>>>> processor class if I try to access offset() or partition() >> >>>>> or timestamp() from the ProcessorContext in the process() >> >>>>> method. I was hoping it would return the partition and >> >>>>> offset for the enclosing topic(in this case source topic) >> >>>>> where its consuming from or -1 based on the api docs. >> >>>>> >> >>>>> Looks like only in certain cases it is accessible. is it >> >>>>> getting lost in transformation phases. >> >>>>> >> >>>>> Same issue happens on if i try to access them in >> >>>>> punctuate() method but some where I saw that it might not >> >>>>> work in punctuate(). Any reason for this or any link >> >>>>> describing this will be helpful >> >>>>> >> >>>>> >> >>>>> ================================================================== >> == >> >>>>> >> >>>>> >> >>>>> >> >>>>> >> > >> >>>>> >> java.lang.IllegalStateException: This should not happen as offset() >> >>>>> should only be called while a record is processed at >> >>>>> org.apache.kafka.streams.processor.internals. >> >>>>> ProcessorContextImpl.offset(ProcessorContextImpl.java:181) >> >>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> com.sai.repo.MyStore.process(MyStore.java:72) >> >>>>> ~[classes!/:?] at >> >>>>> com.sai.repo.MyStore.process(MyStore.java:39) >> >>>>> ~[classes!/:?] at >> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process >> (Pr >> > >> >>>>> >> ocessorNode.java:82) >> >>>>> >> >>>>> >> >>>>> >> > ~[kafka-streams-0.10.1.0.jar!/:?] >> >>>>> at org.apache.kafka.streams.processor.internals. >> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) >> >>>>> >> >>>>> >> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> org.apache.kafka.streams.kstream.internals.KStreamMap$ >> >>>>> KStreamMapProcessor.process(KStreamMap.java:43) >> >>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process >> (Pr >> > >> >>>>> >> ocessorNode.java:82) >> >>>>> >> >>>>> >> >>>>> >> > ~[kafka-streams-0.10.1.0.jar!/:?] >> >>>>> at org.apache.kafka.streams.processor.internals. >> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) >> >>>>> >> >>>>> >> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> org.apache.kafka.streams.kstream.internals.KStreamFilter$ >> >>>>> KStreamFilterProcessor.process(KStreamFilter.java:44) >> >>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process >> (Pr >> > >> >>>>> >> ocessorNode.java:82) >> >>>>> >> >>>>> >> >>>>> >> > ~[kafka-streams-0.10.1.0.jar!/:?] >> >>>>> at org.apache.kafka.streams.processor.internals. >> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) >> >>>>> >> >>>>> >> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> org.apache.kafka.streams.processor.internals. >> >>>>> SourceNode.process(SourceNode.java:66) >> >>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> org.apache.kafka.streams.processor.internals. >> >>>>> StreamTask.process(StreamTask.java:181) >> >>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at >> >>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> Str >> > >> >>>>> >> eamThread.java:436) >> >>>>> >> >>>>> >> >>>>> >> > ~[kafka-streams-0.10.1.0.jar!/:?] >> >>>>> at org.apache.kafka.streams.processor.internals. >> >>>>> StreamThread.run(StreamThread.java:242) >> >>>>> [kafka-streams-0.10.1.0.jar!/:?] >> >>>>> ================================================================== >> === >> >>>>> >> >>>>> >> >>>>> >> >>>>> >> >>>>> >> > >> >>>>> >> Regards >> >>>>> Sai >> >>>>> >> >>>> >> >> >> > >> -----BEGIN PGP SIGNATURE----- >> Comment: GPGTools - https://gpgtools.org >> >> iQIcBAEBCgAGBQJYD5yXAAoJECnhiMLycopP+Y4QAK8vkvLauOiJQrb2b56bwZIG >> 7uSMnCagjGYnpMnpI1pB88ccJhEMgCvKI8NctXzv967oRoNsRAREdbmF/Li3Vfot >> u/Qkk4OUHQzHVu+vn+jEoIG3wZ2d6I40lQWKewJj7YAFhFgLcXLDO9qb447UMJgf >> SNI30pmVXb0s6jrF2y20x/GLXlbQbdPrl0Rve5zK7uO1uHQ/i9K3k3Z3Xa34tHdj >> XgKJfgDL2sX+1vm1Rlw6aOLesPfsv5L3WSnWVMia9c4+oWoox7UQ2sO2oq5bTVs0 >> ey2F2PGY7eiPHF/Bp2ffcRInH2X2XG+YNY3txl89x5hgeK5WylTnotabZzKaSYJt >> /XOT7JdfmyBqxo9fpltX9JGuIEZh9Lck7a8+G+UlkFgwmMx9YRaYZtFDKw/u27BT >> CMfUyhwvVXAC7Mgqi+D2Cvu7CBroe7vMp1Y0cnutDoAcFYPVe/143cyV+66EjX9d >> PIkmcRGf99pj1yzJpZ8tZJr3O8iQXeDAVsf+imH+8KRWdbwjIZZXpGPlwr8Vts0G >> mI0/3twjskrIkpTMPd2XtT9BqmbyrqvidMRLdDSq+sEEyEY1K3vqny0en3oYX8EX >> 8RC4MkJN/7rZjZyOsTWRA+J474qUJmK+m2MmLzp25Fr1ivcjd5XcQR77uGSpKjWR >> qgTIsl1GY1zR1RLsoRKQ >> =4hLW >> -----END PGP SIGNATURE----- >> > >