[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020279#comment-16020279 ]

Randall Hauch edited comment on KAFKA-3821 at 5/22/17 10:12 PM:
----------------------------------------------------------------

[~ewencp], the more I think about this issue, the more convinced I am that it 
should be solved with a new subtype of {{SourceRecord}}, say {{OffsetRecord}}:

{code:java}
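import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

public class OffsetRecord extends SourceRecord {
  public OffsetRecord(Map<String, ?> sourcePartition,
                      Map<String, ?> sourceOffset) {
    // Topic, Kafka partition, key schema, key, value schema, and value are
    // all null: only the source partition and source offset matter.
    super(sourcePartition, sourceOffset,
          null, null, null, null, null, null);
  }
  // with hashCode(), equals(), toString(), and newRecord(...)
  // implementations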
}
{code}
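To make the intended use concrete, here is a rough, self-contained sketch of the 
snapshot case from the issue description. It deliberately uses hypothetical, 
simplified stand-ins ({{SketchSourceRecord}}, {{SketchOffsetRecord}}, 
{{SnapshotTask}}) instead of the real Connect API so that it compiles without 
Kafka on the classpath; the partition and offset keys ({{db}}, {{snapshot}}) are 
made up for illustration.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for SourceRecord: the real class also
// carries a Kafka partition, key/value schemas, a key, a timestamp, etc.
class SketchSourceRecord {
  final Map<String, ?> sourcePartition;
  final Map<String, ?> sourceOffset;
  final String topic;
  final Object value;

  SketchSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                     String topic, Object value) {
    this.sourcePartition = sourcePartition;
    this.sourceOffset = sourceOffset;
    this.topic = topic;
    this.value = value;
  }
}

// Stand-in for the proposed OffsetRecord: a source partition and offset only.
class SketchOffsetRecord extends SketchSourceRecord {
  SketchOffsetRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
    super(sourcePartition, sourceOffset, null, null);
  }
}

// Hypothetical snapshotting task. Every snapshot row carries the same offset;
// once the rows run out, the task appends one offset-only record that marks
// the snapshot as complete.
class SnapshotTask {
  private static final Map<String, String> PARTITION =
      Collections.singletonMap("db", "inventory");
  private final Iterator<String> rows;
  private boolean snapshotDone = false;

  SnapshotTask(List<String> snapshotRows) {
    this.rows = snapshotRows.iterator();
  }

  // Loosely mimics SourceTask.poll(): up to batchSize rows per call, plus the
  // offset-only record in the batch where the rows are exhausted.
  List<SketchSourceRecord> poll(int batchSize) {
    List<SketchSourceRecord> batch = new ArrayList<>();
    while (batch.size() < batchSize && rows.hasNext()) {
      batch.add(new SketchSourceRecord(PARTITION,
          Collections.singletonMap("snapshot", true), "inventory.rows", rows.next()));
    }
    if (!rows.hasNext() && !snapshotDone) {
      snapshotDone = true;
      batch.add(new SketchOffsetRecord(PARTITION,
          Collections.singletonMap("snapshot", false)));
    }
    return batch;
  }
}
{code}

With three rows and a batch size of 2, the first {{poll(2)}} returns two rows, 
the second returns the last row followed by the offset-only record, and any 
later call returns an empty list.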

The {{WorkerSourceTask}} would need an {{instanceof}} check to skip a few of the 
things it currently does with each record (e.g., it would use null for the key 
and value rather than converting them, and it would not send the record to the 
producer), but everything else would work as is, since {{OffsetRecord}} would 
be a subtype of {{SourceRecord}}.
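A similarly hedged sketch of that worker-side branch follows. It again uses 
hypothetical stand-in types ({{StubSourceRecord}}, {{StubOffsetRecord}}, 
{{WorkerSketch}}), reduces conversion and the producer to appending to a list, 
and is not the actual Connect worker code.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for SourceRecord and the proposed OffsetRecord.
class StubSourceRecord {
  final Map<String, ?> sourcePartition;
  final Map<String, ?> sourceOffset;
  final String topic;
  final Object value;

  StubSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                   String topic, Object value) {
    this.sourcePartition = sourcePartition;
    this.sourceOffset = sourceOffset;
    this.topic = topic;
    this.value = value;
  }
}

class StubOffsetRecord extends StubSourceRecord {
  StubOffsetRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
    super(sourcePartition, sourceOffset, null, null);
  }
}

// Sketch of the per-record branch in the worker; "sending" just appends to a list.
class WorkerSketch {
  final List<String> sent = new ArrayList<>();
  // Latest offset seen per source partition, i.e. what a later commit would write.
  final Map<Map<String, ?>, Map<String, ?>> pendingOffsets = new HashMap<>();

  void sendRecords(List<StubSourceRecord> records) {
    for (StubSourceRecord record : records) {
      if (!(record instanceof StubOffsetRecord)) {
        // Normal path: would convert key and value, then hand off to the producer.
        sent.add(record.topic + ":" + record.value);
      }
      // Both paths: the source offset is tracked so that it gets committed.
      pendingOffsets.put(record.sourcePartition, record.sourceOffset);
    }
  }
}
{code}

The sketch has no producer and no offset flushing; it only shows that the 
offset-only record bypasses conversion and sending while still updating the 
tracked offset for its source partition.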

I also think that we should *not* introduce a supertype of {{SourceRecord}} and 
change the signature of {{SourceTask.poll()}} to return something other than 
{{List<SourceRecord>}}. Technically we could, because it would be a binary 
compatible change (due to type erasure, both signatures compile to a method 
returning a raw {{List}}), but it would be too confusing and surprising, with 
almost no real benefit. Besides, it would require a fair amount of refactoring 
of the implementation.
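The binary-compatibility point can be checked with reflection: the erased return 
type, which is what compiled callers link against, is the raw 
{{java.util.List}} in both cases. A minimal sketch, where {{BaseRecord}} (the 
rejected supertype), {{SourceRecordLike}}, {{CurrentTask}}, {{AlternativeTask}}, 
and {{ErasureCheck}} are all hypothetical names:

{code:java}
import java.lang.reflect.Method;
import java.util.List;

// Hypothetical types: BaseRecord plays the rejected new supertype,
// SourceRecordLike plays SourceRecord.
class BaseRecord {}
class SourceRecordLike extends BaseRecord {}

// The current shape of SourceTask.poll() ...
interface CurrentTask { List<SourceRecordLike> poll(); }
// ... and the alternative that returns a list of the supertype.
interface AlternativeTask { List<BaseRecord> poll(); }

class ErasureCheck {
  static Method poll(Class<?> type) {
    try {
      return type.getMethod("poll");
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(type + " has no poll()", e);
    }
  }

  // Erased return type: what the compiled method descriptor refers to.
  static Class<?> erasedReturnType(Class<?> type) {
    return poll(type).getReturnType();
  }

  // Generic return type: recorded as metadata for the compiler, not used when linking.
  static String genericReturnType(Class<?> type) {
    return poll(type).getGenericReturnType().getTypeName();
  }
}
{code}

Both {{erasedReturnType}} calls return {{java.util.List}}, while the generic 
return types differ. This only covers connectors that are already compiled: an 
existing task declaring {{List<SourceRecordLike> poll()}} would not compile 
against the {{AlternativeTask}} shape, because {{List<SourceRecordLike>}} is not 
a subtype of {{List<BaseRecord>}}.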

If you agree, then I can write up a KIP for this so that we can start the 
formal discussion.




> Allow Kafka Connect source tasks to produce offset without writing to topics
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-3821
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3821
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.1
>            Reporter: Randall Hauch
>              Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task has produced all of those records, the connector 
> wants to update the offsets (e.g., because the snapshot is complete) but has 
> no more records to write to a topic. With this change, the task could simply 
> supply an updated offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
