[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163126#comment-17163126 ]
Matthias J. Sax commented on KAFKA-8037:
----------------------------------------

{quote}It is currently true that this is _only_ used to apply the source changelog optimization.
{quote}
How so? Our optimizer does more, e.g., merging redundant repartition topics... And in contrast to removing/deprecating `build()`, I actually proposed in KIP-591 to deprecate the parameterless `build()` method and only keep `build(Properties)`...

{quote}you have to pass two different instances of the same serde supplier instead of the same instance of the serde supplier twice in order to get a changelog topic for your source table
{quote}
Luckily, there is nothing like a SerdeSupplier :)

In general, I think we should try to do this optimization by default, because (1) it reduces the cost of running a topology, (2) we have generally positive experience with it, and (3) the most common use case for `builder.table()` seems to be that people just want to populate a table from a changelog topic without any data modification. – All the mentioned corner cases for which we cannot do the optimization seem to be in the minority.

For schema evolution, I don't buy the argument that the serde is strictly asymmetric, or to be more precise, from a correctness point of view I don't see why it would be problematic. If a serde reads a byte array A (written with the old schema) and writes it back as byte array B (with the new schema), then even if we apply the optimization and push byte array A into the store later during a restore, the serde would still be able to read it (it was able to read A from the source topic, so it can also read it from the store).

For serdes that actually do a projection-on-read, I agree with Sophie that this should be discouraged in general and a proper map()/mapValues() should be used instead (a sketch is included after the quoted issue description below). For this case, the user automatically opts out anyway, as there are "two tables" and the second table will get its own changelog topic. If a user wants to do the projection with the serde for perf reasons, they can still do it and add a no-op/identity mapValues() step in between. I would rather optimize for less experienced users than for experienced ones. To quote my favorite API design advice: "simple things should be easy, and hard things should be possible".

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a
> `deserialization.exception.handler` to drop corrupted records on read.
> However, this mechanism may be by-passed on restore. Assume a
> `builder.table()` call reads and drops a corrupted record. If the table state
> is lost and restored from the changelog topic, the corrupted record may be
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the
> record would fail with a deserialization exception if the value part cannot
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the
> GlobalKTable case). It's unclear to me at the moment how this issue could be
> addressed for KTables, though.
> Note that user state stores are not affected, because they always have a
> dedicated changelog topic (and don't reuse an input topic), and thus the
> corrupted record would not be written into the changelog.
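And a sketch of the opt-out described in the comment above (the topic name, projection logic, and store name are hypothetical): doing the projection in an explicit mapValues() instead of inside an asymmetric serde yields a second, materialized table that gets its own changelog topic, so restore no longer copies raw source-topic bytes into that store.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ProjectionOptOutSketch {

    public static void sketch() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Read the source topic with a plain, symmetric serde (no projection-on-read).
        final KTable<String, String> source =
            builder.table("source-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Do the projection in an explicit mapValues() instead of inside the serde.
        // The result is a second, materialized table with its own changelog topic, so
        // the source-changelog optimization (and its restore pitfall) does not apply to it.
        final KTable<String, String> projected = source.mapValues(
            value -> value.substring(0, Math.min(10, value.length())), // hypothetical projection
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("projected-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
    }
}
{code}

If the projection must stay in the serde for perf reasons, inserting a no-op/identity mapValues() step between the source table and the downstream use achieves the same opt-out, as the comment notes.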