Thanks Randall, that works for me.

Ryanne

On Tue, Oct 1, 2019 at 9:09 AM Randall Hauch <rha...@gmail.com> wrote:

> Apologies for the late entry -- I entirely missed this KIP and discussion.
> :-(
>
> Thanks for creating the KIP and proposing this change. I do think it's
> useful for source connector tasks to get more information about the
> acknowledgement after the record was written.
>
> However, given the KIPs suggestion that the two `commitRecord(...)` method
> variants are disjoint, I'm a bit surprised that the WorkerSourceTask would
> do the following:
>
>     task.commitRecord(preTransformRecord);
>     if (recordMetadata != null)
>         task.commitRecord(preTransformRecord, recordMetadata);
>
> rather than:
>
>     if (recordMetadata != null)
>         task.commitRecord(preTransformRecord, recordMetadata);
>     else
>         task.commitRecord(preTransformRecord);
>
> But if this is the case, I would argue that it is better to simply have one
> `commitRecord(SourceRecord record, RecordMetadata metadata)` method that
> clearly denotes that the metadata may be null if the record was not written
> (e.g., an SMT caused it to be dropped) or could not be written (after
> giving up retrying after failures in the SMTs and/or the converter), and
> let the implementation deal with the differences. Essentially, we've be
> deprecating the existing `commitRecord(SourceRecord)` method, changing the
> framework to always use the new method, and having the new method by
> default delegate to the existing method. (This is what Jun also suggested
> on the PR request,
> https://github.com/apache/kafka/pull/6295#discussion_r330097541). This is
> backwards compatible for connector implementations that only override the
> old method, yet provides a way for connectors that do implement the new API
> to override the new method without having to also implement the old method,
> too.
>
> IOW:
>
> @deprecated
> public void commitRecord(SourceRecord sourceRecord) {
>   // nop
> }
>
> /**
>  * <p>
>  * Commit an individual {@link SourceRecord} when the callback from the
> producer client is received, or if a record is filtered by a transformation
> and not sent to the producer.
>  * By default, this method delegates to the {@link
> #commitRecord(SourceRecord)} method to maintain backward compatibility.
> Tasks can choose to override this method,
>  * override the {@link #commitRecord(SourceRecord)} method, or not override
> either one.
>  * </p>
>  * <p>
>  * SourceTasks are not required to implement this functionality; Kafka
> Connect will record offsets
>  * automatically. This hook is provided for systems that also need to store
> offsets internally
>  * in their own system.
>  * </p>
>  *
>  * @param record {@link SourceRecord} that was successfully sent via the
> producer.
>  * @param recordMetadata the metadata from the producer's write
> acknowledgement, or null if the record was not sent to the producer because
> it was filtered by an SMT or could not be transformed and/or converted
>  * @throws InterruptedException
>  */
> public void commitRecord(SourceRecord sourceRecord, RecordMetadata
> recordMetadata) {
>   commitRecord(sourceRecord);
> }
>
> Best regards,
>
> Randall
>
>
> On Thu, Jan 31, 2019 at 9:02 AM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
> > Andrew, I have considered this, but I think passing null for
> RecordMetadata
> > would be surprising and error prone for anyone implementing SourceTask. I
> > figure the only use-case for overriding this variant (and not the
> existing
> > one) is to capture the RecordMetadata. If that's the case, every
> > implementation would need to check for null. What worries me is that an
> > implementation that does not check for null will seem to work until an
> SMT
> > is configured to filter records, which I believe would be exceedingly
> rare.
> > Moreover, the presence of the RecordMetadata parameter strongly implies
> > that the record has been sent and ACK'd, and it would be surprising to
> > discover otherwise.
> >
> > On the other hand, the current PR makes it difficult to distinguish
> between
> > records that are filtered vs ACK'd. The implementing class would need to
> > correlate across poll() and the two commitRecord() invocations in order
> to
> > find records that were poll()'d but not ACK'd. In contrast, if we passed
> > null to commitRecord, the method would trivially know that the record was
> > filtered. I think this is probably not a common use-case, so I don't
> think
> > we should worry about it. In fact, the existing commitRecord callback
> seems
> > to purposefully hide this detail from the implementing class, and I don't
> > know why we'd try to expose it in the new method.
> >
> > This sort of confusion is why I originally proposed a new method name for
> > this callback, as does the similar KIP-381. I agree that overloading the
> > existing method is all-around easier, and I think a casual reader would
> > make the correct assumption that RecordMetadata in the parameter list
> > implies that the record was sent and ACK'd.
> >
> > > the connector implementor would want to provide only a single variant
> of
> > commitRecord()
> >
> > I think this would be true either way. The only reason you'd implement
> both
> > variants is to detect that a record has _not_ been ACK'd, which again I
> > believe is a non-requirement.
> >
> > Would love to hear if you disagree.
> >
> > Thanks!
> > Ryanne
> >
> >
> > On Thu, Jan 31, 2019 at 3:47 AM Andrew Schofield <
> > andrew_schofi...@live.com>
> > wrote:
> >
> > > As you might expect, I like the overloaded commitRecord() but I think
> the
> > > overloaded method should be called in exactly the same situations as
> the
> > > previous method. When it does not reflect an ACK, the second parameter
> > > could be null. The text of the KIP says that the overloaded method is
> > only
> > > called when a record is ACKed and I would have thought that the
> connector
> > > implementor would want to provide only a single variant of
> > commitRecord().
> > >
> > > Andrew Schofield
> > > IBM Event Streams
> > >
> > > On 31/01/2019, 03:00, "Ryanne Dolan" <ryannedo...@gmail.com> wrote:
> > >
> > >     I've updated the KIP and PR to overload commitRecord instead of
> > adding
> > > a
> > >     new method. Here's the PR:
> > >
> > >
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fkafka%2Fpull%2F6171&amp;data=02%7C01%7C%7Cc627d954fa6f44574f7908d6872838c5%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636845004151935856&amp;sdata=hxBWSTt5gF7AAVxw2P8%2BZ8duBB0T97gHOOYG6GCkdd8%3D&amp;reserved=0
> > >
> > >     Ryanne
> > >
> > >     On Mon, Jan 21, 2019 at 6:29 PM Ryanne Dolan <
> ryannedo...@gmail.com>
> > > wrote:
> > >
> > >     > Andrew Schofield suggested we overload the commitRecord method
> > > instead of
> > >     > adding a new one. Thoughts?
> > >     >
> > >     > Ryanne
> > >     >
> > >     > On Thu, Jan 17, 2019, 5:34 PM Ryanne Dolan <
> ryannedo...@gmail.com
> > > wrote:
> > >     >
> > >     >> I had to change the KIP number (concurrency is hard!) so the
> link
> > > is now:
> > >     >>
> > >     >>
> > >     >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-416%253A%2BNotify%2BSourceTask%2Bof%2BACK%2527d%2Boffsets%252C%2Bmetadata&amp;data=02%7C01%7C%7Cc627d954fa6f44574f7908d6872838c5%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636845004151935856&amp;sdata=VkAFrM8B2ozCRJosPQjgM3aDD1cS%2Bob8KWVuNuuOJ9s%3D&amp;reserved=0
> > >     >>
> > >     >> Ryanne
> > >     >>
> > >     >> On Fri, Jan 11, 2019 at 2:43 PM Ryanne Dolan <
> > ryannedo...@gmail.com
> > > >
> > >     >> wrote:
> > >     >>
> > >     >>> Hey y'all,
> > >     >>>
> > >     >>> Please review the following small KIP:
> > >     >>>
> > >     >>>
> > >     >>>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-414%253A%2BNotify%2BSourceTask%2Bof%2BACK%2527d%2Boffsets%252C%2Bmetadata&amp;data=02%7C01%7C%7Cc627d954fa6f44574f7908d6872838c5%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636845004151945855&amp;sdata=2mhXA4hEV3ZvrFaOcTqagO1rYNj1JsYAEDHQsFqkzG8%3D&amp;reserved=0
> > >     >>>
> > >     >>> Thanks!
> > >     >>> Ryanne
> > >     >>>
> > >     >>
> > >
> > >
> > >
> >
>

Reply via email to