[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020279#comment-16020279 ]
Randall Hauch edited comment on KAFKA-3821 at 5/22/17 10:12 PM:
----------------------------------------------------------------
[~ewencp], the more I think about this issue, the more I think that it should
be solved with a new subtype of {{SourceRecord}}, say {{OffsetRecord}}:
{code:java}
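import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

public class OffsetRecord extends SourceRecord {
    public OffsetRecord(Map<String, ?> sourcePartition,
                        Map<String, ?> sourceOffset) {
        // Only the source partition and offset are set; the topic, Kafka
        // partition, key/value schemas, key, and value are all null, so
        // there is nothing for the worker to convert or send.
        super(sourcePartition, sourceOffset,
              null, null, null, null, null, null);
    }

    // with hashCode(), equals(), toString(), and newRecord(...)
    // implementations
}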
{code}
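To make the snapshot scenario from the issue description concrete, here is a rough sketch of what a task's {{poll()}} might return under this proposal. It is self-contained so it compiles without Connect on the classpath: {{SourceRecord}} and {{OffsetRecord}} below are simplified stand-ins rather than the real classes, and {{SnapshotTask}} and the {{"db"}} / {{"snapshot"}} partition and offset keys are hypothetical.
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified stand-in for SourceRecord (hypothetical): the real class also
// carries a topic, Kafka partition, key/value schemas and a key.
class SourceRecord {
    final Map<String, ?> sourcePartition;
    final Map<String, ?> sourceOffset;
    final Object value;

    SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, Object value) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
        this.value = value;
    }
}

// The proposed subtype: a partition and an offset, but nothing to write.
class OffsetRecord extends SourceRecord {
    OffsetRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        super(sourcePartition, sourceOffset, null);
    }
}

// Hypothetical snapshot task: every snapshot row shares one offset, and a
// final OffsetRecord records that the snapshot is complete.
class SnapshotTask {
    private final Map<String, ?> partition = Collections.singletonMap("db", "inventory");
    private final List<String> rows;
    private boolean snapshotDone = false;

    SnapshotTask(List<String> rows) {
        this.rows = rows;
    }

    List<SourceRecord> poll() {
        List<SourceRecord> batch = new ArrayList<>();
        if (snapshotDone) {
            return batch; // nothing left to produce in this sketch
        }
        Map<String, ?> inProgress = Collections.singletonMap("snapshot", "in-progress");
        for (String row : rows) {
            batch.add(new SourceRecord(partition, inProgress, row));
        }
        // No more rows to write, but the stored offset should still advance.
        batch.add(new OffsetRecord(partition, Collections.singletonMap("snapshot", "complete")));
        snapshotDone = true;
        return batch;
    }
}
{code}
The task never needs a dummy record on some topic just to move the offset forward; the last element of the batch carries only the new offset.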
The {{WorkerSourceTask}} would need an {{instanceof}} check so that, for an
{{OffsetRecord}}, it skips some of what it currently does (e.g., it would not
convert the key and value, which are null, and would not send anything to the
producer). Everything else would work as is, since {{OffsetRecord}} would be a
subtype of {{SourceRecord}}.
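A minimal sketch of that {{instanceof}} branch, again using simplified stand-in types rather than the real Connect classes. {{WorkerLoop}}, its {{sent}} list (standing in for the producer), its {{pendingOffsets}} map, and the trivial {{convert}} method are all hypothetical, and the sketch ignores ordering with respect to records still in flight to the producer, which a real implementation would have to handle.
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins (hypothetical), not the real Connect classes.
class SourceRecord {
    final Map<String, ?> sourcePartition;
    final Map<String, ?> sourceOffset;
    final Object value;

    SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, Object value) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
        this.value = value;
    }
}

class OffsetRecord extends SourceRecord {
    OffsetRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        super(sourcePartition, sourceOffset, null);
    }
}

// Hypothetical worker loop showing only the instanceof branch.
class WorkerLoop {
    // Stands in for the producer: serialized values that would be sent.
    final List<byte[]> sent = new ArrayList<>();
    // Latest offset per source partition, waiting to be committed.
    final Map<Map<String, ?>, Map<String, ?>> pendingOffsets = new HashMap<>();

    void process(List<SourceRecord> batch) {
        for (SourceRecord record : batch) {
            if (!(record instanceof OffsetRecord)) {
                // Ordinary record: convert the value and hand it to the "producer".
                sent.add(convert(record.value));
            }
            // Both kinds of record advance the offset for their source partition.
            pendingOffsets.put(record.sourcePartition, record.sourceOffset);
        }
    }

    // Trivial converter standing in for the worker's configured converter.
    private static byte[] convert(Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }
}
{code}
Because the subtype check is the only new branch, the offset bookkeeping path is shared by both kinds of record.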
I also think that we should *not* introduce a supertype of {{SourceRecord}} and
change the signature of {{SourceTask.poll()}} to return something other than
{{List<SourceRecord>}}. Technically we could, because type erasure makes it a
binary-compatible change, but it would be far too confusing and surprising,
with almost no real benefit. Besides, it would require a fair amount of
refactoring of the implementation.
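To illustrate the type-erasure point: the hypothetical interfaces below differ only in the generic return type of {{poll()}}. Reflection shows that both erase to {{java.util.List}}, which is what compiled code links against, while the generic signatures that developers actually read are different. {{BaseRecord}}, {{CurrentTask}}, {{WidenedTask}} and {{ErasureCheck}} are illustrative names only, not proposed APIs.
{code:java}
import java.lang.reflect.Method;
import java.util.List;

// Hypothetical types for illustration only.
class BaseRecord { }
class SourceRecord extends BaseRecord { }

// The current shape of poll() versus a widened alternative.
interface CurrentTask {
    List<SourceRecord> poll();
}

interface WidenedTask {
    List<? extends BaseRecord> poll();
}

class ErasureCheck {
    static Method poll(Class<?> type) {
        try {
            return type.getMethod("poll");
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(type + " has no poll() method", e);
        }
    }

    // Erased return type: what the JVM method descriptor (and thus linking) uses.
    static Class<?> erasedReturnType(Class<?> type) {
        return poll(type).getReturnType();
    }

    // Generic return type: what the compiler, and readers of the API, see.
    static String genericReturnType(Class<?> type) {
        return poll(type).getGenericReturnType().getTypeName();
    }
}
{code}
Binary compatibility only covers already-compiled code; source that assigns the result of {{poll()}} to a {{List<SourceRecord>}} would no longer compile against the widened signature, which is part of why such a change would be surprising.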
If you agree, then I can write up a KIP for this so that we can start the
formal discussion.
> Allow Kafka Connect source tasks to produce offset without writing to topics
> ----------------------------------------------------------------------------
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Affects Versions: 0.9.0.1
> Reporter: Randall Hauch
> Labels: needs-kip
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown
> number of {{SourceRecord}} objects (e.g., while it is taking a snapshot of a
> database). Once the task has produced all of those records, the connector
> wants to update the offset (e.g., to record that the snapshot is complete) but
> has no more records to write to a topic. With this change, the task could
> simply supply an updated offset.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)