Hi Matthias
Thanks for the reply.
I don't think I have permission to create a JIRA.
If you can grant me permission, I can create one (my handle is saimishra).
Otherwise, please go ahead and create one.

I may need permission to create JIRAs anyway, as I might report more issues
after discussing them with you here.
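
In the meantime, a defensive wrapper might keep process() from failing while
the bug is open. The sketch below is only an assumption on my side and not
part of the Kafka Streams API: MetadataReader and tryRead are hypothetical
names, and it relies only on the accessor throwing IllegalStateException as
in the trace below.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka Streams): wraps a metadata accessor
// so that an IllegalStateException becomes an empty Optional.
final class MetadataReader {
    private MetadataReader() {}

    // Calls the accessor and returns its value, or Optional.empty() if the
    // accessor throws IllegalStateException.
    static <T> Optional<T> tryRead(Supplier<T> accessor) {
        try {
            return Optional.ofNullable(accessor.get());
        } catch (IllegalStateException e) {
            // Assumption: the metadata is simply unavailable for this call.
            return Optional.empty();
        }
    }
}
```

Inside process() this would presumably be called as
MetadataReader.tryRead(() -> context.offset()), with context being the
ProcessorContext handed to the processor. It only hides the symptom, so the
JIRA is still needed.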

Regards
Sai

On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Hi,
>
> sorry for late reply. Seems like a bug to me; within
> Processor#process() accessing the context should work. Can you open a
> JIRA for it?
>
> - -Matthias
>
> On 10/23/16 10:28 PM, saiprasad mishra wrote:
> > Sorry for the email again
> >
> > I expected it to always work when accessed from the process()
> > method, as each call corresponds to processing one Kafka
> > message/record. I can understand an IllegalStateException by the
> > time punctuate() is called, as it is already batched by time interval.
> >
> > Regards Sai
> >
> > On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra
> > <saiprasadmis...@gmail.com> wrote:
> >
> >> Hi
> >>
> >> This is with my streaming app on Kafka 0.10.1.0.
> >>
> >> My flow looks something like below
> >>
> >> source topic stream -> filter for null value -> map to make it
> >> keyed by id -> custom processor to mystore -> to another topic ->
> >> ktable
> >>
> >> I am hitting the exception below in a custom processor class
> >> when I try to access offset(), partition() or timestamp() from
> >> the ProcessorContext in the process() method. Based on the API
> >> docs, I was hoping it would return the partition and offset of
> >> the topic it is consuming from (in this case the source topic),
> >> or -1.
> >>
> >> It looks like it is accessible only in certain cases. Is it
> >> getting lost in the transformation phases?
> >>
> >> The same issue happens if I try to access them in the punctuate()
> >> method, but I saw somewhere that it might not work in
> >> punctuate(). Any reason for this, or a link describing it,
> >> would be helpful.
> >>
> >>
> >> ====================================================================
> >> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> >>     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> >> =====================================================================
> >>
> >> Regards
> >> Sai
> >>
> >
> -----BEGIN PGP SIGNATURE-----
> Comment: GPGTools - https://gpgtools.org
>
> iQIcBAEBCgAGBQJYD5XWAAoJECnhiMLycopPUogQAJ6qawqVVmUORrGugiAC3/YM
> ge0bvBSLbwCbys1wkm8vi17iRcMcZgYV1kUbspCBa8Ax7sA7YgmeqEYJpuCt6rRG
> AXOepZ7WCF+q9NK8aLGTr94ymKMT4t5KlaBMmR9AMR0jAK8iGZJIYcwWHdzYQZz8
> DjY2lYmkkzAQSorx2s9v4AEU2LiCsug3jJY/3/uQYAQnEPmHG5IoOmHnQoWQqT8S
> udLAtbzCRTcA3Fua5UE1P8KCQG2Pjw8DuDE5qxi0DWVmiuB+ASzp2V7+yVxLVotw
> Okg2q1V0T9L0QorbwZ1nG6fys+OeOSIX3vg1KM8nUOCC2YbeGtueYqRte5ThE/Xp
> 5rVXNIHXGzpcO1BeZT8BdDHcFc/4AR6fHZy0XFv6gHDRn4nsemwGOiNRADjhNaNp
> cM9w2Bo8Wxo9qPz0fAnaYTTNt/J4h2RkycIcFTY2xvBVfmjJZwq9XVVwIXkIDnxN
> sTxM6Czy4L7bcP+y6B/tqOG96cIJ5czKZwD7qwEOM9D0KIns2iM2wuQSgqU/vweY
> bWiwqEkodg+X+CuJ/5nch5z6xw+6d2MNC/mkYik5pFL4Na4O7eNjoIclVlq7bgcu
> hAJMx1B4flAoGqcRjUCRq39/fzKDEp1cJ1G4FjM8wWdPagumKQcgH51GkVn+6+HU
> a36xOHjdT3P3j+wcu512
> =cUp7
> -----END PGP SIGNATURE-----
>
