[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16785127#comment-16785127
 ] 

Guozhang Wang commented on KAFKA-8037:
--------------------------------------

We use `offsetLimit` in ProcessorStateManager to hold the source table's 
committed offset. If there is a committed offset X, we stop restoring at X 
rather than continuing to the log-end-offset. If there is no committed offset, 
we restore up to the log-end-offset observed at startup (the log may keep 
growing, but we only restore to the point we saw then). So suppose we start 
successfully without bad data, do normal processing (which deserializes 
records), and commit at offset X, and suppose there is bad data at offset 
Y > X. If we then fail and restart, the restoration will stop at X and never 
reach Y.

However, this issue is still valid if there is bad data and the app is 
starting for the first time: in that case there is no committed offset, so it 
will try to restore to the log-end-offset.
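
To make the two cases concrete, here is a minimal sketch of the limit 
selection described above. It is not the actual ProcessorStateManager code: 
the class and method names are hypothetical, and it assumes the limit is 
exclusive (records at offsets below the limit are copied into the store).

```java
import java.util.OptionalLong;

// Hypothetical illustration only; not the real Kafka Streams implementation.
final class RestoreLimitSketch {

    // Picks the offset at which restoration stops (assumed exclusive).
    static long restoreEndOffset(OptionalLong committedOffset, long logEndOffsetAtStartup) {
        // With a committed offset X, restoration stops at X.
        // Without one, it runs to the log-end-offset observed at startup.
        return committedOffset.isPresent()
                ? committedOffset.getAsLong()
                : logEndOffsetAtStartup;
    }

    // True if a record at recordOffset would be copied into the store during restore.
    static boolean wouldRestore(long recordOffset, OptionalLong committedOffset, long logEndOffsetAtStartup) {
        return recordOffset < restoreEndOffset(committedOffset, logEndOffsetAtStartup);
    }

    public static void main(String[] args) {
        long badRecordOffset = 300L;   // Y, the offset of the corrupted record
        long logEndOffset = 500L;      // log-end-offset seen at startup

        // Restart after committing at X = 100 < Y: the bad record is not restored.
        System.out.println(wouldRestore(badRecordOffset, OptionalLong.of(100L), logEndOffset));

        // First start, no committed offset: the bad record is copied as plain bytes.
        System.out.println(wouldRestore(badRecordOffset, OptionalLong.empty(), logEndOffset));
    }
}
```

The second call is the case that remains open: with no committed offset, 
nothing prevents the corrupted record from being copied into the store.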

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
