[ https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15609687#comment-15609687 ]
saiprasad mishra commented on KAFKA-4344: ----------------------------------------- Thanks [~guozhang] for your time on testing this and sorry for my late reply. I think this is not a bug and this is how kafka streams should be used combined with other frameworks I just figured out that this was all caused by my spring boot app(which has spring ioc container) and how it instantiates the java classes used by kafka streams. The result was it would not start the kafka stream thread as while accessing the offset from context in process() method throws the above exception. I confirm that the regular java app to create and start and stop kafka streams pipeline works perfectly with all offset and partition info visible. You are also seeing this way working fine at your end too. So to fix this kafka streams needs to be started correctly. So the approach i took is to use the same way in my spring boot app so that all the store and processor instances are tied to the right topologies(or tasks) within a main kafka streams pipeline properly (as I use many processor classes which are backed by many rocksdb stores). This is very crucial otherwise the context object does not have the visibility to all the dependent stores if we want to access one store from other To expose the api for the interactive queries from spring boot rest controller, I just start the kafka stream class like below(CatalogStreamPipeline is a regular java class without reference to any spring beans and start method creates the kafkastreams with new instances of the processor classes passed to every process step and starts it). This approach creates a new processor always and ties to the right stream tasks properly. I keep a reference to it so that I can use it to get the metadata for stores and this works beautifully CatalogStreamPipeline looks something like below KStream<String, String> rawEvents = rawStream.filter((k,v) -> v != null).map((k,v) -> new KeyValue<>(v.getId(),Utils.writeValueAsString(v))); KStream<String, String> rawEventsKeyed = rawEvents.through(stringSerde, stringSerde, eventsTopic); rawEventsKeyed.process(MyProcessor1 :: new, store1.name()); rawEventsKeyed.process(MyProcessor :: new, store.name()); Spring Boot Rest Controller looks like below @PostConstruct public void init() throws Exception { catalogStreamPipeline = new CatalogStreamPipeline(); catalogStreamPipeline.start(); } @RequestMapping(method = RequestMethod.GET, params = {"id"}) public String getData(@RequestParam(value="id") String idStr) throws Exception { try { KafkaStreams kafkaStreams = catalogStreamPipeline.getKafkaStreams(); final ReadOnlyKeyValueStore<String, String> mystore = kafkaStreams.store("mystorex", QueryableStoreTypes.<String, String>keyValueStore()); String result = mystore.get(idStr); return result; } catch (Throwable e) { e.printStackTrace(); return ""; } } Apology for the long story and not providing the full context beforehand. Regards Sai > Exception when accessing partition, offset and timestamp in processor class > --------------------------------------------------------------------------- > > Key: KAFKA-4344 > URL: https://issues.apache.org/jira/browse/KAFKA-4344 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.1.0 > Reporter: saiprasad mishra > Assignee: Guozhang Wang > Priority: Minor > > I have a kafka stream pipeline like below > source topic stream -> filter for null value ->map to make it keyed by id > ->custom processor to mystore ->to another topic -> ktable > I am hitting the below type of exception in a custom processor class if I try > to access offset() or partition() or timestamp() from the ProcessorContext in > the process() method. I was hoping it would return the partition and offset > for the enclosing topic(in this case source topic) where its consuming from > or -1 based on the api docs. > java.lang.IllegalStateException: This should not happen as offset() should > only be called while a record is processed > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) > ~[kafka-streams-0.10.1.0.jar!/:?] > at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?] > at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) > ~[kafka-streams-0.10.1.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) > [kafka-streams-0.10.1.0.jar!/:?] -- This message was sent by Atlassian JIRA (v6.3.4#6332)