Elias Levy created KAFKA-4683:
---------------------------------

             Summary: Mismatch between Stream windowed store and broker log retention logic
                 Key: KAFKA-4683
                 URL: https://issues.apache.org/jira/browse/KAFKA-4683
             Project: Kafka
          Issue Type: Bug
          Components: log, streams
    Affects Versions: 0.10.1.1
            Reporter: Elias Levy


The RocksDBWindowStore keeps key-value entries for a configurable retention 
period.  The leading edge of the time period kept is determined by the newest 
timestamp of an inserted KV.  The trailing edge is this leading edge minus the 
requested retention period.
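A minimal Java sketch of the store-side rule just described. 
`StoreRetentionModel` is a hypothetical class, not the real RocksDBWindowStore: 
it ignores keys, segments and RocksDB, keeps at most one entry per timestamp, 
and treats an entry as kept while its timestamp is at or after the trailing 
edge (that exact boundary is an assumption). What it shows is that only record 
timestamps move the window; wall-clock time is never consulted.

```java
import java.util.TreeMap;

// Hypothetical, simplified model of the store-side retention rule.
// NOT the real RocksDBWindowStore: keys, segments and RocksDB are left out,
// and at most one entry per timestamp is kept.
final class StoreRetentionModel {
    private final long retentionMs;
    // Newest timestamp inserted so far: the leading edge of the kept period.
    private long newestTimestamp = Long.MIN_VALUE;
    // Entries ordered by timestamp so the expired prefix can be dropped in one call.
    private final TreeMap<Long, String> entries = new TreeMap<>();

    StoreRetentionModel(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(long timestamp, String value) {
        newestTimestamp = Math.max(newestTimestamp, timestamp);
        entries.put(timestamp, value);
        // Trailing edge = leading edge minus the retention period.
        long trailingEdge = newestTimestamp - retentionMs;
        // Drop every entry strictly older than the trailing edge.
        entries.headMap(trailingEdge, false).clear();
    }

    boolean contains(long timestamp) {
        return entries.containsKey(timestamp);
    }

    public static void main(String[] args) {
        StoreRetentionModel store = new StoreRetentionModel(100);
        store.put(1_000, "a");
        store.put(1_050, "b");
        store.put(1_120, "c"); // leading edge 1120, trailing edge 1020
        System.out.println(store.contains(1_000)); // false: older than the trailing edge
        System.out.println(store.contains(1_050)); // true
    }
}
```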

If logging is enabled, changes to the store are written to a change log topic 
that is configured with a retention.ms value equal to the store retention 
period.  The leading edge of the time period kept by the log is the current 
time.  The trailing edge is the leading edge minus the requested retention 
period.
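The broker-side rule can be sketched the same way. `BrokerRetentionModel` is 
hypothetical and judges each record individually against 
`now - retention.ms`; a real broker deletes whole log segments, so actual 
deletion is coarser and can lag behind this simplified rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of time-based retention on the changelog topic.
// Assumption: records are judged one by one; a real broker deletes whole log segments.
final class BrokerRetentionModel {
    // Leading edge is the broker's current time; trailing edge is now - retention.ms.
    static boolean isRetained(long recordTimestampMs, long nowMs, long retentionMs) {
        return recordTimestampMs >= nowMs - retentionMs;
    }

    // Returns the timestamps that survive retention at wall-clock time nowMs.
    static List<Long> retained(List<Long> timestamps, long nowMs, long retentionMs) {
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (isRetained(ts, nowMs, retentionMs)) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long retentionMs = 60_000;
        List<Long> timestamps = Arrays.asList(now - 120_000, now - 30_000, now);
        System.out.println(retained(timestamps, now, retentionMs).size()); // prints 2
    }
}
```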

The difference in how the leading edge is determined can result in unexpected 
behavior.

If the stream application is processing data whose timestamps are older than 
the retention period (relative to the current time) and storing it in a 
windowed store, the store will have data for the retention period looking back 
from the newest timestamp of the processed messages.  But the messages written 
to the state changelog will almost immediately be deleted by the broker, as 
they fall outside the retention window as the broker computes it.
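A worked example of the scenario above, assuming a one-day retention period 
and a backlog of records stamped about a week before the current time. Both 
numbers, the fixed "now", and the class name `RetentionMismatchDemo` are 
illustrative assumptions, and both rules are applied per record with no 
segment granularity.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical worked example: one-day retention, records about a week old.
// Both rules are simplified (per record, no segments).
final class RetentionMismatchDemo {
    static final long RETENTION_MS = Duration.ofDays(1).toMillis();

    // Store rule: leading edge is the newest record timestamp processed so far.
    static boolean keptByStore(long ts, long newestRecordTs) {
        return ts >= newestRecordTs - RETENTION_MS;
    }

    // Broker rule: leading edge is the current wall-clock time.
    static boolean keptByBroker(long ts, long nowMs) {
        return ts >= nowMs - RETENTION_MS;
    }

    public static void main(String[] args) {
        long now = Instant.parse("2017-01-23T00:00:00Z").toEpochMilli();
        long weekAgo = now - Duration.ofDays(7).toMillis();
        // Three backlog records, one hour apart, all about a week old.
        long[] backlog = {
            weekAgo,
            weekAgo + Duration.ofHours(1).toMillis(),
            weekAgo + Duration.ofHours(2).toMillis()
        };
        long newest = backlog[backlog.length - 1];
        for (long ts : backlog) {
            System.out.printf("%s store=%b broker=%b%n",
                Instant.ofEpochMilli(ts), keptByStore(ts, newest), keptByBroker(ts, now));
        }
    }
}
```

Under these assumptions every record is kept by the store rule yet is already 
past the broker's trailing edge, which is why the changelog copy disappears 
almost immediately.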

If the application is stopped and restarted in this state, and the local state 
has been lost for some reason, the application won't be able to recover the 
state from the broker, as the broker has already deleted it.


In addition, I've noticed that there is a discrepancy in which timestamp is 
used by the store and by the change log.  The store will use the timestamp 
passed as an argument to {{put}}, or, if no timestamp is passed, fall back to 
{{context.timestamp}}.  But {{StoreChangeLogger.logChange}} does not take a 
timestamp.  Instead it always uses {{context.timestamp}} to write the change to 
the broker.  Thus it is possible for the state store and the change log to use 
different timestamps for the same KV.
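A hypothetical sketch of this discrepancy. `TimestampMismatchSketch` is not the 
real Kafka Streams code; it only mimics the behavior described above: a {{put}} 
with an explicit timestamp files the entry under that timestamp, while the 
logging step takes no timestamp and uses the context timestamp instead. Values 
are omitted so that only the timestamps are compared.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the timestamp discrepancy; not the real Kafka Streams classes.
// Values are omitted: only the timestamp each side records for a key is tracked.
final class TimestampMismatchSketch {
    // Stand-in for context.timestamp(): timestamp of the record being processed.
    private long contextTimestamp;
    // Timestamp the window store files each key under.
    final Map<String, Long> storeTimestamps = new HashMap<>();
    // Timestamp each key's change is written to the changelog with.
    final Map<String, Long> changelogTimestamps = new HashMap<>();

    void setContextTimestamp(long timestamp) {
        contextTimestamp = timestamp;
    }

    // Like put(key, value): no explicit timestamp, so fall back to the context timestamp.
    void put(String key) {
        put(key, contextTimestamp);
    }

    // Like put(key, value, timestamp): the store honours the explicit timestamp...
    void put(String key, long timestamp) {
        storeTimestamps.put(key, timestamp);
        logChange(key);
    }

    // ...but the logger takes no timestamp and always uses the context timestamp.
    private void logChange(String key) {
        changelogTimestamps.put(key, contextTimestamp);
    }

    public static void main(String[] args) {
        TimestampMismatchSketch s = new TimestampMismatchSketch();
        s.setContextTimestamp(5_000);
        s.put("explicit", 1_000); // store: 1000, changelog: 5000
        s.put("implicit");        // store: 5000, changelog: 5000
        System.out.println(s.storeTimestamps + " vs " + s.changelogTimestamps);
    }
}
```

When the explicit timestamp differs from the context timestamp (key 
`"explicit"`), the store and the changelog record different timestamps for the 
same key; without an explicit timestamp the two agree.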




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
