I favour this approach too.

Andrew Schofield

On 01/10/2019, 09:15, "Ryanne Dolan" <ryannedo...@gmail.com> wrote:

    Thanks Randall, that works for me.
    
    Ryanne
    
    On Tue, Oct 1, 2019 at 9:09 AM Randall Hauch <rha...@gmail.com> wrote:
    
    > Apologies for the late entry -- I entirely missed this KIP and discussion.
    > :-(
    >
    > Thanks for creating the KIP and proposing this change. I do think it's
    > useful for source connector tasks to get more information about the
    > acknowledgement after the record was written.
    >
    > However, given the KIP's suggestion that the two `commitRecord(...)` method
    > variants are disjoint, I'm a bit surprised that the WorkerSourceTask would
    > do the following:
    >
    >     task.commitRecord(preTransformRecord);
    >     if (recordMetadata != null)
    >         task.commitRecord(preTransformRecord, recordMetadata);
    >
    > rather than:
    >
    >     if (recordMetadata != null)
    >         task.commitRecord(preTransformRecord, recordMetadata);
    >     else
    >         task.commitRecord(preTransformRecord);
    >
    > But if this is the case, I would argue that it is better to simply have
    > one `commitRecord(SourceRecord record, RecordMetadata metadata)` method
    > that clearly denotes that the metadata may be null if the record was not
    > written (e.g., an SMT caused it to be dropped) or could not be written
    > (after giving up retrying after failures in the SMTs and/or the
    > converter), and let the implementation deal with the differences.
    > Essentially, we'd be deprecating the existing
    > `commitRecord(SourceRecord)` method, changing the framework to always use
    > the new method, and having the new method by default delegate to the
    > existing method. (This is what Jun also suggested on the PR,
    > https://github.com/apache/kafka/pull/6295#discussion_r330097541). This is
    > backwards compatible for connector implementations that only override the
    > old method, yet provides a way for connectors that do implement the new
    > API to override the new method without having to also implement the old
    > method.
    >
    > IOW:
    >
    > @Deprecated
    > public void commitRecord(SourceRecord sourceRecord) {
    >   // nop
    > }
    >
    > /**
    >  * <p>
    >  * Commit an individual {@link SourceRecord} when the callback from the
    >  * producer client is received, or if a record is filtered by a
    >  * transformation and not sent to the producer.
    >  * By default, this method delegates to the
    >  * {@link #commitRecord(SourceRecord)} method to maintain backward
    >  * compatibility. Tasks can choose to override this method, override the
    >  * {@link #commitRecord(SourceRecord)} method, or not override either one.
    >  * </p>
    >  * <p>
    >  * SourceTasks are not required to implement this functionality; Kafka
    >  * Connect will record offsets automatically. This hook is provided for
    >  * systems that also need to store offsets internally in their own system.
    >  * </p>
    >  *
    >  * @param sourceRecord {@link SourceRecord} that was successfully sent via
    >  *        the producer.
    >  * @param recordMetadata the metadata from the producer's write
    >  *        acknowledgement, or null if the record was not sent to the
    >  *        producer because it was filtered by an SMT or could not be
    >  *        transformed and/or converted
    >  * @throws InterruptedException
    >  */
    > public void commitRecord(SourceRecord sourceRecord,
    >                          RecordMetadata recordMetadata)
    >     throws InterruptedException {
    >   commitRecord(sourceRecord);
    > }
    >
    > Best regards,
    >
    > Randall
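
The delegation Randall describes can be sketched in isolation. The classes below are stand-ins invented for illustration (`SketchSourceTask`, `LegacyTask`, `MetadataAwareTask`, and simplified `SourceRecord`/`RecordMetadata` types); they are not the real Kafka Connect classes, and the checked `InterruptedException` is left out for brevity. The sketch only shows that a task overriding the old method keeps working when the framework always calls the new one, and that a task overriding the new method can tell an ACK from a dropped record by checking for null.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Kafka Connect's SourceRecord; hypothetical, illustration only.
final class SourceRecord {
    final String value;
    SourceRecord(String value) { this.value = value; }
}

// Stand-in for the producer's RecordMetadata; hypothetical, illustration only.
final class RecordMetadata {
    final long offset;
    RecordMetadata(long offset) { this.offset = offset; }
}

// Mirrors the proposed shape: the new overload delegates to the old one.
abstract class SketchSourceTask {
    @Deprecated
    public void commitRecord(SourceRecord record) {
        // no-op by default
    }

    // metadata is null when the record was filtered or could not be written.
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        commitRecord(record);
    }
}

// Existing connector: overrides only the old method.
class LegacyTask extends SketchSourceTask {
    final List<String> committed = new ArrayList<>();

    @Override
    public void commitRecord(SourceRecord record) {
        committed.add(record.value);
    }
}

// New connector: overrides only the new method and checks for null.
class MetadataAwareTask extends SketchSourceTask {
    final List<String> acked = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        if (metadata == null) {
            dropped.add(record.value);
        } else {
            acked.add(record.value + "@" + metadata.offset);
        }
    }
}
```

In this sketch the framework side would only ever call the two-argument method, so `LegacyTask` sees every record through the default delegation without any change to its code.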
    >
    >
    > On Thu, Jan 31, 2019 at 9:02 AM Ryanne Dolan <ryannedo...@gmail.com>
    > wrote:
    >
    > > Andrew, I have considered this, but I think passing null for
    > > RecordMetadata would be surprising and error prone for anyone
    > > implementing SourceTask. I figure the only use-case for overriding this
    > > variant (and not the existing one) is to capture the RecordMetadata. If
    > > that's the case, every implementation would need to check for null. What
    > > worries me is that an implementation that does not check for null will
    > > seem to work until an SMT is configured to filter records, which I
    > > believe would be exceedingly rare. Moreover, the presence of the
    > > RecordMetadata parameter strongly implies that the record has been sent
    > > and ACK'd, and it would be surprising to discover otherwise.
    > >
    > > On the other hand, the current PR makes it difficult to distinguish
    > > between records that are filtered vs ACK'd. The implementing class would
    > > need to correlate across poll() and the two commitRecord() invocations in
    > > order to find records that were poll()'d but not ACK'd. In contrast, if
    > > we passed null to commitRecord, the method would trivially know that the
    > > record was filtered. I think this is probably not a common use-case, so I
    > > don't think we should worry about it. In fact, the existing commitRecord
    > > callback seems to purposefully hide this detail from the implementing
    > > class, and I don't know why we'd try to expose it in the new method.
    > >
    > > This sort of confusion is why I originally proposed a new method name
    > > for this callback, as does the similar KIP-381. I agree that overloading
    > > the existing method is all-around easier, and I think a casual reader
    > > would make the correct assumption that RecordMetadata in the parameter
    > > list implies that the record was sent and ACK'd.
    > >
    > > > the connector implementor would want to provide only a single variant
    > > > of commitRecord()
    > >
    > > I think this would be true either way. The only reason you'd implement
    > > both variants is to detect that a record has _not_ been ACK'd, which
    > > again I believe is a non-requirement.
    > >
    > > Would love to hear if you disagree.
    > >
    > > Thanks!
    > > Ryanne
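
The null-handling concern Ryanne raises can be made concrete with a small sketch. `AckMetadata` and `OffsetTracker` are hypothetical names invented here, standing in for the producer metadata and for whatever bookkeeping a task does in its commit callback; nothing below is Kafka Connect API. The unchecked method behaves correctly as long as every callback carries metadata and only fails once a filtered record arrives with null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for producer acknowledgement metadata.
final class AckMetadata {
    final String topic;
    final long offset;
    AckMetadata(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }
}

// Hypothetical bookkeeping a task might do inside its commit callback.
final class OffsetTracker {
    private final Map<String, Long> lastAcked = new HashMap<>();

    // Unsafe variant: assumes every callback reflects a producer ACK.
    // Throws NullPointerException when metadata is null.
    void recordUnchecked(AckMetadata metadata) {
        lastAcked.put(metadata.topic, metadata.offset);
    }

    // Safe variant: treats null as "filtered or not written" and skips it.
    // Returns true only when an ACK was actually recorded.
    boolean recordChecked(AckMetadata metadata) {
        if (metadata == null) {
            return false;
        }
        lastAcked.put(metadata.topic, metadata.offset);
        return true;
    }

    // Last acknowledged offset for the topic, or null if none was recorded.
    Long lastAcked(String topic) {
        return lastAcked.get(topic);
    }
}
```

The checked variant also covers the other side of the argument: a null argument is enough to identify a filtered record, with no correlation against poll() required.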
    > >
    > >
    > > On Thu, Jan 31, 2019 at 3:47 AM Andrew Schofield <
    > > andrew_schofi...@live.com>
    > > wrote:
    > >
    > > > As you might expect, I like the overloaded commitRecord() but I think
    > > > the overloaded method should be called in exactly the same situations
    > > > as the previous method. When it does not reflect an ACK, the second
    > > > parameter could be null. The text of the KIP says that the overloaded
    > > > method is only called when a record is ACKed and I would have thought
    > > > that the connector implementor would want to provide only a single
    > > > variant of commitRecord().
    > > >
    > > > Andrew Schofield
    > > > IBM Event Streams
    > > >
    > > > On 31/01/2019, 03:00, "Ryanne Dolan" <ryannedo...@gmail.com> wrote:
    > > >
    > > >     I've updated the KIP and PR to overload commitRecord instead of
    > > >     adding a new method. Here's the PR:
    > > >
    > > >     https://github.com/apache/kafka/pull/6171
    > > >
    > > >     Ryanne
    > > >
    > > >     On Mon, Jan 21, 2019 at 6:29 PM Ryanne Dolan <
    > ryannedo...@gmail.com>
    > > > wrote:
    > > >
    > > >     > Andrew Schofield suggested we overload the commitRecord method
    > > >     > instead of adding a new one. Thoughts?
    > > >     >
    > > >     > Ryanne
    > > >     >
    > > >     > On Thu, Jan 17, 2019, 5:34 PM Ryanne Dolan <
    > ryannedo...@gmail.com
    > > > wrote:
    > > >     >
    > > >     >> I had to change the KIP number (concurrency is hard!) so the
    > > >     >> link is now:
    > > >     >>
    > > >     >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-416%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
    > > >     >>
    > > >     >> Ryanne
    > > >     >>
    > > >     >> On Fri, Jan 11, 2019 at 2:43 PM Ryanne Dolan <
    > > ryannedo...@gmail.com
    > > > >
    > > >     >> wrote:
    > > >     >>
    > > >     >>> Hey y'all,
    > > >     >>>
    > > >     >>> Please review the following small KIP:
    > > >     >>>
    > > >     >>>
    > > >     >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-414%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
    > > >     >>>
    > > >     >>> Thanks!
    > > >     >>> Ryanne
    > > >     >>>
    > > >     >>
    > > >
    > > >
    > > >
    > >
    >
    
