Hello,

I'm testing out a WriteToSinkProcessor() that batches records before
writing them to a sink.
The actual commit to the sink happens in punctuate(). I also want to
commit in close().
The idea is that, during a regular shutdown, we commit all records and
ideally stop with an empty state.
My commit() process has three steps: 1) read from the KV store, 2) write
to the sink, 3) delete the written keys from the KV store.
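
To make the three commit() steps concrete, here is a minimal sketch of the
pattern. It is not my actual processor: the class name BatchCommitSketch is
made up, and plain Java collections stand in for the Kafka Streams KV store
and for the sink, so it runs without any Kafka dependency.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the processor described above. Plain collections
// replace the Kafka Streams KV store and the real sink (both are assumptions).
final class BatchCommitSketch {
    // Stand-in for the KV store that buffers records between commits.
    final Map<String, String> store = new LinkedHashMap<>();
    // Stand-in for the external sink.
    final List<String> sink = new ArrayList<>();

    // Per-record path: only buffer the record, no sink I/O here.
    void process(String key, String value) {
        store.put(key, value);
    }

    // In the design above this would be called from punctuate() and close().
    // Returns the number of records written in this commit.
    int commit() {
        // 1) Read from the KV store (copy, so step 3 can delete safely).
        Map<String, String> batch = new LinkedHashMap<>(store);

        // 2) Write the batch to the sink.
        for (Map.Entry<String, String> entry : batch.entrySet()) {
            sink.add(entry.getKey() + "=" + entry.getValue());
        }

        // 3) Delete only the keys that were written.
        for (String key : batch.keySet()) {
            store.remove(key);
        }
        return batch.size();
    }
}
```

After a commit() the store is empty, which is the state I would like to
reach on a regular shutdown. In the real processor, step 3 is what produces
the changelog updates.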

I get this exception on close, though. It looks like the Kafka producer
is closed before the changelog topic is updated after close().
Should the producer be closed only after all tasks and processors have
been closed?
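
To illustrate the ordering I am asking about, here is a toy model. It is
not Kafka Streams code: ToyProducer and ToyTask are hypothetical classes
that only mimic the two behaviours involved, namely a producer that rejects
sends once it is closed and a task that flushes pending changelog records
when it is closed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical, not Kafka Streams internals.
final class ShutdownOrderSketch {

    // Minimal producer that rejects sends once it has been closed.
    static final class ToyProducer {
        final List<String> sent = new ArrayList<>();
        private boolean closed = false;

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            sent.add(record);
        }

        void close() {
            closed = true;
        }
    }

    // Minimal task whose close() flushes pending changelog records.
    static final class ToyTask {
        private final ToyProducer producer;
        private final List<String> pendingChangelog = new ArrayList<>();

        ToyTask(ToyProducer producer) {
            this.producer = producer;
        }

        // A store delete that still has to be logged to the changelog.
        void delete(String key) {
            pendingChangelog.add("delete:" + key);
        }

        void close() {
            for (String record : pendingChangelog) {
                producer.send(record);
            }
            pendingChangelog.clear();
        }
    }

    // Order I would expect: close the task first, then the producer.
    static List<String> shutdownTasksFirst() {
        ToyProducer producer = new ToyProducer();
        ToyTask task = new ToyTask(producer);
        task.delete("k1");
        task.close();
        producer.close();
        return producer.sent;
    }

    // Order the exception suggests: producer closed before the task flushes.
    // Returns true if the flush failed with IllegalStateException.
    static boolean shutdownProducerFirstFails() {
        ToyProducer producer = new ToyProducer();
        ToyTask task = new ToyTask(producer);
        task.delete("k1");
        producer.close();
        try {
            task.close();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

In this model the first order delivers the pending delete, while the second
fails with the same message as in my log. Whether the real shutdown path
follows the second order is exactly what I am unsure about.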

16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing processor instance
16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove stream tasks in thread [StreamThread-1]:
java.lang.IllegalStateException: Cannot send after the producer is closed.
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:173)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:84)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:71)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:108)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.flush(InMemoryKeyValueLoggedStore.java:161)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:343)
        at org.apache.kafka.streams.processor.internals.AbstractTask.close(AbstractTask.java:112)
        at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:317)

Srikanth
