[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161440#comment-17161440 ]

Almog Gavra commented on KAFKA-8037:
------------------------------------

I'm not totally sure I'm understanding the solutions, but figured I should 
throw this out there anyway for the record.  [~ableegoldman] wrote a good 
summary of the goals (a minimal sketch of this setup follows the list):
 # reuse the input topic as the changelog to avoid replicating all that data
 # make restoration as fast as possible by copying plain bytes with_out_ 
deserializing
 # don't load bad data, i.e. don't copy bytes during restoration that wouldn't 
have been copied during normal processing
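
For context, here is a minimal sketch of the setup these goals describe, using the 
standard `builder.table()` API together with a `deserialization.exception.handler`; 
the application id, bootstrap servers, and topic name are made up for illustration:

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

public class SourceTableExample {

    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-example");  // made-up app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Bad records are dropped on the normal read path by this handler ...
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }

    public static void buildTopology(StreamsBuilder builder) {
        // ... but goal 1 reuses "input-topic" itself as the changelog for this table,
        // so restoration (goal 2) copies raw bytes from the topic into the store
        // without running the serde or the exception handler configured above.
        KTable<String, String> table = builder.table(
                "input-topic", Consumed.with(Serdes.String(), Serdes.String()));
    }
}
{code}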

I think goals 1 and 3 are mutually exclusive, independently of 2. If I understand 
correctly, Kafka itself doesn't care about "good" vs. "bad" data - so if there are 
two events for the same key and the second of them is "bad", log compaction will 
eventually remove the earlier record and the good data won't even exist in the 
topic anymore. Worded differently, even if we passed the "bad" record through the 
deserializer during restoration and identified it as bad (which the user's handler 
decides to drop), we would have no way to roll the state store back to the "good" 
value. (A toy illustration is below.)
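
To make that concrete, here is a toy illustration (plain Java, nothing Kafka-specific) 
of why compaction erases the "last known good" value: a compacted topic keeps only the 
latest record per key, regardless of whether that record deserializes.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class CompactionToyExample {

    public static void main(String[] args) {
        // Model a compacted topic as "latest value wins" per key.
        Map<String, byte[]> compacted = new HashMap<>();

        byte[] goodValue = "{\"amount\": 42}".getBytes();  // deserializes fine
        byte[] badValue  = "not-json-at-all".getBytes();   // would fail deserialization

        compacted.put("key-1", goodValue);  // first event: good
        compacted.put("key-1", badValue);   // second event: bad, overwrites the good one

        // After compaction only the bad bytes survive. Even if a restore pass
        // deserialized them and recognized them as bad, the good value is gone,
        // so there is nothing left to fall back to.
        System.out.println(new String(compacted.get("key-1")));  // prints: not-json-at-all
    }
}
{code}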

Riffing off Sophie's idea of an inverse changelog topic, I think it would need 
to include both the bad contents and the previously-known good contents. Then, at 
the end of restoration, you could scan the materialized inverse changelog and 
compare any keys that are also present in the restored source-topic store: if the 
restored bytes for a key are byte-for-byte identical to the "bad" contents in the 
inverse changelog, you restore that key to the "last known good" contents. 
Otherwise (the restored data is not the "bad" data) you know a newer, good event 
came in, and you can send a tombstone to the inverse-changelog topic. A rough 
sketch of that reconciliation pass is below.
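
Here is a rough sketch of that post-restore reconciliation pass. Everything in it is 
hypothetical - the InverseEntry type (holding the bad bytes plus the last known good 
bytes) and the plain maps standing in for the restored store, the materialized inverse 
changelog, and the tombstone sink don't exist in Streams today:

{code:java}
import java.util.Arrays;
import java.util.Map;

public class InverseChangelogReconciliation {

    // Hypothetical record kept in the inverse changelog: the bytes we dropped as
    // "bad" plus the last value we knew to be good for the same key.
    static final class InverseEntry {
        final byte[] badBytes;
        final byte[] lastKnownGoodBytes;

        InverseEntry(byte[] badBytes, byte[] lastKnownGoodBytes) {
            this.badBytes = badBytes;
            this.lastKnownGoodBytes = lastKnownGoodBytes;
        }
    }

    /**
     * Runs after the byte-for-byte restoration of a source-topic-backed store.
     *
     * @param restoredStore    key -> raw bytes copied from the source topic during restore
     * @param inverseChangelog key -> InverseEntry materialized from the inverse changelog
     * @param tombstones       stand-in for the producer that deletes stale inverse-changelog entries
     */
    static void reconcile(Map<String, byte[]> restoredStore,
                          Map<String, InverseEntry> inverseChangelog,
                          Map<String, byte[]> tombstones) {
        for (Map.Entry<String, InverseEntry> entry : inverseChangelog.entrySet()) {
            String key = entry.getKey();
            InverseEntry inverse = entry.getValue();
            byte[] restored = restoredStore.get(key);

            if (restored != null && Arrays.equals(restored, inverse.badBytes)) {
                // Restore copied the known-bad bytes: roll back to the last known good value.
                restoredStore.put(key, inverse.lastKnownGoodBytes);
            } else {
                // A newer (good) event has since overwritten the bad one, so the
                // inverse-changelog entry is stale and can be tombstoned.
                tombstones.put(key, null);
            }
        }
    }
}
{code}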

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.
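
For completeness, here is a minimal sketch of the failure mode described in the issue 
above, assuming a KStream-KTable join over made-up topic names: each stream record 
triggers an internal lookup into the table's store, and if restoration copied 
un-deserializable bytes into that store, the lookup fails even though the normal read 
path would have dropped the record via the deserialization exception handler.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinFailureExample {

    public static void buildTopology(StreamsBuilder builder) {
        // Table backed directly by its input topic; restoration copies raw bytes.
        KTable<String, String> table = builder.table(
                "table-topic", Consumed.with(Serdes.String(), Serdes.String()));

        KStream<String, String> stream = builder.stream(
                "stream-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Each stream record looks up the matching key in the table's store.
        // If a corrupted value for that key was loaded during restore, the
        // value deserialization performed by this lookup throws.
        stream.join(table, (streamValue, tableValue) -> streamValue + "/" + tableValue)
              .to("output-topic");
    }
}
{code}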



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
