I favour this approach too.

Andrew Schofield
On 01/10/2019, 09:15, "Ryanne Dolan" <ryannedo...@gmail.com> wrote:

Thanks Randall, that works for me.

Ryanne

On Tue, Oct 1, 2019 at 9:09 AM Randall Hauch <rha...@gmail.com> wrote:

> Apologies for the late entry -- I entirely missed this KIP and discussion.
> :-(
>
> Thanks for creating the KIP and proposing this change. I do think it's
> useful for source connector tasks to get more information about the
> acknowledgement after the record was written.
>
> However, given the KIP's suggestion that the two `commitRecord(...)` method
> variants are disjoint, I'm a bit surprised that the WorkerSourceTask would
> do the following:
>
>     task.commitRecord(preTransformRecord);
>     if (recordMetadata != null)
>         task.commitRecord(preTransformRecord, recordMetadata);
>
> rather than:
>
>     if (recordMetadata != null)
>         task.commitRecord(preTransformRecord, recordMetadata);
>     else
>         task.commitRecord(preTransformRecord);
>
> But if this is the case, I would argue that it is better to simply have one
> `commitRecord(SourceRecord record, RecordMetadata metadata)` method that
> clearly denotes that the metadata may be null if the record was not written
> (e.g., an SMT caused it to be dropped) or could not be written (after
> giving up retrying after failures in the SMTs and/or the converter), and
> let the implementation deal with the differences. Essentially, we'd be
> deprecating the existing `commitRecord(SourceRecord)` method, changing the
> framework to always use the new method, and having the new method by
> default delegate to the existing method. (This is what Jun also suggested
> on the PR,
> https://github.com/apache/kafka/pull/6295#discussion_r330097541). This is
> backwards compatible for connector implementations that only override the
> old method, yet provides a way for connectors that do implement the new API
> to override the new method without having to also implement the old method.
>
> IOW:
>
>     @Deprecated
>     public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
>         // nop
>     }
>
>     /**
>      * <p>
>      * Commit an individual {@link SourceRecord} when the callback from the
>      * producer client is received, or if a record is filtered by a
>      * transformation and not sent to the producer.
>      * By default, this method delegates to the
>      * {@link #commitRecord(SourceRecord)} method to maintain backward
>      * compatibility. Tasks can choose to override this method,
>      * override the {@link #commitRecord(SourceRecord)} method, or not
>      * override either one.
>      * </p>
>      * <p>
>      * SourceTasks are not required to implement this functionality; Kafka
>      * Connect will record offsets automatically. This hook is provided for
>      * systems that also need to store offsets internally in their own system.
>      * </p>
>      *
>      * @param sourceRecord {@link SourceRecord} that was successfully sent via
>      *        the producer.
>      * @param recordMetadata the metadata from the producer's write
>      *        acknowledgement, or null if the record was not sent to the
>      *        producer because it was filtered by an SMT or could not be
>      *        transformed and/or converted
>      * @throws InterruptedException
>      */
>     public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata)
>             throws InterruptedException {
>         commitRecord(sourceRecord);
>     }
>
> Best regards,
>
> Randall
>
>
> On Thu, Jan 31, 2019 at 9:02 AM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
> > Andrew, I have considered this, but I think passing null for RecordMetadata
> > would be surprising and error-prone for anyone implementing SourceTask. I
> > figure the only use-case for overriding this variant (and not the existing
> > one) is to capture the RecordMetadata. If that's the case, every
> > implementation would need to check for null. What worries me is that an
> > implementation that does not check for null will seem to work until an SMT
> > is configured to filter records, which I believe would be exceedingly rare.
> > Moreover, the presence of the RecordMetadata parameter strongly implies
> > that the record has been sent and ACK'd, and it would be surprising to
> > discover otherwise.
> >
> > On the other hand, the current PR makes it difficult to distinguish between
> > records that are filtered vs ACK'd. The implementing class would need to
> > correlate across poll() and the two commitRecord() invocations in order to
> > find records that were poll()'d but not ACK'd. In contrast, if we passed
> > null to commitRecord, the method would trivially know that the record was
> > filtered. I think this is probably not a common use-case, so I don't think
> > we should worry about it. In fact, the existing commitRecord callback seems
> > to purposefully hide this detail from the implementing class, and I don't
> > know why we'd try to expose it in the new method.
> >
> > This sort of confusion is why I originally proposed a new method name for
> > this callback, as does the similar KIP-381. I agree that overloading the
> > existing method is all-around easier, and I think a casual reader would
> > make the correct assumption that RecordMetadata in the parameter list
> > implies that the record was sent and ACK'd.
> >
> > > the connector implementor would want to provide only a single variant of
> > > commitRecord()
> >
> > I think this would be true either way. The only reason you'd implement both
> > variants is to detect that a record has _not_ been ACK'd, which again I
> > believe is a non-requirement.
> >
> > Would love to hear if you disagree.
> >
> > Thanks!
> > Ryanne
> >
> >
> > On Thu, Jan 31, 2019 at 3:47 AM Andrew Schofield <andrew_schofi...@live.com>
> > wrote:
> >
> > > As you might expect, I like the overloaded commitRecord() but I think the
> > > overloaded method should be called in exactly the same situations as the
> > > previous method. When it does not reflect an ACK, the second parameter
> > > could be null.
> > > The text of the KIP says that the overloaded method is only
> > > called when a record is ACKed and I would have thought that the connector
> > > implementor would want to provide only a single variant of commitRecord().
> > >
> > > Andrew Schofield
> > > IBM Event Streams
> > >
> > > On 31/01/2019, 03:00, "Ryanne Dolan" <ryannedo...@gmail.com> wrote:
> > >
> > > I've updated the KIP and PR to overload commitRecord instead of adding a
> > > new method. Here's the PR:
> > >
> > > https://github.com/apache/kafka/pull/6171
> > >
> > > Ryanne
> > >
> > > On Mon, Jan 21, 2019 at 6:29 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > wrote:
> > >
> > > > Andrew Schofield suggested we overload the commitRecord method instead of
> > > > adding a new one. Thoughts?
> > > >
> > > > Ryanne
> > > >
> > > > On Thu, Jan 17, 2019, 5:34 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > >
> > > >> I had to change the KIP number (concurrency is hard!)
> > > >> so the link is now:
> > > >>
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-416%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
> > > >>
> > > >> Ryanne
> > > >>
> > > >> On Fri, Jan 11, 2019 at 2:43 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hey y'all,
> > > >>>
> > > >>> Please review the following small KIP:
> > > >>>
> > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-414%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
> > > >>>
> > > >>> Thanks!
> > > >>> Ryanne
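
For readers following the thread, the single-method approach Randall outlines (the framework always calls the two-argument commitRecord, which by default delegates to the deprecated one-argument variant) can be sketched as below. This is only an illustration under stated assumptions, not the code from the PR: the SourceRecord, RecordMetadata and SourceTask types here are hypothetical stand-ins reduced to the bare minimum so the example runs without Kafka on the classpath, and LegacyTask, MetadataAwareTask and deliver() are made-up names. It also shows the null check Ryanne is concerned about.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitRecordSketch {

    // Hypothetical stand-ins for Kafka's SourceRecord and RecordMetadata,
    // reduced to one field each; these are NOT the real Kafka classes.
    static final class SourceRecord {
        final String value;
        SourceRecord(String value) { this.value = value; }
    }

    static final class RecordMetadata {
        final long offset;
        RecordMetadata(long offset) { this.offset = offset; }
    }

    // Stand-in for SourceTask showing only the two commitRecord variants.
    static abstract class SourceTask {
        final List<String> log = new ArrayList<>();

        @Deprecated
        public void commitRecord(SourceRecord record) throws InterruptedException {
            // no-op by default
        }

        // metadata is null when the record was filtered or could not be written.
        // The default delegates to the old variant for backward compatibility.
        public void commitRecord(SourceRecord record, RecordMetadata metadata)
                throws InterruptedException {
            commitRecord(record);
        }
    }

    // A connector written against the old API: overrides only the old method.
    static class LegacyTask extends SourceTask {
        @Override
        @Deprecated
        public void commitRecord(SourceRecord record) {
            log.add("legacy:" + record.value);
        }
    }

    // A connector written against the new API: must handle null metadata.
    static class MetadataAwareTask extends SourceTask {
        @Override
        public void commitRecord(SourceRecord record, RecordMetadata metadata) {
            if (metadata == null) {
                log.add("dropped:" + record.value);
            } else {
                log.add("acked:" + record.value + "@" + metadata.offset);
            }
        }
    }

    // Imitates the framework: always calls the two-argument variant,
    // once for an acknowledged record and once for a filtered one.
    static List<String> deliver(SourceTask task) {
        try {
            task.commitRecord(new SourceRecord("a"), new RecordMetadata(42L));
            task.commitRecord(new SourceRecord("b"), null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(deliver(new LegacyTask()));        // [legacy:a, legacy:b]
        System.out.println(deliver(new MetadataAwareTask())); // [acked:a@42, dropped:b]
    }
}
```

The legacy task is still notified for both records without any change, while the metadata-aware task can tell an acknowledged record from a dropped one. Removing the null check in MetadataAwareTask would fail only on the second call, which is the hard-to-notice failure mode raised earlier in the thread.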