Created https://issues.apache.org/jira/browse/KAFKA-4253 for this issue.


Guozhang
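
For anyone finding this thread through the ticket, the ordering problem can be illustrated with a toy model. This is only a sketch: ToyProducer and ToyLoggedStore are hypothetical stand-ins, not Kafka classes, and the model ignores the reason given in the quoted reply for closing the clients first (waiting for producer acks before recording changelog end offsets). It only shows why a store flush that happens after the producer is closed fails with the exception reported below.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for a producer: rejects sends once closed. */
    static class ToyProducer {
        private boolean closed = false;
        final List<String> sent = new ArrayList<>();

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            sent.add(record);
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for a logged store: buffers changes and sends them on flush. */
    static class ToyLoggedStore {
        private final ToyProducer producer;
        private final List<String> dirty = new ArrayList<>();

        ToyLoggedStore(ToyProducer producer) {
            this.producer = producer;
        }

        void delete(String key) {
            dirty.add("delete:" + key);
        }

        void flush() {
            for (String change : dirty) {
                producer.send(change);
            }
            dirty.clear();
        }
    }

    /** Runs one shutdown in the given order; returns true if the store flush succeeded. */
    public static boolean shutdown(boolean closeProducerFirst) {
        ToyProducer producer = new ToyProducer();
        ToyLoggedStore store = new ToyLoggedStore(producer);
        store.delete("k1"); // a change made from the processor's close()
        try {
            if (closeProducerFirst) {
                producer.close();
                store.flush(); // the pending change can no longer be sent
            } else {
                store.flush();
                producer.close();
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("producer closed first: flush ok = " + shutdown(true));
        System.out.println("store flushed first:   flush ok = " + shutdown(false));
    }
}
```

In this model, any store change made during close() is only safe if the flush runs while the producer is still open.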

On Tue, Oct 4, 2016 at 3:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Srikanth,
>
> We close the underlying clients before closing the state manager (and hence
> the state stores) because, for example, we need to make sure that all records
> sent by the producer have been acked before the state manager records the
> changelog sent offsets as end offsets. This is a kind of chicken-and-egg
> problem, and we may be able to re-order the shutdown process in the future
> with some added shutdown hooks.
>
> As of now, there is no perfect solution for your scenario. I would suggest
> checking whether the producer's own batching mechanism is good enough, so
> that you do not need to do this batching in the Streams client layer.
>
>
> Guozhang
>
> On Sat, Oct 1, 2016 at 2:20 PM, Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm testing out a WriteToSinkProcessor() that batches records before
>> writing them to a sink.
>> The actual commit to the sink happens in punctuate(). I also wanted to
>> commit in close().
>> The idea is that during a regular shutdown we'll commit all records and
>> ideally stop with an empty state.
>> My commit() process has three steps: 1) read from the KV store, 2) write to
>> the sink, 3) delete the written keys from the KV store.
>>
>> I get this exception when closing, though. It looks like the Kafka producer
>> is closed before the changelog topic is updated after close().
>> Should the producer be closed after all tasks and processors are closed?
>>
>> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing processor instance
>> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove stream tasks in thread [StreamThread-1]:
>> java.lang.IllegalStateException: Cannot send after the producer is closed.
>>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:173)
>>         at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
>>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
>>         at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:84)
>>         at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:71)
>>         at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:108)
>>         at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.flush(InMemoryKeyValueLoggedStore.java:161)
>>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
>>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:343)
>>         at org.apache.kafka.streams.processor.internals.AbstractTask.close(AbstractTask.java:112)
>>         at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:317)
>>
>> Srikanth
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
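
For reference, the three-step commit described in the original question (read from the KV store, write to the sink, delete the written keys) could look roughly like the sketch below. It rests on assumptions: a plain Map stands in for the state store, and the Sink interface and the commit() signature are hypothetical, not the poster's actual code or any Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchCommitSketch {

    /** Hypothetical sink; a real one might wrap a database or an HTTP client. */
    interface Sink {
        void write(String key, String value);
    }

    /** Writes every buffered record to the sink and returns how many were written. */
    public static int commit(Map<String, String> store, Sink sink) {
        // 1) Read: copy the buffered entries so the store is not mutated while iterating.
        Map<String, String> batch = new LinkedHashMap<>(store);

        // 2) Write each buffered record to the sink.
        for (Map.Entry<String, String> entry : batch.entrySet()) {
            sink.write(entry.getKey(), entry.getValue());
        }

        // 3) Delete only the keys that were written.
        for (String key : batch.keySet()) {
            store.remove(key);
        }
        return batch.size();
    }

    public static void main(String[] args) {
        Map<String, String> store = new LinkedHashMap<>();
        store.put("a", "1");
        store.put("b", "2");

        List<String> written = new ArrayList<>();
        int count = commit(store, (key, value) -> written.add(key + "=" + value));

        System.out.println(count + " written, " + store.size() + " left in store");
    }
}
```

If the sink write throws partway through, step 3 never runs and nothing is deleted, so the same records would be written again on the next commit. The deletes in step 3 are exactly the store changes that still need to reach the changelog, which is why calling this from close() runs into the exception above.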
