[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020279#comment-16020279 ]
Randall Hauch edited comment on KAFKA-3821 at 5/22/17 10:12 PM:
----------------------------------------------------------------

[~ewencp], the more I think about this issue, the more I think it should be solved with a new subtype of {{SourceRecord}}, say {{OffsetRecord}}:

{code:java}
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

public class OffsetRecord extends SourceRecord {
    public OffsetRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        // Null topic, Kafka partition, key schema, key, value schema, and value:
        // the record carries only a source partition and its new offset.
        super(sourcePartition, sourceOffset, null, null, null, null, null, null);
    }

    // with hashCode(), equals(), toString(), and newRecord(...)
    // implementations
}
{code}

The {{WorkerSourceTask}} would need an {{instanceof}} check so that it can skip a few things it currently does for every record (e.g., it would use null for the key and value rather than converting them, and it would not send anything to the producer). Everything else would work as-is, since {{OffsetRecord}} would be a subtype of {{SourceRecord}}.

I also think we should *not* introduce a supertype of {{SourceRecord}} and change the signature of {{SourceTask.poll()}} to return something other than {{List<SourceRecord>}}. Technically we could, because the change is binary compatible (due to type erasure), but it would be too confusing and surprising for almost no real benefit. It would also require a fair amount of refactoring of the implementation.

If you agree, I can write up a KIP for this so that we can start the formal discussion.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-3821
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3821
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.1
>            Reporter: Randall Hauch
>              Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a
> database). Once the task completes those records, the connector wants to
> update the offsets (e.g., the snapshot is complete) but has no more records
> to be written to a topic. With this change, the task could simply supply an
> updated offset.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
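To make the proposed {{OffsetRecord}} flow concrete, here is a minimal, self-contained sketch of both sides: a snapshotting task whose {{poll()}}-style method still returns {{List<SourceRecord>}} but ends with an offset-only record, and a worker loop that uses an {{instanceof}} check to skip sending that record while still advancing the offset. It does *not* use the real Kafka classes: {{SourceRecord}} here is a simplified stand-in (no schemas, keys, timestamps, or headers), and {{Worker}}, {{pollSnapshot}}, {{latestOffsets}}, and the {{snapshot_completed}} offset key are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetRecordSketch {

    // Simplified stand-in for Kafka's SourceRecord (NOT the real class):
    // schemas, keys, Kafka partitions, timestamps, and headers are omitted.
    static class SourceRecord {
        final Map<String, ?> sourcePartition;
        final Map<String, ?> sourceOffset;
        final String topic;
        final Object value;

        SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                     String topic, Object value) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
            this.topic = topic;
            this.value = value;
        }
    }

    // The proposed subtype: only a source partition and offset, no topic or value.
    static class OffsetRecord extends SourceRecord {
        OffsetRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
            super(sourcePartition, sourceOffset, null, null);
        }
    }

    // Hypothetical, heavily simplified worker loop. The real runtime converts
    // keys/values and commits offsets only after the producer acknowledges sends.
    static class Worker {
        final List<Object> sentToProducer = new ArrayList<>();
        final Map<Map<String, ?>, Map<String, ?>> latestOffsets = new HashMap<>();

        void process(List<SourceRecord> records) {
            for (SourceRecord record : records) {
                if (!(record instanceof OffsetRecord)) {
                    // Normal record: (conversion omitted) hand the value to the producer.
                    sentToProducer.add(record.value);
                }
                // Every record, including an OffsetRecord, advances the offset
                // for its source partition.
                latestOffsets.put(record.sourcePartition, record.sourceOffset);
            }
        }
    }

    // Hypothetical snapshot task: every row shares the same "in progress" offset,
    // and a trailing OffsetRecord marks the snapshot as complete.
    static List<SourceRecord> pollSnapshot(List<String> rows) {
        Map<String, ?> partition = Collections.singletonMap("db", "inventory");
        Map<String, ?> inProgress = Collections.singletonMap("snapshot_completed", false);
        List<SourceRecord> batch = new ArrayList<>();
        for (String row : rows) {
            batch.add(new SourceRecord(partition, inProgress, "inventory.customers", row));
        }
        batch.add(new OffsetRecord(partition,
                Collections.singletonMap("snapshot_completed", true)));
        return batch;
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.process(pollSnapshot(Arrays.asList("row-1", "row-2")));
        System.out.println(worker.sentToProducer); // prints [row-1, row-2]
        System.out.println(worker.latestOffsets);  // prints {{db=inventory}={snapshot_completed=true}}
    }
}
```

Because {{OffsetRecord}} is a subtype, the task's return type stays {{List<SourceRecord>}}, which is the point of avoiding a new supertype. One thing this sketch deliberately ignores: in a real worker, an offset-only record that follows still-unacknowledged data records should not have its offset committed before those earlier sends complete.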