-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Thx! :)

On 10/25/16 2:03 PM, saiprasad mishra wrote:
> Just created the JIRA
> 
> https://issues.apache.org/jira/browse/KAFKA-4344
> 
> Regards Sai
> 
> On Tue, Oct 25, 2016 at 11:59 AM, saiprasad mishra < 
> saiprasadmis...@gmail.com> wrote:
> 
>> My JIRA id is saimishra
>> 
>> Regards Sai
>> 
>> On Tue, Oct 25, 2016 at 10:55 AM, Matthias J. Sax
>> <matth...@confluent.io> wrote:
>> 
> What is your JIRA ID? We can add you to the contributor list to
> give you permission.
> 
> -Matthias
> 
> 
> On 10/25/16 10:48 AM, saiprasad mishra wrote:
>>>>> Hi Matthias Thanks for the reply. I think I don't have
>>>>> permission for this. If you can grant me permission I can
>>>>> create one (my handle is saimishra). Or you can go ahead
>>>>> and create one
>>>>> 
>>>>> I may need permission to create JIRA as I might report more
>>>>> issues after discussing with you over here.
>>>>> 
>>>>> Regards Sai
>>>>> 
>>>>> On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax 
>>>>> <matth...@confluent.io> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> sorry for late reply. Seems like a bug to me; within 
>>>>> Processor#process() accessing the context should work. Can
>>>>> you open a JIRA for it?
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 10/23/16 10:28 PM, saiprasad mishra wrote:
>>>>>>>> Sorry for the email again
>>>>>>>> 
>>>>>>>> I was expecting it to work always when accessed from 
>>>>>>>> process() method as this corresponds to each kafka 
>>>>>>>> message/record processing. I understand illegalstate
>>>>>>>> by the time punctuate() is called as its already
>>>>>>>> batched by time interval
>>>>>>>> 
>>>>>>>> Regards Sai
>>>>>>>> 
>>>>>>>> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra 
>>>>>>>> <saiprasadmis...@gmail.com
>>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi
>>>>>>>>> 
>>>>>>>>> his is with my streaming app kafka 10.1.0.
>>>>>>>>> 
>>>>>>>>> My flow looks something like below
>>>>>>>>> 
>>>>>>>>> source topic stream -> filter for null value ->map
>>>>>>>>> to make it keyed by id ->custom processor to
>>>>>>>>> mystore -> to another topic -> ktable
>>>>>>>>> 
>>>>>>>>> I am hitting the below type of exception in a
>>>>>>>>> custom processor class if I try to access offset()
>>>>>>>>> or partition() or timestamp() from the
>>>>>>>>> ProcessorContext in the process() method. I was
>>>>>>>>> hoping it would return the partition and offset for
>>>>>>>>> the enclosing topic(in this case source topic) 
>>>>>>>>> where its consuming from or -1 based on the api
>>>>>>>>> docs.
>>>>>>>>> 
>>>>>>>>> Looks like only in certain cases it is accessible.
>>>>>>>>> is it getting lost in transformation phases.
>>>>>>>>> 
>>>>>>>>> Same issue happens on if i try to access them in 
>>>>>>>>> punctuate() method but some where I saw that it
>>>>>>>>> might not work in punctuate(). Any reason for this
>>>>>>>>> or any link describing this will be helpful
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> ==============================================================
====
>
>>>>>>>>> 
==
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>> 
>>>>>>>>> 
> java.lang.IllegalStateException: This should not happen as
> offset()
>>>>>>>>> should only be called while a record is processed
>>>>>>>>> at org.apache.kafka.streams.processor.internals. 
>>>>>>>>> ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
>>>>>>>>>
>>>>>>>>> 
~[kafka-streams-0.10.1.0.jar!/:?] at
>>>>>>>>> com.sai.repo.MyStore.process(MyStore.java:72) 
>>>>>>>>> ~[classes!/:?] at 
>>>>>>>>> com.sai.repo.MyStore.process(MyStore.java:39) 
>>>>>>>>> ~[classes!/:?] at 
>>>>>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.pro
cess
>
>>>>>>>>> 
(Pr
>>>>> 
>>>>>>>>> 
> ocessorNode.java:82)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>> ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>> at org.apache.kafka.streams.processor.internals. 
>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>>>>>>>>
>>>>>>>>>
>
>>>>>>>>> 
~[kafka-streams-0.10.1.0.jar!/:?] at
>>>>>>>>> org.apache.kafka.streams.kstream.internals.KStreamMap$
>>>>>>>>>
>>>>>>>>> 
KStreamMapProcessor.process(KStreamMap.java:43)
>>>>>>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>>>>>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.pro
cess
>
>>>>>>>>> 
(Pr
>>>>> 
>>>>>>>>> 
> ocessorNode.java:82)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>> ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>> at org.apache.kafka.streams.processor.internals. 
>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>>>>>>>>
>>>>>>>>>
>
>>>>>>>>> 
~[kafka-streams-0.10.1.0.jar!/:?] at
>>>>>>>>> org.apache.kafka.streams.kstream.internals.KStreamFilter$
>>>>>>>>>
>>>>>>>>> 
KStreamFilterProcessor.process(KStreamFilter.java:44)
>>>>>>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>>>>>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.pro
cess
>
>>>>>>>>> 
(Pr
>>>>> 
>>>>>>>>> 
> ocessorNode.java:82)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>> ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>> at org.apache.kafka.streams.processor.internals. 
>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>>>>>>>>
>>>>>>>>>
>
>>>>>>>>> 
~[kafka-streams-0.10.1.0.jar!/:?] at
>>>>>>>>> org.apache.kafka.streams.processor.internals. 
>>>>>>>>> SourceNode.process(SourceNode.java:66) 
>>>>>>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>>>>>>>>> org.apache.kafka.streams.processor.internals. 
>>>>>>>>> StreamTask.process(StreamTask.java:181) 
>>>>>>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at 
>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runL
oop(
>
>>>>>>>>> 
Str
>>>>> 
>>>>>>>>> 
> eamThread.java:436)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>> ~[kafka-streams-0.10.1.0.jar!/:?]
>>>>>>>>> at org.apache.kafka.streams.processor.internals. 
>>>>>>>>> StreamThread.run(StreamThread.java:242) 
>>>>>>>>> [kafka-streams-0.10.1.0.jar!/:?] 
>>>>>>>>> ==============================================================
====
>
>>>>>>>>> 
===
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>> 
>>>>>>>>> 
> Regards
>>>>>>>>> Sai
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>> 
>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYD991AAoJECnhiMLycopPoTsQAJTJ5WoftXEIcTUOyeJW2QT7
FOi/SKBNZOgIl7MKjpUFE1c1LZED5iWp/f1+JzzMP3IwrXmLNMpaa2qTAtulp7XM
xsuVg8W+KJ1oxDMrErknegXoI7BxI4Q6+VewANWS7l6b2CObKPCqryDJ5ak0G9Tx
kDB6j8Tvkg9dIfyP3HmgGrkrcpxJqUt0VLf9pSAnCNE67Ikg/uufi8QP8B9ZbvHZ
YJtf0HUdSjEqoLsWI0cvuZOkkCKqasTwyF4AIomuon5Lf3SzLmhQOgXiJoDVyG+r
LsvOP+uPlEi5i54vETlJ2+EWVr8LO9HVIzxg7Fmm0m99Z1tsYIapfBhHCyCUNMDu
nFBN00v2Auu90959X901JEhRipeJmBcpHwzHdw3raqnwMRruVq3OyWZs0wGnkEej
kSEHK/j50UZZ+RYEkneFe17vWws0JDhka5ginUXUzPWvbDwXS5VB5diDNYVYHnRW
ZtlIOGtwF72ZoBRQO1i3iwnebjM2U8U36W13mj/Qt72uD2tD3fZiLU922rLIQWTd
praF/k+rHgBoDyM045cdvx+Fc5a7xEMzGQRDl2WfHwijq7tmMNBFQQJjpkPckbxE
stF17J+AfoAYRullXP4awVEGJULmwpsl0eUCIenZ1lEe+o4i2v9AD8XbxQzLKOnX
QG6qCk0H26/+1owryB8l
=PcVz
-----END PGP SIGNATURE-----

Reply via email to