-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Hi,

sorry for late reply. Seems like a bug to me; within
Processor#process() accessing the context should work. Can you open a
JIRA for it?

- -Matthias

On 10/23/16 10:28 PM, saiprasad mishra wrote:
> Sorry for the email again
> 
> I was expecting it to work always when accessed from process() 
> method as this corresponds to each kafka message/record processing.
> I understand illegalstate by the time punctuate() is called as its
> already batched by time interval
> 
> Regards Sai
> 
> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra 
> <saiprasadmis...@gmail.com
>> wrote:
> 
>> Hi
>> 
>> his is with my streaming app kafka 10.1.0.
>> 
>> My flow looks something like below
>> 
>> source topic stream -> filter for null value ->map to make it 
>> keyed by id ->custom processor to mystore -> to another topic -> 
>> ktable
>> 
>> I am hitting the below type of exception in a custom processor 
>> class if I try to access offset() or partition() or timestamp() 
>> from the ProcessorContext in the process() method. I was hoping 
>> it would return the partition and offset for the enclosing 
>> topic(in this case source topic) where its consuming from or -1 
>> based on the api docs.
>> 
>> Looks like only in certain cases it is accessible. is it getting 
>> lost in transformation phases.
>> 
>> Same issue happens on if i try to access them in punctuate() 
>> method but some where I saw that it might not work in 
>> punctuate(). Any reason for this or any link describing this
>> will be helpful
>> 
>> 
>> ====================================================================
>>
>>
>>
>> 
java.lang.IllegalStateException: This should not happen as offset()
>> should only be called while a record is processed at 
>> org.apache.kafka.streams.processor.internals. 
>> ProcessorContextImpl.offset(ProcessorContextImpl.java:181) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?] at 
>> com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?] at 
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr
ocessorNode.java:82)
>>
>>
>> 
~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals. 
>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> org.apache.kafka.streams.kstream.internals.KStreamMap$ 
>> KStreamMapProcessor.process(KStreamMap.java:43) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr
ocessorNode.java:82)
>>
>>
>> 
~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals. 
>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> org.apache.kafka.streams.kstream.internals.KStreamFilter$ 
>> KStreamFilterProcessor.process(KStreamFilter.java:44) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr
ocessorNode.java:82)
>>
>>
>> 
~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals. 
>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> org.apache.kafka.streams.processor.internals. 
>> SourceNode.process(SourceNode.java:66) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> org.apache.kafka.streams.processor.internals. 
>> StreamTask.process(StreamTask.java:181) 
>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(Str
eamThread.java:436)
>>
>>
>> 
~[kafka-streams-0.10.1.0.jar!/:?]
>> at org.apache.kafka.streams.processor.internals. 
>> StreamThread.run(StreamThread.java:242) 
>> [kafka-streams-0.10.1.0.jar!/:?] 
>> =====================================================================
>>
>>
>>
>>
>> 
Regards
>> Sai
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYD5XWAAoJECnhiMLycopPUogQAJ6qawqVVmUORrGugiAC3/YM
ge0bvBSLbwCbys1wkm8vi17iRcMcZgYV1kUbspCBa8Ax7sA7YgmeqEYJpuCt6rRG
AXOepZ7WCF+q9NK8aLGTr94ymKMT4t5KlaBMmR9AMR0jAK8iGZJIYcwWHdzYQZz8
DjY2lYmkkzAQSorx2s9v4AEU2LiCsug3jJY/3/uQYAQnEPmHG5IoOmHnQoWQqT8S
udLAtbzCRTcA3Fua5UE1P8KCQG2Pjw8DuDE5qxi0DWVmiuB+ASzp2V7+yVxLVotw
Okg2q1V0T9L0QorbwZ1nG6fys+OeOSIX3vg1KM8nUOCC2YbeGtueYqRte5ThE/Xp
5rVXNIHXGzpcO1BeZT8BdDHcFc/4AR6fHZy0XFv6gHDRn4nsemwGOiNRADjhNaNp
cM9w2Bo8Wxo9qPz0fAnaYTTNt/J4h2RkycIcFTY2xvBVfmjJZwq9XVVwIXkIDnxN
sTxM6Czy4L7bcP+y6B/tqOG96cIJ5czKZwD7qwEOM9D0KIns2iM2wuQSgqU/vweY
bWiwqEkodg+X+CuJ/5nch5z6xw+6d2MNC/mkYik5pFL4Na4O7eNjoIclVlq7bgcu
hAJMx1B4flAoGqcRjUCRq39/fzKDEp1cJ1G4FjM8wWdPagumKQcgH51GkVn+6+HU
a36xOHjdT3P3j+wcu512
=cUp7
-----END PGP SIGNATURE-----

Reply via email to