[ https://issues.apache.org/jira/browse/KAFKA-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582899#comment-17582899 ]
Yash Mayya edited comment on KAFKA-13431 at 8/22/22 12:16 PM:
--------------------------------------------------------------

Thanks Diego.

[~ChrisEgerton] with regard to
{quote}I'm also wondering about how exactly this method will be implemented. Will we automatically create a new SinkRecord instance at the end of the transformation chain in order to provide the correct topic partition (and possibly offset)? If so, this should be called out since it means that transformations that return custom subclasses of SinkRecord will no longer be able to do so (or rather, they will still be able to, but these custom subclasses will never be visible to sink tasks).{quote}

-I think we have two options here, right? The first is what you discussed above in the mailing list discussion thread. The second option is to not do so, and to assume that well-behaved transformations only use `ConnectRecord::newRecord` to return new records. Its Javadoc states: "Create a new record of the same type as itself, with the specified parameter values. All other fields in this record will be copied over to the new record."-

-This would make sure that the new record retains the original topic and partition information from the original record. However, I don't think it is documented or enforced anywhere that SMTs must use one of the `newRecord` methods to create new records, and I presume they could simply use one of the public constructors instead (although none of the SMTs in AK do this).-

-So, I think option 1 is the better one and we would need to call out / document that SMTs shouldn't use custom subclasses of `ConnectRecord` / `SinkRecord` / `SourceRecord`, or at least that they may or may not be passed on as is to tasks.-

Edit: Nvm, I guess this whole thing is a moot point considering the `WorkerSinkTask` already creates a new `InternalSinkRecord` after the transformation chain is applied, which means that any custom subclasses of `SinkRecord` returned by SMTs will not be visible to sink tasks even today:

[https://github.com/apache/kafka/blob/b392cf212f7ed4a82b79c3690b488619c027dba9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L540-L546]

[https://github.com/apache/kafka/blob/b392cf212f7ed4a82b79c3690b488619c027dba9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L35-L40]

> Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-13431
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13431
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Diego Erdody
>            Assignee: Yash Mayya
>            Priority: Major
>              Labels: needs-kip
>
> There's currently an incompatibility between Sink connectors overriding the
> {{SinkTask.preCommit}} method (for asynchronous processing) and SMTs that
> mutate the topic field.
> The problem has been present since the inception of the {{preCommit}} method
> and is rooted in a mismatch between the topic/partition that is passed to
> {{open/preCommit}} (the original topic and partition before applying any
> transformations) and the topic partition that is present in the SinkRecord
> that the {{SinkTask.put}} method receives (after transformations are
> applied). Since that's all the information the connector has to implement any
> kind of internal offset tracking, the topic/partitions it can return in
> preCommit will correspond to the transformed topic, when the framework
> actually expects it to be the original topic.
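
The mismatch described in the issue can be sketched with a small, self-contained Java model. This is not Kafka Connect code: `Rec`, `tp`, `renameTopic`, `offsetsTrackedByTask` and `unknownToFramework` are hypothetical stand-ins invented for illustration, and the topic names are made up. The sketch only assumes what the description states: the framework knows the original topic partition, while the task sees the transformed topic in `put`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopicMismatchSketch {

    /** Hypothetical stand-in for a sink record; NOT Kafka's SinkRecord. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Formats a topic partition as "topic-partition", used here as a map key. */
    static String tp(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Models a topic-mutating SMT: partition and offset are kept, the topic changes. */
    static Rec renameTopic(Rec original, String newTopic) {
        return new Rec(newTopic, original.partition, original.offset);
    }

    /** Models an async task that tracks offsets using only what put() hands it. */
    static Map<String, Long> offsetsTrackedByTask(List<Rec> transformed) {
        Map<String, Long> offsets = new HashMap<>();
        for (Rec r : transformed) {
            // Next offset to consume, keyed by the topic the task actually saw.
            offsets.put(tp(r.topic, r.partition), r.offset + 1);
        }
        return offsets;
    }

    /** Returns the keys the task would report that the framework never assigned. */
    static Set<String> unknownToFramework(Set<String> assigned, Map<String, Long> reported) {
        Set<String> unknown = new HashSet<>(reported.keySet());
        unknown.removeAll(assigned);
        return unknown;
    }

    public static void main(String[] args) {
        // The framework assigned, and passed to open(), the original partition.
        Set<String> assigned = new HashSet<>(Arrays.asList(tp("orders", 0)));

        // Records consumed from "orders" pass through the SMT before put().
        List<Rec> transformed = Arrays.asList(
                renameTopic(new Rec("orders", 0, 41L), "orders_v2"),
                renameTopic(new Rec("orders", 0, 42L), "orders_v2"));

        Map<String, Long> reported = offsetsTrackedByTask(transformed);
        System.out.println("reported by task: " + reported);
        System.out.println("not assigned:     " + unknownToFramework(assigned, reported));
    }
}
```

In this model the task can only key its offsets by the renamed topic, so whatever it hands back from a `preCommit`-style call refers to a partition the framework never assigned. That is the incompatibility the issue describes.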