What is your JIRA ID? We can add you to the contributor list to give
you permission.

-Matthias


On 10/25/16 10:48 AM, saiprasad mishra wrote:
> Hi Matthias,
> 
> Thanks for the reply. I don't think I have permission to create a
> JIRA. If you can grant me permission, I can create one (my handle is
> saimishra), or you can go ahead and create one yourself.
> 
> I may need the permission anyway, as I might report more issues
> after discussing them with you here.
> 
> Regards Sai
> 
> On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> Hi,
> 
> Sorry for the late reply. This seems like a bug to me; accessing the
> context within Processor#process() should work. Can you open a JIRA
> for it?
> 
> -Matthias
> 
> On 10/23/16 10:28 PM, saiprasad mishra wrote:
>>>> Sorry for emailing again.
>>>> 
>>>> I was expecting it to always work when accessed from the
>>>> process() method, as each call corresponds to the processing
>>>> of a single Kafka message/record. I can understand the
>>>> IllegalStateException when punctuate() is called, as by then
>>>> processing is already batched by time interval.
>>>> 
>>>> Regards Sai
>>>> 
>>>> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra
>>>> <saiprasadmis...@gmail.com> wrote:
>>>> 
>>>>> Hi
>>>>> 
>>>>> This is with my streaming app on Kafka 0.10.1.0.
>>>>> 
>>>>> My flow looks something like this:
>>>>> 
>>>>> source topic stream -> filter for null value -> map to make
>>>>> it keyed by id -> custom processor to mystore -> to another
>>>>> topic -> ktable
>>>>> 
>>>>> I am hitting the exception below in a custom processor class
>>>>> when I try to access offset(), partition() or timestamp()
>>>>> from the ProcessorContext in the process() method. Based on
>>>>> the API docs, I was hoping it would return the partition and
>>>>> offset of the topic it is consuming from (in this case the
>>>>> source topic), or -1.
>>>>> 
>>>>> It looks like they are accessible only in certain cases. Are
>>>>> they getting lost in the transformation phases?
>>>>> 
>>>>> The same issue happens if I try to access them in the
>>>>> punctuate() method, but I saw somewhere that it might not
>>>>> work in punctuate(). Any reason for this, or a link
>>>>> describing it, would be helpful.
>>>>> 
>>>>> 
>>>>> ====================================================================
>>>>> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>>>>>     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
>>>>> ====================================================================
>>>>> 
>>>>> Regards
>>>>> Sai
>>>>> 
>>>> 
>> 
> 
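
For reference, the behaviour described in the original report can be
sketched without Kafka on the classpath. This is only an illustration
under stated assumptions: RecordMetadata, NoRecordContext,
FixedContext and orMinusOne are hypothetical names invented for this
sketch and are not part of the Kafka Streams API. The stand-in context
simply mimics the reported IllegalStateException, and the helper shows
one defensive way to fall back to -1 (the value the report expected)
until the underlying issue is resolved.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-ins only; none of these types come from Kafka Streams.
final class ContextAccessSketch {

    /** Minimal stand-in for the metadata accessors named in the report. */
    interface RecordMetadata {
        long offset();
        long timestamp();
    }

    /** Mimics a context with no current record, as in the stack trace. */
    static final class NoRecordContext implements RecordMetadata {
        public long offset() {
            throw new IllegalStateException(
                "offset() should only be called while a record is processed");
        }
        public long timestamp() {
            throw new IllegalStateException(
                "timestamp() should only be called while a record is processed");
        }
    }

    /** Mimics a context that does have a current record. */
    static final class FixedContext implements RecordMetadata {
        private final long offset;
        private final long timestamp;
        FixedContext(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
        public long offset() { return offset; }
        public long timestamp() { return timestamp; }
    }

    /** Returns the accessor's value, or -1 if it throws IllegalStateException. */
    static long orMinusOne(LongSupplier accessor) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        RecordMetadata missing = new NoRecordContext();
        RecordMetadata present = new FixedContext(42L, 1477286880000L);
        System.out.println(orMinusOne(missing::offset)); // prints -1
        System.out.println(orMinusOne(present::offset)); // prints 42
    }
}
```

Inside a real processor, the same guard would wrap the calls on the
actual context object. Catching the exception hides the symptom rather
than fixing it, so it is a stopgap at best.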