Created https://issues.apache.org/jira/browse/KAFKA-4253 for this issue.
Guozhang On Tue, Oct 4, 2016 at 3:08 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Hello Srikanth, > > We close the underlying clients before closing the state manager (hence > the states) because for example we need to make sure producer's sent > records have all been acked before the state manager records the changelog > sent offsets as end offsets. This is kind of chicken-and-egg problem, and > we may be able to re-order the shutting down process in the future with > some added shutdown hooks. > > As of now, there is not a perfect solution to your scenario, and I would > like to suggest checking if producer's own batching mechanism is good > enough so you do not need to do this in the streams client layer. > > > Guozhang > > On Sat, Oct 1, 2016 at 2:20 PM, Srikanth <srikanth...@gmail.com> wrote: > >> Hello, >> >> I'm testing out a WriteToSinkProcessor() that batches records before >> writing it to a sink. >> The actual commit to sink happens in punctuate(). I also wanted to commit >> in close(). >> Idea here is, during a regular shutdown, we'll commit all records and >> ideally stop with an empty state. >> My commit() process is 3 step 1) Read from KV store 2) write to sink 3) >> delete written keys from KV store. >> >> I get this exception when closing though. It looks like the kafka producer >> is closed before the changelog topic is updated after close(). >> Should the producer be closed after all tasks and processors are closed? >> >> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing >> processor instance >> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove >> stream tasks in thread [StreamThread-1]: >> java.lang.IllegalStateException: Cannot send after the producer is >> closed. >> at >> org.apache.kafka.clients.producer.internals.RecordAccumulato >> r.append(RecordAccumulator.java:173) >> at >> org.apache.kafka.clients.producer.KafkaProducer.doSend(Kafka >> Producer.java:467) >> at >> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaPr >> oducer.java:430) >> at >> org.apache.kafka.streams.processor.internals.RecordCollector >> .send(RecordCollector.java:84) >> at >> org.apache.kafka.streams.processor.internals.RecordCollector >> .send(RecordCollector.java:71) >> at >> org.apache.kafka.streams.state.internals.StoreChangeLogger. >> logChange(StoreChangeLogger.java:108) >> at >> org.apache.kafka.streams.state.internals.InMemoryKeyValueLog >> gedStore.flush(InMemoryKeyValueLoggedStore.java:161) >> at >> org.apache.kafka.streams.state.internals.MeteredKeyValueStor >> e.flush(MeteredKeyValueStore.java:165) >> at >> org.apache.kafka.streams.processor.internals.ProcessorStateM >> anager.close(ProcessorStateManager.java:343) >> at >> org.apache.kafka.streams.processor.internals.AbstractTask. >> close(AbstractTask.java:112) >> at >> org.apache.kafka.streams.processor.internals.StreamTask. >> close(StreamTask.java:317) >> >> Srikanth >> > > > > -- > -- Guozhang > -- -- Guozhang