Gwen,

It looks like there are two requirements here:
1. Know which message failed.
2. Know which record batch failed, i.e. which messages failed together.

The current callback interface can easily support (1): the user can simply
construct their own callback that stores the message in it. Do you mean we
want (2) as well? RecordBatch seems to be an internal data structure of the
producer, and I am not sure what the motivation is for exposing it in the
callback. Can you elaborate a little bit?

Thanks,

Jiangjie (Becket) Qin

On Mon, Oct 5, 2015 at 10:20 AM, Mayuresh Gharat <gharatmayures...@gmail.com>
wrote:

> +1 Ewen.
> Currently, while creating the RecordBatch, we extract the key and value
> and add them to the batch. At a very high level, we will have to extract
> the key and value (at a minimum) from each record in the RecordBatch,
> create a wrapper with the exception, the key and the value, and add it to
> the callback.
>
> This can be done I think.
>
> Thanks,
>
> Mayuresh
>
> On Sun, Oct 4, 2015 at 8:53 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > Anything from the context of the initial send request that is included in
> > the callback is for convenience -- the only things you *need* to have in
> > the callback are the result values (exception/metadata). You can always
> > capture that information when you construct the callback. A similar issue
> > came up with ConsumerRebalanceListener in the new consumer, which
> > previously had the Consumer object as its first parameter.
> >
> > The obvious drawback is that this approach requires allocating a separate
> > callback object for each Producer.send() call, unlike the
> > ConsumerRebalanceListener example where the Consumer is the same for all
> > callbacks from the same consumer. I don't think this is really an issue,
> > and in a lot of cases it just works naturally by marking the
> > ProducerRecord as final if you need it (or any other relevant data). The
> > cost of the allocation/GC doesn't seem substantial. I can imagine coming
> > up with a microbenchmark where the cost is relatively large, but is it
> > ever in practice?
> >
> > With regard to your specific proposal, I don't think the callback should
> > include the RecordBatch. If anything it should only get the
> > ProducerRecord. Not only is RecordBatch internal (and in the internals
> > package so it's very clear), but I think it exposes implementation
> > details that shouldn't necessarily appear in the API. If we wanted to do
> > that, we'd need to carry the ProducerRecord through with the batch. Right
> > now, if I recall correctly, it is dropped during the send() since send()
> > just extracts the data it needs as it adds it to the RecordAccumulator.
> >
> > -Ewen
> >
> > On Fri, Oct 2, 2015 at 9:32 PM, Gwen Shapira <g...@confluent.io> wrote:
> >
> > > Hi,
> > >
> > > Currently the callback of Producer gets either Metadata (partition +
> > > offset) or Exception.
> > > In case of an exception, it doesn't get any information other than the
> > > exception. Which means that if I want to write failures to a "dead
> > > letter queue" of some kind, I have to wait on the Future.
> > >
> > > I'd like to add RecordBatch to onComplete, so in case of errors we can
> > > do something with those records.
> > >
> > > Questions:
> > > 1. Does it seem necessary to anyone else? What do all of you do when
> > > the callback receives an exception?
> > > 2. Any thoughts on how to add a parameter without causing compatibility
> > > issues? I think it will require a new interface (in addition to
> > > Callback), but hope there's a better way.
> > >
> > > Gwen
> > >
> >
> > --
> > Thanks,
> > Ewen
> >
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
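
As an illustration of requirement (1), and of Ewen's point that the record can
be captured when the callback is constructed, here is a minimal Java sketch.
So that it runs without the Kafka client jar, it uses stand-in types: the
nested Callback interface only mirrors the shape described in this thread
(metadata on success, exception on failure), and Record, DeadLetterCallback
and the in-memory queue are hypothetical names made up for the example, not
Kafka APIs.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Sketch only: stand-in types, so this compiles without the Kafka client. */
final class DeadLetterSketch {

    /** Stand-in for the producer callback: metadata on success, exception on failure. */
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    /** Hypothetical stand-in for ProducerRecord, reduced to a key and a value. */
    static final class Record {
        final String key;
        final String value;

        Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Callback that remembers the record it was created for. */
    static final class DeadLetterCallback implements Callback {
        private final Record record;             // captured at construction time
        private final Queue<Record> deadLetters; // hypothetical in-memory "dead letter queue"

        DeadLetterCallback(Record record, Queue<Record> deadLetters) {
            this.record = record;
            this.deadLetters = deadLetters;
        }

        @Override
        public void onCompletion(String metadata, Exception exception) {
            if (exception != null) {
                // The failed record is known here without waiting on a Future.
                deadLetters.add(record);
            }
        }
    }

    public static void main(String[] args) {
        // Thread-safe queue, assuming callbacks may run on a different thread than the sender.
        Queue<Record> deadLetters = new ConcurrentLinkedQueue<>();

        // Simulate a producer completing two sends: one success, one failure.
        new DeadLetterCallback(new Record("k1", "v1"), deadLetters)
                .onCompletion("partition=0, offset=42", null);
        new DeadLetterCallback(new Record("k2", "v2"), deadLetters)
                .onCompletion(null, new RuntimeException("send failed"));

        System.out.println(deadLetters.size() + " failed, first key: " + deadLetters.peek().key);
    }
}
```

With the real client, the same idea would mean one callback object per
Producer.send() call, which is the allocation cost Ewen mentions above. It
covers requirement (1) only; it says nothing about which records shared a
batch.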