[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008094#comment-17008094 ]
John Roesler commented on KAFKA-8037: ------------------------------------- Hey, all. If I understood, the proposal is something like: Patrik: when restoring, always round trip through the serde when the optimization is in effect and we are treating the source as a changelog. In the other case where there is a real changelog, it has already been sanitized, so we can just use bulk restoration as today. Guozhang: When the optimization is in effect and we are treating the input as a changelog, during restoration, we should never restore past the committed offset. Once we reach the committed offset, we should transition to normal processing so that time synchronization is handled properly. In the special case where there is no committed offset, we should conclude there is nothing to restore and immediately transition to normal processing (which is a change from what we do today). >From where I’m sitting, these are both correct and we should do both. they are >also orthogonal. This particular ticket seems only to be relevant to Patrik’s >proposal. Maybe Guozhang should make another ticket? Also, I didn’t realize that PR had stalled. I can take another look at it next week. > KTable restore may load bad data > -------------------------------- > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Matthias J. Sax > Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)