[ 
https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15609687#comment-15609687
 ] 

saiprasad mishra commented on KAFKA-4344:
-----------------------------------------

Thanks [~guozhang] for taking the time to test this, and sorry for my late reply.
I think this is not a bug; it comes down to how Kafka Streams should be used
when combined with other frameworks.

I just figured out that this was all caused by my Spring Boot app (which has a
Spring IoC container) and how it instantiates the Java classes used by Kafka
Streams. As a result, the Kafka Streams thread would not start, because
accessing the offset from the context in the process() method threw the above
exception.

I can confirm that a regular Java app that creates, starts, and stops the Kafka
Streams pipeline works perfectly, with all offset and partition info visible.
You are seeing it work fine this way at your end too.

So the fix is to start Kafka Streams correctly.

The approach I took is to do it the same way in my Spring Boot app, so that all
the store and processor instances are tied to the right topologies (or tasks)
within the main Kafka Streams pipeline (I use many processor classes backed by
many RocksDB stores). This is crucial; otherwise the context object does not
have visibility into all the dependent stores when we want to access one store
from another.

To expose the API for interactive queries from a Spring Boot REST controller,
I start the Kafka Streams class as shown below. CatalogStreamPipeline is a
regular Java class without references to any Spring beans; its start() method
creates the KafkaStreams instance, passing a supplier of new processor
instances to every process step, and starts it. This approach always creates a
new processor and ties it to the right stream task.

I keep a reference to it so that I can use it to get the metadata for stores,
and this works well.

CatalogStreamPipeline looks something like this:

KStream<String, String> rawEvents = rawStream
        .filter((k, v) -> v != null)
        .map((k, v) -> new KeyValue<>(v.getId(), Utils.writeValueAsString(v)));
KStream<String, String> rawEventsKeyed = rawEvents.through(stringSerde, stringSerde, eventsTopic);
rawEventsKeyed.process(MyProcessor1::new, store1.name());
rawEventsKeyed.process(MyProcessor::new, store.name());

The Spring Boot REST controller looks like this:

@PostConstruct
public void init() throws Exception {
    catalogStreamPipeline = new CatalogStreamPipeline();
    catalogStreamPipeline.start();
}

@RequestMapping(method = RequestMethod.GET, params = {"id"})
public String getData(@RequestParam(value = "id") String idStr) throws Exception {
    try {
        KafkaStreams kafkaStreams = catalogStreamPipeline.getKafkaStreams();
        final ReadOnlyKeyValueStore<String, String> mystore =
                kafkaStreams.store("mystorex", QueryableStoreTypes.<String, String>keyValueStore());
        return mystore.get(idStr);
    } catch (Throwable e) {
        e.printStackTrace();
        return "";
    }
}



Apologies for the long story and for not providing the full context beforehand.


Regards
Sai

> Exception when accessing partition, offset and timestamp in processor class
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: saiprasad mishra
>            Assignee: Guozhang Wang
>            Priority: Minor
>
> I have a kafka stream pipeline like below
> source topic stream -> filter for null value ->map to make it keyed by id 
> ->custom processor to mystore ->to another topic -> ktable
> I am hitting the below type of exception in a custom processor class if I try 
> to access offset() or partition() or timestamp() from the ProcessorContext in 
> the process() method. I was hoping it would return the partition and offset 
> for the enclosing topic(in this case source topic) where its consuming from 
> or -1 based on the api docs.
> java.lang.IllegalStateException: This should not happen as offset() should 
> only be called while a record is processed
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>       at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>  ~[kafka-streams-0.10.1.0.jar!/:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>  [kafka-streams-0.10.1.0.jar!/:?]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
