Arjun, it's exciting to see a KIP around better handling of bad data and errors in Kafka Connect.
I have only a few comments below, which I hope you'll find helpful.

1. I think it'd be useful to describe in a bit more detail how someone can extract the raw data of a Kafka record that failed to get converted (on the sink side in this example). What does the JSON schema look like for an entry that is added to the dead-letter-queue, and what should someone do to get the raw bytes?

2. Similarly, it'd be nice to describe a bit more what is placed, or attempted to be placed, in the dead-letter-queue in the case of source records that fail to get imported to Kafka. Currently the only sentence I read related to that is: "Similarly, for source connectors, the developer can write the corrected records back to the original source".

3. I think the plural 'retries' in the config options 'errors.retries.limit' and 'errors.retries.delay.max.ms' doesn't read very well. Should 'retry' be used, the same way 'tolerance' (or 'log') is used right below? For example:
   errors.retry.limit
   and
   errors.retry.delay.max.ms

4. Should the metric names be 'total-record-failures' and 'total-records-skipped' to match their metric description and also be similar to 'total-retries'?

And a few minor comments:

- The domain of 'errors.retries.limit' does not include 0 in the allowed values (even though it's the default value).
- For someone unfamiliar with the term SMT, the acronym is not explained in the text. Also, the term 'transformations' is better IMO.
- typo: 'the task is to killed'
- If you intend to add a link to a PR in addition to the jira ticket, it'd be handy to add it to the KIP header (along with state, thread, jira, etc). Right now it's a bit hidden in the text and it's not clear that the KIP includes a link to a PR.

Thanks for working on this missing but important functionality.
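To make point 1 concrete, here is a rough sketch of what I'd expect a user to be able to do. It assumes the dead-letter-queue entry is JSON and carries the raw bytes base64-encoded (as mentioned for the error log earlier in the thread). The field names ("topic", "offset", "value_base64") and the helper name are hypothetical, not taken from the KIP:

```python
import base64
import json


def extract_raw_value(entry_json: str) -> bytes:
    """Return the original record bytes from a JSON-encoded DLQ entry.

    The "value_base64" field name is an illustrative assumption,
    not the schema defined by the KIP.
    """
    entry = json.loads(entry_json)
    # validate=True rejects characters outside the base64 alphabet
    return base64.b64decode(entry["value_base64"], validate=True)


if __name__ == "__main__":
    # Build a made-up entry the way a framework might, then recover the bytes.
    sample = json.dumps({
        "topic": "orders",
        "offset": 42,
        "value_base64": base64.b64encode(b"\x00\x01not-valid-avro").decode("ascii"),
    })
    print(extract_raw_value(sample))
```

If the actual entry layout differs, only the field lookup should need to change.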
- Konstantine

On Tue, May 15, 2018 at 10:41 PM, Arjun Satish <arjun.sat...@gmail.com> wrote:

> Magesh,
>
> Just to add to your point about retriable exceptions: the producer can
> throw retriable exceptions which we are handling it here:
>
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L275
>
> BTW, exceptions like TimeoutExceptions (which extend RetriableExceptions)
> are bubbled back to the application, and need to be handled as per
> application requirements.
>
> Best,
>
> On Tue, May 15, 2018 at 8:30 PM, Arjun Satish <arjun.sat...@gmail.com>
> wrote:
>
> > Magesh,
> >
> > Thanks for the feedback! Really appreciate your comments.
> >
> > 1. I updated the KIP to state that only the configs of the failed
> > operation will be emitted. Thank you!
> >
> > The purpose of bundling the configs of the failed operation along with the
> > error context is to have a single place to find everything relevant to the
> > failure. This way, we can only look at the error logs to find the most
> > common pieces to "failure" puzzles: the operation, the config and the input
> > record. Ideally, a programmer should be able to take these pieces and
> > reproduce the error locally.
> >
> > 2. Added a table to describe this in the KIP.
> >
> > 3. Raw bytes will be base64 encoded before being logged. Updated the KIP
> > to state this. Thank you!
> >
> > 4. I'll add an example log4j config to show we can take logs from a class
> > and redirect it to a different location. Made a note in the PR for this.
> >
> > 5. When we talk about logging messages, this could mean instances of
> > SinkRecords or SourceRecords. When we disable logging of messages, these
> > records would be replaced by a "null". If you think it makes sense, instead
> > of completely dropping the object, we could drop only the key and value
> > objects from ConnectRecord? That way some context will still be retained.
> >
> > 6. Yes, for now I think it is good to have explicit config in Connectors
> > which dictates the error handling behavior. If this becomes an
> > inconvenience, we can think of having a cluster global default, or better
> > defaults in the configs.
> >
> > Best,
> >
> > On Tue, May 15, 2018 at 1:07 PM, Magesh Nandakumar <mage...@confluent.io>
> > wrote:
> >
> >> Hi Arjun,
> >>
> >> I think this a great KIP and would be a great addition to have in connect.
> >> Had a couple of minor questions:
> >>
> >> 1. What would be the value in logging the connector config using
> >> errors.log.include.configs for every message?
> >> 2. Not being picky on format here but it might be clearer if the behavior
> >> is called out for each stage separately and what the connector developers
> >> need to do ( may be a tabular format). Also, I think all retriable
> >> exception when talking to Broker are never propagated to the Connect
> >> Framework since the producer is configured to try indefinitely
> >> 3. If a message fails in serialization, would the raw bytes be available to
> >> the dlq or the error log
> >> 4. Its not necessary to mention in KIP, but it might be better to separate
> >> the error records to a separate log file as part of the default log4j
> >> properties
> >> 5. If we disable message logging, would there be any other metadata
> >> available like offset that helps reference the record?
> >> 6. If I need error handler for all my connectors, would I have to set it up
> >> for each of them? I would think most people might want the behavior applied
> >> to all the connectors.
> >>
> >> Let me know your thoughts :).
> >>
> >> Thanks
> >> Magesh
> >>
> >> On Tue, May 8, 2018 at 11:59 PM, Arjun Satish <arjun.sat...@gmail.com>
> >> wrote:
> >>
> >> > All,
> >> >
> >> > I'd like to start a discussion on adding ways to handle and report record
> >> > processing errors in Connect.
> >> > Please find a KIP here:
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> >> >
> >> > Any feedback will be highly appreciated.
> >> >
> >> > Thanks very much,
> >> > Arjun
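
For reference, the behaviour that the option names 'errors.retries.limit' and 'errors.retries.delay.max.ms' suggest can be sketched roughly as below. This assumes exponential backoff capped at the maximum delay, which is one reading of the option names and not something this thread specifies. The function, the exception class, and the initial delay are hypothetical and are not Connect APIs:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetriableError(Exception):
    """Stand-in for an error that is worth retrying (hypothetical)."""


def call_with_retries(
    operation: Callable[[], T],
    retry_limit: int = 0,           # plays the role of errors.retries.limit
    max_delay_ms: int = 60_000,     # plays the role of errors.retries.delay.max.ms
    initial_delay_ms: int = 100,    # assumed starting delay, not from the KIP
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying retriable failures up to retry_limit times."""
    attempt = 0
    delay_ms = initial_delay_ms
    while True:
        try:
            return operation()
        except RetriableError:
            # With retry_limit == 0 the first failure is re-raised immediately.
            if attempt >= retry_limit:
                raise
            sleep(min(delay_ms, max_delay_ms) / 1000.0)
            # Double the delay for the next attempt, never exceeding the cap.
            delay_ms = min(delay_ms * 2, max_delay_ms)
            attempt += 1
```

Note that a limit of 0 is a meaningful value here (fail on the first error), which is why it should be inside the documented domain of the option.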