the right way to see changelog persistent by rocksdb is use ByteDeser, and
then decode hex to string

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");

for(ConsumerRecord<String, byte[]> record: consumerRecords) {

  print bytesToHexString(record.value())

}

public static String bytesToHexString(byte[] src){
    StringBuilder stringBuilder = new StringBuilder("");
    if (src == null || src.length <= 0) {
        return null;
    }
    for (int i = 0; i < src.length; i++) {
        int v = src[i] & 0xFF;
        String hv = Integer.toHexString(v);
        if (hv.length() < 2) {
            stringBuilder.append(0);
        }
        stringBuilder.append(hv);
    }
    return stringBuilder.toString();
}

output now collect

p:0,o:0,k:msg1,v:00000001
p:0,o:1,k:msg3,v:00000001
p:0,o:2,k:msg5,v:00000001
p:0,o:3,k:msg1,v:00000002
p:0,o:4,k:msg3,v:00000002
p:1,o:0,k:msg2,v:00000001
p:1,o:1,k:msg4,v:00000001
p:1,o:2,k:msg2,v:00000002
p:1,o:3,k:msg2,v:00000003


2017-06-07 18:42 GMT+08:00 john cheng <zqhxuy...@gmail.com>:

> I add some log on StoreChangeLog
>
> for (K k : this.dirty) {
>     V v = getter.get(k);
>     log.info("logChange key:{},value:{}", k, v);
>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), 
> keySerializer, valueSerializer);
> }
>
> and found the print result is normal, just some byte:
>
> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka] 
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2] 
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>
> but console-consumer still output kafka:
>
> so this may be problem of console-consume.
>
>
> 2017-06-07 18:24 GMT+08:00 john cheng <zqhxuy...@gmail.com>:
>
>> I'm running WordCountProcessorDemo with Processor API. and change
>> something below
>> 1. config 1 stream-thread and 1 replicas
>> 2. change inMemory() to persistent()
>> MyKakfa version is 0.10.0.0. After running streaming application, I check
>> msg output by console-consumer
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
>> localhost:9092 --topic streams-input2
>> msg1  # by console-producer, we can only produce message's value. so
>> message produce to input topic will use roundrobbin partition
>> msg2
>> msg3
>> msg4
>> msg5
>> msg6
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-input2 --property
>> print.key=true --property key.separator=":" --from-beginning
>> null:msg2  # key is null, value is what we produce above
>> null:msg4
>> null:msg6
>> null:msg1
>> null:msg3
>> null:msg5
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-output2 --property
>> print.key=true --property key.separator=":" --from-beginning
>> msg2:1
>> msg1:1
>> msg1:1
>> msg3:1
>> msg2:1
>> msg4:1
>> msg1:1
>> msg3:1
>> msg5:1
>> msg2:1
>> msg4:1
>> msg6:1  # due to log compaction, same key will be overwrite. this is ok...
>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
>> --property print.key=true --property key.separator=":" --from-beginning
>> *msg2:*
>> *msg4:*
>> *msg6:*
>> *msg1:*
>> *msg3:*
>> *msg5:*
>> Everything is ok, except changelog-topic trouble me. Why it's value is
>> empty?
>>
>> I have dig into source code, and summary the workflow :
>> 1. when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore,
>> no matter Memory or RocksDB, they both put into local storage, then append
>> msg to StoreChangeLogger
>> 2. the key append to StoreChangeLogger first, and invoke maybeLogChange
>> by passing ValueGetter, this getter will get value from local storage when
>> logChange() operation happen
>> 3. when logChange() on StoreChangeLogger happen, send KeyValue message to
>> changelog topic, here is  streams-wordcount-Counts-changelog
>> 4. StoreChangeLogger use dirty and remove Set to swap between logChange()
>> and add().
>>
>> for (K k : this.dirty) { // logChange() method flush dirty to changelog topic
>>     V v = getter.get(k);  // value getter will get value from local storage
>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), 
>> keySerializer, valueSerializer);
>> }
>>
>>
>> The Only reason I can figure out why value is empty is when invoke
>> KeyValueStore's delete method. But Actualy the application did't do it
>>
>> Anyone help me, Tks .
>>
>>
>

Reply via email to