[ https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ko byoung kwon updated KAFKA-6860:
----------------------------------
    Description: 
First of all, please understand that my English is not fluent.

*Symptom*

With EOS enabled, reinitializing state stores throws an NPE because the checkpoint is null.
{code:java}
2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] Encountered the following error during processing:
java.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) ~[kafka-streams-1.1.0.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) ~[kafka-streams-1.1.0.jar:na]
    at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) ~[kafka-streams-1.1.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[kafka-streams-1.1.0.jar:na]
{code}

*How to reproduce*

*Configure as*
- a changelog topic with a short `retention.ms` and the `delete` cleanup policy (just to reproduce the symptom easily), e.g. retention.ms=30000, cleanup.policy=delete
- exactly-once semantics enabled (a minimal config sketch follows this list)
- no cleanup
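For reference, a minimal sketch of a Streams configuration with exactly-once enabled; the application id, bootstrap servers, and default serdes below are placeholders for illustration, not values taken from this report:
{code:java}
// Sketch only: the property values below are placeholders, not from the report.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-application");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder
// The setting relevant to this report: with exactly-once enabled the checkpoint
// is null, which is what triggers the NPE shown above.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
{code}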
*Steps*
- two tasks [0_0] and [0_1], two Spring Boot instances (assignment: was#1 -> task [0_0], was#2 -> task [0_1])
- write some data to each state store (see the producer sketch after the code block below; because of the short "retention.ms", the changelog topic will soon erase those messages)
- when was#2 is killed, was#1 restores task [0_1]'s data into its own RocksDB
- during that restore, it looks for the checkpoint and the error occurs (AbstractStateManager, line 66)
{code:java}
// My code
Map<String, String> topicConfiguration = new HashMap<>();
topicConfiguration.putIfAbsent("cleanup.policy", "delete");
topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
topicConfiguration.putIfAbsent("retention.ms", "3000");

builder.stream(properties.getSourceTopic(), Consumed.with(Serdes.Long(), Serdes.String()))
       .groupByKey()
       .count(Materialized
               .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
               .withKeySerde(Serdes.Long())
               .withValueSerde(Serdes.Long())
               .withLoggingEnabled(topicConfiguration));
{code}
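For the "write some data" step, a minimal producer sketch; the topic name, broker address, and record values are assumptions for illustration, not taken from the report:
{code:java}
// Sketch only: topic name and broker address are assumptions.
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

try (KafkaProducer<Long, String> producer = new KafkaProducer<>(producerProps)) {
    // Spread keys across both partitions so that task 0_0 and task 0_1 each build up state.
    for (long key = 0; key < 10; key++) {
        producer.send(new ProducerRecord<>("source-topic", key, "order-" + key));
    }
    producer.flush();
}
{code}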
*Suggestion*

When EOS is enabled, the checkpoint will be null, so I think we need to add some code to create a checkpoint, as follows:
{code:java}
// # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
// # suggestion start
if (checkpoint == null) {
    checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
}
// # suggestion end
try {
    checkpoint.write(checkpointableOffsets);
} catch (final IOException fatalException) {
    log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
    throw new StreamsException("Failed to reinitialize global store.", fatalException);
}
{code}


> missing creating checkpoint when reinitializeStateStores with eosEnabled(true)
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6860
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6860
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>         Environment: mac, kafka1.1
>            Reporter: ko byoung kwon
>            Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)