[ https://issues.apache.org/jira/browse/KAFKA-5213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005581#comment-16005581 ]
ASF GitHub Bot commented on KAFKA-5213:
---------------------------------------

GitHub user apurvam opened a pull request:

    https://github.com/apache/kafka/pull/3015

    KAFKA-5213; Mark a MemoryRecordsBuilder as full as soon as the append stream is closed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apurvam/kafka KAFKA-5213-illegalstateexception-in-ensureOpenForAppend

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3015.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3015

----
commit 799fd8d3d60e3f8950bbe4b7d5e8865e6755f5aa
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-05-10T22:35:52Z

    Mark a MemoryRecordsBuilder as full as soon as the append stream is closed

----

> IllegalStateException in ensureOpenForRecordAppend
> --------------------------------------------------
>
>                 Key: KAFKA-5213
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5213
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: dan norwood
>            Assignee: Apurva Mehta
>            Priority: Critical
>             Fix For: 0.11.0.0
>
>
> i have a streams app that was working recently while pointing at trunk.
> this morning i ran it and now get
> {noformat}
> [2017-05-10 14:29:26,266] ERROR stream-thread [_confluent-controlcenter-3-3-0-1-04624550-88f9-4557-a47f-3dfbec3bc3d1-StreamThread-4] Streams application error during processing: {} (org.apache.kafka.streams.processor.internals.StreamThread:518)
> java.lang.IllegalStateException: Tried to append a record, but MemoryRecordsBuilder is closed for record appends
> 	at org.apache.kafka.common.record.MemoryRecordsBuilder.ensureOpenForRecordAppend(MemoryRecordsBuilder.java:607)
> 	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendLegacyRecord(MemoryRecordsBuilder.java:567)
> 	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:353)
> 	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:382)
> 	at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:440)
> 	at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:463)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:83)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.tryAppend(RecordAccumulator.java:257)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:210)
> 	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:645)
> 	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:598)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:97)
> 	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:55)
> 	at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:100)
> 	at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:51)
> 	at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:42)
> 	at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:90)
> 	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145)
> 	at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:239)
> 	at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:214)
> 	at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:122)
> 	at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:143)
> 	at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:111)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:69)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:175)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:657)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:540)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:511)
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
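For context, the idea in the PR title ("Mark a MemoryRecordsBuilder as full as soon as the append stream is closed") is that once a builder's append stream is closed, the producer should treat the batch as full and allocate a new one, instead of routing another append into it and hitting the IllegalStateException above. A minimal sketch of that idea, using hypothetical names (RecordsBuilderSketch and writeLimit are illustrative, not Kafka's actual MemoryRecordsBuilder API):

```java
// Simplified sketch, not the actual Kafka patch: a builder that reports
// itself "full" the moment its append stream is closed, so callers check
// isFull() and start a new batch rather than appending into a closed one.
class RecordsBuilderSketch {
    private final int writeLimit;            // hypothetical size budget for the batch
    private int estimatedBytes = 0;
    private boolean appendStreamClosed = false;

    RecordsBuilderSketch(int writeLimit) {
        this.writeLimit = writeLimit;
    }

    void append(int recordSizeBytes) {
        // Mirrors the guard that threw in the stack trace above.
        if (appendStreamClosed)
            throw new IllegalStateException(
                "Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
        estimatedBytes += recordSizeBytes;
    }

    void closeForRecordAppends() {
        appendStreamClosed = true;
    }

    // The key change: a closed append stream counts as full, not just a
    // batch that has reached its size limit.
    boolean isFull() {
        return appendStreamClosed || estimatedBytes >= writeLimit;
    }
}
```

A caller that checks isFull() before appending then never trips the exception: a closed builder looks full, so the accumulator falls through to creating a fresh batch.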