Hello Srikanth,

We close the underlying clients before closing the state manager (and hence
the state stores) because, for example, we need to make sure the producer's
sent records have all been acked before the state manager records the
changelog sent offsets as end offsets. This is a kind of chicken-and-egg
problem, and we may be able to re-order the shutdown process in the future
with some added shutdown hooks.
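
To make the ordering constraint concrete, here is a toy sketch in plain Java. It does not use any Kafka classes: ToyProducer and ToyStateManager are hypothetical stand-ins invented for illustration, and the real shutdown code is more involved. It only models the two facts above: the checkpointed end offset is only correct once the producer has acked everything, and a store write made during shutdown cannot be logged once the producer is closed.

```java
import java.util.ArrayList;
import java.util.List;

final class ShutdownOrderSketch {

    /** Hypothetical toy producer: sent records are only acked once close() drains them. */
    static final class ToyProducer {
        private final List<String> pending = new ArrayList<>();
        private long ackedOffset = -1L;
        private boolean closed = false;

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            pending.add(record);
        }

        void close() {
            ackedOffset += pending.size(); // every pending record is now acked
            pending.clear();
            closed = true;
        }

        long ackedOffset() {
            return ackedOffset;
        }
    }

    /** Hypothetical toy state manager: logs dirty entries, then checkpoints the acked offset. */
    static final class ToyStateManager {
        private final ToyProducer producer;
        private final List<String> dirty = new ArrayList<>();
        private long checkpoint = -1L;

        ToyStateManager(ToyProducer producer) {
            this.producer = producer;
        }

        void put(String entry) {
            dirty.add(entry);
        }

        // Sends all not-yet-logged entries to the (toy) changelog.
        void flush() {
            for (String entry : dirty) {
                producer.send(entry);
            }
            dirty.clear();
        }

        // Flushes, then records whatever the producer has acked so far as the end offset.
        void close() {
            flush();
            checkpoint = producer.ackedOffset();
        }

        long checkpoint() {
            return checkpoint;
        }
    }

    // Producer closed first, no late store writes: the checkpoint matches the acked offset.
    static boolean checkpointMatchesWhenProducerClosedFirst() {
        ToyProducer producer = new ToyProducer();
        ToyStateManager stateManager = new ToyStateManager(producer);
        stateManager.put("k1");
        stateManager.flush();  // regular commit while everything is still running
        producer.close();      // k1 is acked here
        stateManager.close();  // nothing dirty left, checkpoint taken after the ack
        return stateManager.checkpoint() == producer.ackedOffset();
    }

    // State manager closed first: the checkpoint is taken before the last record is acked.
    static boolean checkpointIsStaleWhenStateClosedFirst() {
        ToyProducer producer = new ToyProducer();
        ToyStateManager stateManager = new ToyStateManager(producer);
        stateManager.put("k1");
        stateManager.close();  // k1 is sent but not yet acked when the checkpoint is taken
        producer.close();
        return stateManager.checkpoint() < producer.ackedOffset();
    }

    // Producer closed first, but the store is modified during shutdown: the flush fails.
    static boolean lateStoreWriteFails() {
        ToyProducer producer = new ToyProducer();
        ToyStateManager stateManager = new ToyStateManager(producer);
        producer.close();
        stateManager.put("k1"); // e.g. a store update made from a processor's close()
        try {
            stateManager.close();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(checkpointMatchesWhenProducerClosedFirst());
        System.out.println(checkpointIsStaleWhenStateClosedFirst());
        System.out.println(lateStoreWriteFails());
    }
}
```

The third case corresponds to the exception quoted below: the store is updated during shutdown, and flushing it to the changelog needs a producer that has already been closed.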

As of now, there is no perfect solution for your scenario. I would suggest
checking whether the producer's own batching mechanism is good enough, so
that you do not need to batch in the Streams client layer.
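
As a rough illustration of what relying on producer-side batching could look like, assuming the sink is a Kafka topic written through a Kafka producer: the sketch below only builds a java.util.Properties object. "linger.ms" and "batch.size" are standard Kafka producer settings; the class name, the helper method, and the values are placeholders chosen for illustration, not recommendations.

```java
import java.util.Properties;

final class ProducerBatchingSketch {

    // Builds producer-level batching settings as plain properties.
    // "linger.ms": how long the producer waits for more records before sending a batch.
    // "batch.size": upper bound, in bytes, on a single per-partition batch.
    static Properties batchingProps(long lingerMs, int batchSizeBytes) {
        Properties props = new Properties();
        props.setProperty("linger.ms", Long.toString(lingerMs));
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: wait up to 100 ms and batch up to 64 KiB per partition.
        Properties props = batchingProps(100L, 64 * 1024);
        System.out.println(props.getProperty("linger.ms"));
        System.out.println(props.getProperty("batch.size"));
    }
}
```

These entries would be merged into whatever configuration is handed to the producer. How a Streams application passes producer settings through depends on the Kafka version in use, so check that against the documentation for your release.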


Guozhang

On Sat, Oct 1, 2016 at 2:20 PM, Srikanth <srikanth...@gmail.com> wrote:

> Hello,
>
> I'm testing out a WriteToSinkProcessor() that batches records before
> writing them to a sink.
> The actual commit to the sink happens in punctuate(). I also wanted to
> commit in close().
> The idea here is that, during a regular shutdown, we'll commit all records
> and ideally stop with an empty state.
> My commit() process has 3 steps: 1) read from the KV store, 2) write to the
> sink, 3) delete the written keys from the KV store.
>
> I get this exception when closing, though. It looks like the Kafka producer
> is closed before the changelog topic is updated after close().
> Should the producer be closed after all tasks and processors are closed?
>
> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing processor instance
> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove stream tasks in thread [StreamThread-1]:
> java.lang.IllegalStateException: Cannot send after the producer is closed.
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:173)
>         at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
>         at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:84)
>         at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:71)
>         at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:108)
>         at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.flush(InMemoryKeyValueLoggedStore.java:161)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:343)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.close(AbstractTask.java:112)
>         at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:317)
>
> Srikanth
>



-- 
-- Guozhang
