[ 
https://issues.apache.org/jira/browse/KAFKA-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582899#comment-17582899
 ] 

Yash Mayya edited comment on KAFKA-13431 at 8/22/22 10:49 AM:
--------------------------------------------------------------

Thanks Diego.

 

[~ChrisEgerton] with regard to
{code:java}
I'm also wondering about how exactly this method will be implemented. Will
we automatically create a new SinkRecord instance at the end of the
transformation chain in order to provide the correct topic partition (and
possibly offset)? If so, this should be called out since it means that
transformations that return custom subclasses of SinkRecord will no longer
be able to do so (or rather, they will still be able to, but these custom
subclasses will never be visible to sink tasks).{code}
I think we have two options here, right? The first is what you described in the 
mailing list discussion thread (quoted above): automatically create a new 
SinkRecord instance at the end of the transformation chain. The second is to 
not do so, and instead assume that well-behaved transformations only use 
`ConnectRecord::newRecord` to return new records. Its Javadoc states:

 

{quote}Create a new record of the same type as itself, with the specified 
parameter values. All other fields in this record will be copied over to the 
new record.{quote}

 

This would ensure that each new record retains the topic and partition 
information of the original record. However, I don't think it is documented or 
enforced anywhere that SMTs must use one of the `newRecord` methods to create 
new records; I presume they could simply use one of the public constructors 
instead (although none of the SMTs in AK do this). So I think option 1 is the 
better one, and we would need to call out / document that SMTs shouldn't use 
custom subclasses of `ConnectRecord` / `SinkRecord` / `SourceRecord`, or at 
least that such subclasses may or may not be passed on as-is to tasks.
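
To make the consequence of option 1 concrete, here is a minimal sketch. It 
deliberately does not use the real Connect classes: `Rec`, `TaggedRec`, 
`Delivered` and `applyChain` are hypothetical stand-ins, and the 
`originalTopic` / `originalPartition` fields are only an assumption about what 
the rebuilt record might carry. It shows that if the framework rebuilds the 
record at the end of the chain, the original coordinates survive a 
topic-mutating SMT, but a custom subclass returned by an SMT never reaches the 
task.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-ins for illustration only; none of this is the real Connect API.
final class TransformationChainSketch {

    // Simplified stand-in for SinkRecord; SMTs may change the topic.
    static class Rec {
        final String topic;
        final int partition;
        final long offset;
        final String value;

        Rec(String topic, int partition, long offset, String value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }

        // Mirrors the idea behind ConnectRecord::newRecord: other fields are copied over.
        Rec newRecord(String newTopic) {
            return new Rec(newTopic, partition, offset, value);
        }
    }

    // A custom subclass that an SMT might return.
    static class TaggedRec extends Rec {
        final String tag;

        TaggedRec(Rec base, String tag) {
            super(base.topic, base.partition, base.offset, base.value);
            this.tag = tag;
        }
    }

    // What the task receives under option 1: always rebuilt by the framework.
    static final class Delivered extends Rec {
        final String originalTopic;    // assumed field, not part of the real API here
        final int originalPartition;   // assumed field, not part of the real API here

        Delivered(Rec transformed, Rec consumed) {
            super(transformed.topic, transformed.partition, transformed.offset, transformed.value);
            this.originalTopic = consumed.topic;
            this.originalPartition = consumed.partition;
        }
    }

    // Option 1: run the chain, then rebuild the record with the consumed coordinates attached.
    static Delivered applyChain(Rec consumed, List<UnaryOperator<Rec>> chain) {
        Rec current = consumed;
        for (UnaryOperator<Rec> smt : chain) {
            current = smt.apply(current);
            if (current == null) {
                return null; // record was filtered out by an SMT
            }
        }
        return new Delivered(current, consumed);
    }

    public static void main(String[] args) {
        UnaryOperator<Rec> reroute = r -> r.newRecord("orders-archive"); // topic-mutating SMT
        UnaryOperator<Rec> tagging = r -> new TaggedRec(r, "audited");   // returns a subclass
        Delivered out = applyChain(new Rec("orders", 3, 42L, "payload"),
                Arrays.asList(reroute, tagging));
        Rec seenByTask = out;
        System.out.println(out.topic + " (originally " + out.originalTopic + ")");
        System.out.println("task sees TaggedRec: " + (seenByTask instanceof TaggedRec));
    }
}
```

Under option 2 the framework would skip the final rebuild, so the subclass 
would reach the task, but an SMT that calls a public constructor directly could 
silently drop the original coordinates.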



> Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit 
> users)
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-13431
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13431
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Diego Erdody
>            Assignee: Yash Mayya
>            Priority: Major
>              Labels: needs-kip
>
> There's currently an incompatibility between Sink connectors overriding the 
> {{SinkTask.preCommit}} method (for asynchronous processing) and SMTs that 
> mutate the topic field.
> The problem has been present since the {{preCommit}} method was introduced and 
> is rooted in a mismatch between the topic/partition that is passed to 
> {{open/preCommit}} (the original topic and partition, before any 
> transformations are applied) and the topic/partition present in the SinkRecord 
> that the {{SinkTask.put}} method receives (after transformations are 
> applied). Since that record is all the information the connector has for 
> implementing any kind of internal offset tracking, the topic/partitions it can 
> return from {{preCommit}} will correspond to the transformed topic, whereas the 
> framework actually expects the original topic.
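
The mismatch described above can be illustrated with a small, self-contained 
model. Nothing here is a real Connect class: topic-partitions are plain 
"topic-partition" strings, and `trackFromPut` and `usableOffsets` are 
hypothetical helpers. The sketch also assumes that the framework can only use 
offsets for topic-partitions it actually assigned to the task.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model for illustration; hypothetical stand-ins, not Connect classes.
final class PreCommitMismatchSketch {

    // Topic-partitions are modelled as "topic-partition" strings to keep the sketch short.
    static String tp(String topic, int partition) {
        return topic + "-" + partition;
    }

    // What an async task can track: it only sees the topic carried by the record in put().
    static Map<String, Long> trackFromPut(String topicSeenInPut, int partition, long offset) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put(tp(topicSeenInPut, partition), offset + 1); // next offset to consume
        return offsets;
    }

    // Assumed framework behaviour: only offsets for assigned topic-partitions are usable.
    static Map<String, Long> usableOffsets(Set<String> assigned, Map<String, Long> fromPreCommit) {
        Map<String, Long> usable = new HashMap<>(fromPreCommit);
        usable.keySet().retainAll(assigned);
        return usable;
    }

    public static void main(String[] args) {
        Set<String> assigned = new HashSet<>(Collections.singleton(tp("orders", 0)));
        // An SMT renamed the topic, so the task tracked the transformed name.
        Map<String, Long> returned = trackFromPut("orders-archive", 0, 41L);
        System.out.println("returned from preCommit: " + returned);
        System.out.println("usable by framework:     " + usableOffsets(assigned, returned));
    }
}
```

In this model, tracking by the transformed topic leaves the framework with 
nothing it can match against the assignment, whereas tracking by the original 
topic would line up with it.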



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
