I added some logging to StoreChangeLogger:

for (K k : this.dirty) {
    V v = getter.get(k);  // fetch the current value from the local store
    log.info("logChange key:{},value:{}", k, v);
    collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
                   keySerializer, valueSerializer);
}

and found that the logged output is normal; the value is just a few bytes:

[2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka]
(org.apache.kafka.streams.state.internals.StoreChangeLogger)
[2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2]
(org.apache.kafka.streams.state.internals.StoreChangeLogger)

but the console consumer still prints "kafka:" with an empty value,

so this may be a problem with the console consumer rather than with the
changelog itself: the value bytes [0, 0, 0, 2] are a serialized integer
(the count 2), and the console consumer writes those raw bytes to the
terminal, where they are all non-printable control characters, so the
value looks empty.
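
To double-check, here is a minimal sketch (the class name is mine, and it
assumes the demo's Counts store uses string keys and integer values; it
only uses Kafka's built-in IntegerDeserializer) showing that those four
bytes decode to the expected count, while printing them as a string shows
nothing:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.IntegerDeserializer;

public class ChangelogValueCheck {
    public static void main(String[] args) {
        // the raw value bytes logged above for key "kafka"
        byte[] raw = new byte[] {0, 0, 0, 2};

        // Kafka's IntegerDeserializer reads a 4-byte big-endian int
        Integer count = new IntegerDeserializer()
                .deserialize("streams-wordcount-Counts-changelog", raw);
        System.out.println(count);  // 2

        // the same bytes rendered as a string are all control characters,
        // which is why the console output looks like "kafka:" with
        // nothing after the separator
        System.out.println("kafka:" + new String(raw, StandardCharsets.UTF_8));
    }
}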


2017-06-07 18:24 GMT+08:00 john cheng <zqhxuy...@gmail.com>:

> I'm running WordCountProcessorDemo with the Processor API, with two
> changes:
> 1. configure 1 stream thread and 1 replica
> 2. change inMemory() to persistent()
> My Kafka version is 0.10.0.0. After running the streams application, I
> check the messages output by the console consumer:
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
> localhost:9092 --topic streams-input2
> msg1  # the console producer only lets us produce a message value (the
> key is null), so messages produced to the input topic are partitioned
> round-robin
> msg2
> msg3
> msg4
> msg5
> msg6
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
> --bootstrap-server localhost:9092 --topic streams-input2 --property
> print.key=true --property key.separator=":" --from-beginning
> null:msg2  # the key is null; the value is what we produced above
> null:msg4
> null:msg6
> null:msg1
> null:msg3
> null:msg5
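> (As an aside, not part of my run above: the console producer can also
> send keyed messages, in which case partitioning is by key rather than
> round-robin. For example:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> streams-input2 --property parse.key=true --property key.separator=":"
> Then each line typed as "key:value" is split into a key and a value.)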
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
> --bootstrap-server localhost:9092 --topic streams-output2 --property
> print.key=true --property key.separator=":" --from-beginning
> msg2:1
> msg1:1
> msg1:1
> msg3:1
> msg2:1
> msg4:1
> msg1:1
> msg3:1
> msg5:1
> msg2:1
> msg4:1
> msg6:1  # due to log compaction, later records with the same key will
> overwrite earlier ones; this is OK...
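> (To check whether a topic is compacted, kafka-topics.sh can describe
> its config; the ZooKeeper address below is an assumption:
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic
> streams-wordcount-Counts-changelog
> Streams creates its changelog topics with cleanup.policy=compact.)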
> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
> --property print.key=true --property key.separator=":" --from-beginning
> msg2:
> msg4:
> msg6:
> msg1:
> msg3:
> msg5:
> Everything is OK except the changelog topic, which troubles me: why are
> its values empty?
>
> I have dug into the source code and summarized the workflow:
> 1. when WordCountProcessorDemo's ProcessorNode puts a key-value pair
> into the KeyValueStore, whether in-memory or RocksDB, the store first
> writes to local storage and then appends the key to the StoreChangeLogger
> 2. the key is appended to the StoreChangeLogger first, which then invokes
> maybeLogChange with a ValueGetter; this getter reads the value from local
> storage when the logChange() operation happens
> 3. when logChange() happens on the StoreChangeLogger, it sends a KeyValue
> message to the changelog topic, here streams-wordcount-Counts-changelog
> 4. StoreChangeLogger uses its dirty and removed Sets to track keys
> between add() and logChange(), as in the snippet below
>
> for (K k : this.dirty) { // logChange() flushes dirty keys to the changelog topic
>     V v = getter.get(k);  // the value getter reads the value from local storage
>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>                    keySerializer, valueSerializer);
> }
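>
> As I read the 0.10.0.0 source, the bookkeeping around that loop looks
> roughly like the sketch below (simplified and paraphrased, not the exact
> class):
>
> // simplified sketch of the dirty/removed bookkeeping from step 4
> void add(K key) {         // called on put(): mark the key dirty
>     this.dirty.add(key);
>     this.removed.remove(key);
> }
> void delete(K key) {      // called on delete(): mark the key removed
>     this.dirty.remove(key);
>     this.removed.add(key);
> }
> void logChange(ValueGetter<K, V> getter) {
>     // removed keys are sent with a null value (a tombstone for compaction)
>     for (K k : this.removed) {
>         collector.send(new ProducerRecord<>(this.topic, this.partition, k, null),
>                        keySerializer, valueSerializer);
>     }
>     // dirty keys are sent with the current value from the local store
>     for (K k : this.dirty) {
>         V v = getter.get(k);
>         collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>                        keySerializer, valueSerializer);
>     }
>     this.removed.clear();  // both sets are cleared after flushing
>     this.dirty.clear();
> }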
>
>
> The only reason I can figure out for an empty value is an invocation of
> the KeyValueStore's delete() method, but the application actually never
> calls it.
>
> Can anyone help? Thanks.
>
>
