-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Thx! :)

On 10/25/16 2:03 PM, saiprasad mishra wrote:
> Just created the JIRA
>
> https://issues.apache.org/jira/browse/KAFKA-4344
>
> Regards
> Sai
>
> On Tue, Oct 25, 2016 at 11:59 AM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:
>
>> My JIRA id is saimishra
>>
>> Regards
>> Sai
>>
>> On Tue, Oct 25, 2016 at 10:55 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
> What is your JIRA ID? We can add you to the contributor list to
> give you permission.
>
> -Matthias
>
>
> On 10/25/16 10:48 AM, saiprasad mishra wrote:
>>>>> Hi Matthias
>>>>>
>>>>> Thanks for the reply. I think I don't have permission for this.
>>>>> If you can grant me permission I can create one (my handle is
>>>>> saimishra). Or you can go ahead and create one.
>>>>>
>>>>> I may need permission to create JIRAs, as I might report more
>>>>> issues after discussing them with you over here.
>>>>>
>>>>> Regards
>>>>> Sai
>>>>>
>>>>> On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> sorry for the late reply. Seems like a bug to me; within
>>>>> Processor#process() accessing the context should work. Can
>>>>> you open a JIRA for it?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 10/23/16 10:28 PM, saiprasad mishra wrote:
>>>>>>>> Sorry for the email again
>>>>>>>>
>>>>>>>> I was expecting it to always work when accessed from the
>>>>>>>> process() method, as this corresponds to each Kafka
>>>>>>>> message/record being processed. I understand the
>>>>>>>> IllegalStateException by the time punctuate() is called,
>>>>>>>> as it's already batched by time interval.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> Sai
>>>>>>>>
>>>>>>>> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> This is with my streaming app, Kafka 0.10.1.0.
>>>>>>>>>
>>>>>>>>> My flow looks something like below
>>>>>>>>>
>>>>>>>>> source topic stream -> filter for null value -> map
>>>>>>>>> to make it keyed by id -> custom processor to
>>>>>>>>> mystore -> to another topic -> ktable
>>>>>>>>>
>>>>>>>>> I am hitting the below type of exception in a
>>>>>>>>> custom processor class if I try to access offset(),
>>>>>>>>> partition(), or timestamp() from the
>>>>>>>>> ProcessorContext in the process() method. I was
>>>>>>>>> hoping it would return the partition and offset for
>>>>>>>>> the enclosing topic (in this case the source topic)
>>>>>>>>> it is consuming from, or -1, based on the API
>>>>>>>>> docs.
>>>>>>>>>
>>>>>>>>> It looks like they are accessible only in certain cases.
>>>>>>>>> Are they getting lost in the transformation phases?
>>>>>>>>>
>>>>>>>>> The same issue happens if I try to access them in the
>>>>>>>>> punctuate() method, but somewhere I saw that it
>>>>>>>>> might not work in punctuate(). Any reason for this,
>>>>>>>>> or any link describing this, would be helpful.
>>>>>>>>>
>>>>>>>>> ====================================================================
>>>>>>>>>
>>>>>>>>> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>>>>>>>>>     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>> ====================================================================
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> Sai
>>>>>>>>>
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYD991AAoJECnhiMLycopPoTsQAJTJ5WoftXEIcTUOyeJW2QT7
FOi/SKBNZOgIl7MKjpUFE1c1LZED5iWp/f1+JzzMP3IwrXmLNMpaa2qTAtulp7XM
xsuVg8W+KJ1oxDMrErknegXoI7BxI4Q6+VewANWS7l6b2CObKPCqryDJ5ak0G9Tx
kDB6j8Tvkg9dIfyP3HmgGrkrcpxJqUt0VLf9pSAnCNE67Ikg/uufi8QP8B9ZbvHZ
YJtf0HUdSjEqoLsWI0cvuZOkkCKqasTwyF4AIomuon5Lf3SzLmhQOgXiJoDVyG+r
LsvOP+uPlEi5i54vETlJ2+EWVr8LO9HVIzxg7Fmm0m99Z1tsYIapfBhHCyCUNMDu
nFBN00v2Auu90959X901JEhRipeJmBcpHwzHdw3raqnwMRruVq3OyWZs0wGnkEej
kSEHK/j50UZZ+RYEkneFe17vWws0JDhka5ginUXUzPWvbDwXS5VB5diDNYVYHnRW
ZtlIOGtwF72ZoBRQO1i3iwnebjM2U8U36W13mj/Qt72uD2tD3fZiLU922rLIQWTd
praF/k+rHgBoDyM045cdvx+Fc5a7xEMzGQRDl2WfHwijq7tmMNBFQQJjpkPckbxE
stF17J+AfoAYRullXP4awVEGJULmwpsl0eUCIenZ1lEe+o4i2v9AD8XbxQzLKOnX
QG6qCk0H26/+1owryB8l
=PcVz
-----END PGP SIGNATURE-----
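
For anyone hitting the same IllegalStateException before the bug tracked in KAFKA-4344 is resolved, a defensive wrapper along the following lines might be one way to keep a custom processor running. This is only a sketch under stated assumptions, not a fix for the underlying issue and not anything proposed in the thread: `SafeRecordMetadata`, `RecordContext`, `orUnknown`, and `describe` are hypothetical names, and `RecordContext` is just a stand-in that mirrors the `offset()`, `partition()`, and `timestamp()` accessors of `ProcessorContext` so the example compiles without the Kafka Streams dependency. The -1 fallback is an assumption based on the original poster's reading of the API docs.

```java
import java.util.function.LongSupplier;

/**
 * Sketch only: read record metadata defensively and fall back to -1
 * when the context reports that no record is currently being processed.
 */
final class SafeRecordMetadata {

    /** Assumed sentinel for "metadata not available". */
    static final long UNKNOWN = -1L;

    /**
     * Hypothetical stand-in for the three ProcessorContext accessors
     * discussed in the thread; it is not the Kafka Streams interface.
     */
    interface RecordContext {
        long offset();
        int partition();
        long timestamp();
    }

    /** Runs the accessor and converts an IllegalStateException into UNKNOWN. */
    static long orUnknown(LongSupplier accessor) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            // The context had no current record; report the sentinel instead.
            return UNKNOWN;
        }
    }

    /** Builds a log-friendly description without letting the accessors throw. */
    static String describe(RecordContext context) {
        long partition = orUnknown(context::partition);
        long offset = orUnknown(context::offset);
        long timestamp = orUnknown(context::timestamp);
        return "partition=" + partition + " offset=" + offset + " timestamp=" + timestamp;
    }

    public static void main(String[] args) {
        // A context that behaves as if a record is being processed.
        RecordContext withRecord = new RecordContext() {
            public long offset() { return 42L; }
            public int partition() { return 3; }
            public long timestamp() { return 1477256280000L; }
        };
        // A context that behaves like the failing case from the stack trace.
        RecordContext withoutRecord = new RecordContext() {
            public long offset() { throw new IllegalStateException("no record"); }
            public int partition() { throw new IllegalStateException("no record"); }
            public long timestamp() { throw new IllegalStateException("no record"); }
        };
        System.out.println(describe(withRecord));
        System.out.println(describe(withoutRecord));
    }
}
```

In a real processor, the lambdas or method references passed to `orUnknown` would point at the actual `ProcessorContext` instance received in `init()`. Swallowing the exception hides the symptom only, so logging the fallback case is probably worthwhile.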