Yes, that makes sense. And it fits in very nicely with the current error handling framework.
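For what it's worth, here is a minimal sketch of how a connector might use this on its side. It is not the KIP's implementation: the nested classes are hypothetical stand-ins for SourceRecord and RecordMetadata so the snippet compiles without Kafka on the classpath, and SourceSystem, track() and pending() are names made up purely for illustration. It assumes the framework would call commitRecord() with null metadata for a skipped record, the way it already does for records dropped by a transformation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, dependency-free sketch: the nested types below are stand-ins,
// not the real Kafka Connect classes.
final class AckOnCommitSketch {

    /** Stand-in for Connect's SourceRecord; carries only a source message id. */
    static final class SourceRecord {
        final String messageId;
        SourceRecord(String messageId) { this.messageId = messageId; }
    }

    /** Stand-in for the producer's RecordMetadata. */
    static final class RecordMetadata {
        final long offset;
        RecordMetadata(long offset) { this.offset = offset; }
    }

    /** Hypothetical handle to the external system (for example a JMS session). */
    interface SourceSystem {
        void ack(String messageId);
    }

    static final class SketchSourceTask {
        // Thread-safe set, because commitRecord may run on a different thread than poll().
        private final Set<String> inFlight = ConcurrentHashMap.newKeySet();
        private final SourceSystem source;

        SketchSourceTask(SourceSystem source) { this.source = source; }

        /** Called from the polling side when a message is handed to the framework. */
        SourceRecord track(String messageId) {
            inFlight.add(messageId);
            return new SourceRecord(messageId);
        }

        /** Same shape as SourceTask.commitRecord(SourceRecord, RecordMetadata). */
        void commitRecord(SourceRecord record, RecordMetadata metadata) {
            // remove() returns false if the id was never tracked or was already acked.
            if (!inFlight.remove(record.messageId)) {
                return;
            }
            if (metadata == null) {
                // Not written to Kafka: filtered by a transformation or, under the
                // idea discussed in this thread, skipped after a producer error.
                System.out.println("Dropped " + record.messageId + ", acking anyway");
            }
            source.ack(record.messageId);
        }

        int pending() { return inFlight.size(); }
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        SketchSourceTask task = new SketchSourceTask(acked::add);
        SourceRecord written = task.track("m1");
        SourceRecord skipped = task.track("m2");
        task.commitRecord(written, new RecordMetadata(42L));
        task.commitRecord(skipped, null);
        System.out.println("acked=" + acked + " pending=" + task.pending());
    }
}
```

The concurrent set is there only because the callback may arrive on a different thread than poll(); a real connector would likely key on whatever it needs to ack its source system rather than a plain string id.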
On Thu, Oct 28, 2021 at 10:39 AM Knowles Atchison Jr <katchiso...@gmail.com> wrote:

> That would work. I originally thought that it would be confusing to overload that function for a record that wasn't actually written, but looking at SourceTask more closely, in commitRecord(SourceRecord, RecordMetadata), the RecordMetadata is set to null in the event of a filtered transformation, so the framework is already doing this in a certain regard.
>
> Knowles
>
> On Thu, Oct 28, 2021 at 10:29 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
>
> > To ack the message back to the source system, we already have a commitRecord method. Once the bad record is handled by skip/dlq, we could just call commitRecord() on it?
> >
> > On Thu, Oct 28, 2021 at 9:35 AM Knowles Atchison Jr <katchiso...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > Thank you for your reply!
> > >
> > > It is a clarity error regarding the javadoc. I am not operationally familiar with all of the exceptions Kafka considers non-retriable, so I pulled the list from Callback.java:
> > >
> > > https://github.com/apache/kafka/blob/1afe2a5190e9c98e38c84dc793f4303ea51bc19b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java#L35
> > >
> > > to be an illustrative example of the types of exceptions that would kill the connector outright. Any exception thrown during the producer write will be passed to this handler. I will update the KIP/PR to be more clear on this matter.
> > >
> > > You raise an excellent point, how should the framework protect the connector or developer from themselves? If a connector enables exactly-once semantics, it would make sense to me to have the task killed. The framework should enforce this type of misconfiguration that would break the internal semantics of KIP-618.
> > > WorkerSourceTask could check the configuration before handing off the records and exception to this function, fail initial configuration check, or something of that nature.
> > >
> > > Hi Arjun,
> > >
> > > Thank you for your response!
> > >
> > > My specific use case is our custom JMS connector. We ack back to the JMS broker once Kafka commits the record. We thread out our JMS consumer such that I would need access to the SourceRecord to confirm we are going to throw away the message.
> > >
> > > Skipping such records, writing some log messages, and/or writing some error context to a DLQ would cover most if not all of the use cases I envision.
> > >
> > > "discard.message.on.producer.exception": "true"
> > >
> > > or some equivalent would get my personal use case 99% of the way there. I would still need some kind of callback from inside the connector with the SourceRecord to successfully ack back to my source system.
> > >
> > > I have updated the KIP regarding the callback being executed in a different thread than poll().
> > >
> > > Knowles
> > >
> > > On Thu, Oct 28, 2021 at 2:02 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > >
> > > > Hi Knowles,
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > Could you please call out some use-cases on what the source connectors would do when they hit such exceptions? I'm wondering if we would need to do anything other than skipping such records, writing some log messages, and/or writing some error context to a DLQ?
> > > >
> > > > One of the goals for Connect was to abstract away intricacies of Kafka topics, clients etc, so that connectors could focus on the external systems themselves. Ideally, we'd want to see if we could call out the most common cases and handle them in the framework itself, instead of delegating them back to the connector.
> > > > This way, instead of the new API, we'd probably introduce some more configuration options, but they could be applicable to all the connectors that are out there.
> > > >
> > > > Also, if the above mentioned are the most common uses, then we could apply KIP-298 (with some adjustments) to source connectors for non-retriable producer errors.
> > > >
> > > > If we decide to go with the API you are referring to though, would the preTransformation record suffice? SMTs can be causing the actual issues (for example, changing the topic name) that cause these non-retriable exceptions. The new callback might be receiving insufficient context to do any corrective action.
> > > >
> > > > In the documentation for the new API, we might want to specify that this callback will be called from a different thread than the ones calling poll(). So any shared objects must be protected appropriately.
> > > >
> > > > Cheers,
> > > >
> > > > On Wed, Oct 27, 2021 at 7:01 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Knowles,
> > > > >
> > > > > Thanks for the KIP. I may have more to say later but there's one thing I'd like to make sure to share now. In the Javadocs for the proposed SourceTask::ignoreNonRetriableProducerException method, the InvalidProducerEpochException exception class is included as an example of a non-retriable exception that may cause the new SourceTask method to be invoked. This exception should only arise if the source task's producer is a transactional producer, which is currently never the case and, once KIP-618 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-618) is merged, will only be the case when the task is running with exactly-once support.
> > > > > I wonder if it's safe to allow connectors to discard this exception when they're running with exactly-once support, or if the task should still be unconditionally failed in that case?
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Wed, Oct 27, 2021 at 5:39 PM John Roesler <vvcep...@apache.org> wrote:
> > > > >
> > > > > > Hi Knowles,
> > > > > >
> > > > > > Thanks for the reply! That all sounds reasonable to me, and that's a good catch regarding the SourceRecord.
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Wed, 2021-10-27 at 15:32 -0400, Knowles Atchison Jr wrote:
> > > > > >
> > > > > > > John,
> > > > > > >
> > > > > > > Thank you for the response and feedback!
> > > > > > >
> > > > > > > I originally started my first pass with the ProducerRecord<byte[], byte[]>. For our connector, we need some of the information out of the SourceRecord to ack our source system. If I had the actual ProducerRecord, I would have to convert it back before I would be able to do anything useful with it. I think there is merit in providing both records as parameters to this callback. Then connector writers can decide which of the representations of the data is most useful to them. I also noticed that in my PR I was sending the SourceRecord post transformation, when we really should be sending the preTransformRecord.
> > > > > > >
> > > > > > > The Streams solution to this is very interesting. Given the nature of a connector, to me it makes the most sense for the API call to be part of that task rather than an external class that is configurable. This allows the connector to use state it may have at the time to inform decisions on what to do with these producer exceptions.
> > > > > > >
> > > > > > > I have updated the KIP and PR.
> > > > > > >
> > > > > > > Knowles
> > > > > > >
> > > > > > > On Wed, Oct 27, 2021 at 1:03 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > >
> > > > > > > > Good morning, Knowles,
> > > > > > > >
> > > > > > > > Thanks for the KIP!
> > > > > > > >
> > > > > > > > To address your latest questions, it is fine to call for a vote if a KIP doesn't generate much discussion. Either the KIP was just not controversial enough for anyone to comment, in which case a vote is appropriate; or no one had time to review it, in which case, calling for a vote might be more provocative and elicit a response.
> > > > > > > >
> > > > > > > > As far as pinging people directly, one idea would be to look at the git history (git blame/praise) for the files you're changing to see which committers have recently been involved. Those are the folks who are most likely to have valuable feedback on your proposal. It might not be appropriate to directly email them, but I have seen KIP discussions before that requested feedback from people by name. It's probably not best to lead with that, but since no one has responded so far, it might not hurt. I'm sure that the reason they haven't noticed your KIP is just that they are so busy it slipped their radar. They might actually appreciate a more direct ping at this point.
> > > > > > > >
> > > > > > > > I'm happy to review, but as a caveat, I don't have much experience with using or maintaining Connect, so caveat emptor as far as my review goes.
> > > > > > > >
> > > > > > > > First of all, thanks for the well written KIP. Without much context, I was able to understand the motivation and proposal easily just by reading your document.
> > > > > > > >
> > > > > > > > I think your proposal is a good one. It seems like it would be pretty obvious as a user what (if anything) to do with the proposed method.
> > > > > > > >
> > > > > > > > For your reference, this proposal reminds me of these capabilities in Streams:
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
> > > > > > > >
> > > > > > > > and
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
> > > > > > > >
> > > > > > > > I'm not sure if there's value in bringing your proposed interface closer to that pattern or not. Streams and Connect are quite different domains after all. At least, I wanted you to be aware of them so you could consider the alternative API strategy they present.
> > > > > > > >
> > > > > > > > Regardless, I do wonder if it would be helpful to also include the actual ProducerRecord we tried to send, since there's a non-trivial transformation that takes place to convert the SourceRecord into a ProducerRecord.
> > > > > > > > I'm not sure what people would do with it, exactly, but it might be helpful in deciding what to do about the exception, or maybe even in understanding the exception.
> > > > > > > >
> > > > > > > > Those are the only thoughts that come to my mind! Thanks again,
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Wed, 2021-10-27 at 09:16 -0400, Knowles Atchison Jr wrote:
> > > > > > > >
> > > > > > > > > Good morning,
> > > > > > > > >
> > > > > > > > > Bumping this thread. Is there someone specific on the Connect framework team that I should ping? Is it appropriate to just call a vote? All source connectors are dead in the water without a way to handle producer write exceptions. Thank you.
> > > > > > > > >
> > > > > > > > > Knowles
> > > > > > > > >
> > > > > > > > > On Mon, Oct 18, 2021 at 8:33 AM Christopher Shannon <christopher.l.shan...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I also would find this feature useful to handle errors better, does anyone have any comments or feedback?
> > > > > > > > > >
> > > > > > > > > > On Mon, Oct 11, 2021 at 8:52 AM Knowles Atchison Jr <katchiso...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Good morning,
> > > > > > > > > > >
> > > > > > > > > > > Bumping this for visibility. I would like this to go into the next release. KIP freeze is Friday.
> > > > > > > > > > >
> > > > > > > > > > > Any comments and feedback are welcome.
> > > > > > > > > > >
> > > > > > > > > > > Knowles
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Oct 5, 2021 at 4:24 PM Knowles Atchison Jr <katchiso...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello all,
> > > > > > > > > > > >
> > > > > > > > > > > > I would like to discuss the following KIP:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-779%3A+Allow+Source+Tasks+to+Handle+Producer+Exceptions
> > > > > > > > > > > >
> > > > > > > > > > > > The main purpose is to allow Source Tasks the ability to see underlying Producer Exceptions and decide what to do rather than being killed. In our use cases we would want to log/write off some information and continue processing.
> > > > > > > > > > > >
> > > > > > > > > > > > PR is here:
> > > > > > > > > > > >
> > > > > > > > > > > > https://github.com/apache/kafka/pull/11382
> > > > > > > > > > > >
> > > > > > > > > > > > Any comments and feedback are welcome.
> > > > > > > > > > > >
> > > > > > > > > > > > Knowles