[ https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ko byoung kwon updated KAFKA-6860:
----------------------------------
    Description: 
First of all, please understand that my English is not fluent.

*Symptom*
With exactly-once semantics (EOS) enabled, reinitializing state stores throws a NullPointerException because `checkpoint` is null.
{code:java}
2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] Encountered the following error during processing:
java.lang.NullPointerException: null
        at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) ~[kafka-streams-1.1.0.jar:na]
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) ~[kafka-streams-1.1.0.jar:na]
        at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) ~[kafka-streams-1.1.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[kafka-streams-1.1.0.jar:na]
{code}
*How to reproduce*

*Configuration*
 - A changelog topic with a short `retention.ms` and the `delete` cleanup policy (only to make the symptom easy to reproduce), e.g. `retention.ms=30000`, `cleanup.policy=delete`
 - Exactly-once semantics enabled
 - No cleanup of local state
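For reference, the configuration above can be written as plain properties. This is a minimal sketch, assuming the Kafka 1.1 config names `processing.guarantee` (value `exactly_once`) for the Streams application and `cleanup.policy`/`retention.ms` for the changelog topic; the application id `repro-app` and the address `localhost:9092` are placeholders, not values from the report.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ReproConfig {
    // Streams application settings; "repro-app" and "localhost:9092" are placeholders.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "repro-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Enables exactly-once semantics (StreamsConfig.EXACTLY_ONCE in Kafka 1.1).
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    // Changelog topic overrides, meant to be passed to Materialized#withLoggingEnabled.
    // The short retention makes the broker delete changelog records quickly.
    static Map<String, String> changelogConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "delete");
        config.put("retention.ms", "30000");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("processing.guarantee"));
        System.out.println(changelogConfig().get("retention.ms"));
    }
}
```

Running `main` prints `exactly_once` followed by `30000`.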

*Steps*
 - Two tasks, [0_0] and [0_1], run on two Spring Boot instances (was#1 is assigned task [0_0], was#2 is assigned task [0_1]).
 - Write some data to each state store. Because of the short `retention.ms`, the changelog topic soon deletes those messages.
 - Kill was#2; was#1 then restores task [0_1]'s data into its own RocksDB store.
 - During restoration, was#1 reinitializes the state stores, and the NPE is thrown when it tries to write the checkpoint (AbstractStateManager.java:66).

{code:java}
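// My code (imports assume the Kafka Streams 1.1 package layout)
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Changelog topic overrides: delete records quickly to trigger the problem.
Map<String, String> topicConfiguration = new HashMap<>();
topicConfiguration.putIfAbsent("cleanup.policy", "delete");
topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
topicConfiguration.putIfAbsent("retention.ms", "3000");

// builder, properties and ORDER_STORE_NAME come from the surrounding application.
builder.stream(properties.getSourceTopic(),
               Consumed.with(Serdes.Long(), Serdes.String()))
       .groupByKey()
       .count(Materialized
                  .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
                  .withKeySerde(Serdes.Long())
                  .withValueSerde(Serdes.Long())
                  .withLoggingEnabled(topicConfiguration));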
{code}
*Suggestion*

When EOS is enabled, the `checkpoint` field is null, so I think the code needs to create the checkpoint before writing it, as follows:
{code:java}
// # At org.apache.kafka.streams.processor.internals.AbstractStateManager, line 66
// # suggestion start: create the checkpoint lazily, because it is null under EOS
if (checkpoint == null) {
    checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
}
// # suggestion end

try {
    checkpoint.write(checkpointableOffsets);
} catch (final IOException fatalException) {
    log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}",
              checkpoint, stateStores, fatalException);
    throw new StreamsException("Failed to reinitialize global store.", fatalException);
}
{code}
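To illustrate the failure mode and the proposed guard outside of Kafka, here is a simplified, hypothetical model. `SimpleStateManager` and `SimpleCheckpoint` are invented stand-ins for `AbstractStateManager` and `OffsetCheckpoint`; the only assumption carried over from the report is that the checkpoint field is null when EOS is enabled. The file format written here (one `partition offset` line per entry) is made up and is not Kafka's on-disk checkpoint format.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointGuardSketch {

    // Hypothetical stand-in for OffsetCheckpoint: writes offsets to a file.
    static final class SimpleCheckpoint {
        private final File file;

        SimpleCheckpoint(File file) {
            this.file = file;
        }

        void write(Map<Integer, Long> offsets) throws IOException {
            List<String> lines = new ArrayList<>();
            for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
                lines.add(e.getKey() + " " + e.getValue());
            }
            Files.write(file.toPath(), lines, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for the state manager's reinitialize path.
    static final class SimpleStateManager {
        static final String CHECKPOINT_FILE_NAME = ".checkpoint";
        private final File baseDir;
        private final boolean applyGuard;
        private SimpleCheckpoint checkpoint;

        SimpleStateManager(File baseDir, boolean eosEnabled, boolean applyGuard) {
            this.baseDir = baseDir;
            this.applyGuard = applyGuard;
            // Modeled on the report: with EOS enabled, no checkpoint object is kept.
            this.checkpoint = eosEnabled ? null
                    : new SimpleCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
        }

        void reinitialize(Map<Integer, Long> checkpointableOffsets) throws IOException {
            if (applyGuard && checkpoint == null) {
                // The suggested fix: create the checkpoint lazily before writing.
                checkpoint = new SimpleCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
            }
            checkpoint.write(checkpointableOffsets); // NPE here if checkpoint is still null
        }
    }

    // Returns "ok" on success, or the simple name of the NullPointerException.
    static String run(boolean applyGuard) throws IOException {
        File dir = Files.createTempDirectory("state").toFile();
        Map<Integer, Long> offsets = new TreeMap<>();
        offsets.put(1, 0L);
        try {
            new SimpleStateManager(dir, true, applyGuard).reinitialize(offsets);
            return "ok";
        } catch (NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("without guard: " + run(false)); // without guard: NullPointerException
        System.out.println("with guard: " + run(true));     // with guard: ok
    }
}
```

Running `main` prints `without guard: NullPointerException` and then `with guard: ok`: with EOS enabled, only the guarded variant gets past the write.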


> missing creating checkpoint when reinitializeStateStores with eosEnabled(true)
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6860
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6860
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>         Environment: mac, kafka1.1
>            Reporter: ko byoung kwon
>            Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
