Hello all,

I'd like to retract the earlier additions to the KIP. Since I've already started the voting process and received some responses, I will not be making any major changes to the KIP, as doing so would require a re-vote.
Thanks,
Aakash

On Tue, May 19, 2020 at 2:31 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Chris and others,
>
> Yes, you are correct; I looked through KIP-298 to understand it better. I
> agree with your idea for handling "errors.tolerance=none".
>
> I see; you are basically saying you are in favor of standardizing what
> the reporter should be set to if it is not configured. I am on board with
> this proposal, especially if it is in line with previous behaviors, as
> you mentioned.
>
> I will add both of these suggestions to the KIP.
>
> Lastly, unless anyone has any issues with Chris's suggestions, I believe
> the last part we have to come to a consensus on is using a Future as the
> return type. I am for giving extra guarantees to the user if they wish;
> however, I am not very familiar with the potential issues with the
> consumer heartbeat that Arjun pointed out. Does anyone have any thoughts
> on this?
>
> Thanks,
> Aakash
>
> On Tue, May 19, 2020 at 2:10 PM Chris Egerton <chr...@confluent.io> wrote:
>
>> Hi Aakash,
>>
>> > If "errors.tolerance=none", should it not be the case that the error
>> reporter does not even report any error; rather, the task just fails
>> after throwing the error? I do understand the point you are making about
>> duplicates, though.
>>
>> I believe the "errors.tolerance" property dictates whether a task should
>> fail after a record that causes problems during conversion or
>> transformation is encountered and reported (for example, by writing to a
>> DLQ). If it is set to "none", then the task will fail immediately; if it
>> is set to "all", then the task will continue running normally. So if we
>> want to preserve that behavior, we might want to immediately throw an
>> exception when an errant record is reported by a "SinkTask" instance and
>> the user has configured "errors.tolerance = none", which, unless caught,
>> will cause the task to cease writing records to the sink. In addition to
>> throwing that exception, we should also still fail the task; the
>> exception is just a way to (hopefully) interrupt the task's processing
>> of records in order to prevent duplicates if/when the task is restarted
>> later on.
>>
>> > Lastly, why do you say we should always provide an errant record
>> reporter? Doesn't that change the contract of what functionality it is
>> providing?
>>
>> I'm just thinking that instead of returning "null" when no errant record
>> reporter is configured, we could return one that always fails the task
>> and throws an exception. This seems in line with the default behavior of
>> the framework when no error handling configuration properties are
>> specified and a record causes problems during conversion or
>> transformation. We could leave the choice in the hands of developers,
>> but this might make things confusing for users who get different
>> behavior from different connectors under the same circumstances.
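>>
>> As a rough sketch (hypothetical names only; nothing here is a committed
>> API, just the shape we've been discussing in this thread):
>>
>> import java.util.concurrent.Future;
>>
>> import org.apache.kafka.connect.errors.ConnectException;
>> import org.apache.kafka.connect.sink.SinkRecord;
>>
>> // Hypothetical reporter interface with the Future-returning signature
>> // currently proposed in the KIP discussion.
>> interface ErrantRecordReporter {
>>     Future<Void> report(SinkRecord record, Throwable error);
>> }
>>
>> // A default the framework could hand to tasks when no error handling
>> // properties are configured, instead of returning null: reporting an
>> // errant record immediately fails the task, mirroring the framework's
>> // default behavior for conversion and transformation errors.
>> class FailFastReporter implements ErrantRecordReporter {
>>     @Override
>>     public Future<Void> report(SinkRecord record, Throwable error) {
>>         throw new ConnectException(
>>             "Errant record reported, but no error handling is configured",
>>             error);
>>     }
>> }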
>>
>> Hope this helps!
>>
>> Cheers,
>>
>> Chris
>>
>> On Tue, May 19, 2020 at 1:50 PM Aakash Shah <as...@confluent.io> wrote:
>>
>> > Hi Arjun,
>> >
>> > I am not very familiar with how the potential heartbeat failure would
>> > cause more failures when consuming subsequent records. Can you
>> > elaborate on this?
>> >
>> > Thanks,
>> > Aakash
>> >
>> > On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com>
>> > wrote:
>> >
>> > > One more concern with the connector blocking on the Future's get()
>> > > is that it may cause the task's consumer to fail to heartbeat (since
>> > > there is no independent thread to do this). That would then cause
>> > > failures when we eventually try to consume more records after
>> > > returning from put(). The developer would need to be cognizant of
>> > > these bits before waiting on the future, which adds a reasonable
>> > > amount of complexity.
>> > >
>> > > Even with preCommit() returning incomplete offsets, I suppose the
>> > > concern would be that the put() method keeps giving the task more
>> > > records, and to truly pause the "firehose", the task needs to pause
>> > > all partitions?
>> > >
>> > > On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com>
>> > > wrote:
>> > >
>> > > > Can we get a couple of examples that show the utility of waiting
>> > > > on the Future<>? Also, in preCommit() we would report back on the
>> > > > incomplete offsets, so that feedback mechanism will already exist
>> > > > for developers who want to manually manage this.
>> > > >
>> > > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com>
>> > > > wrote:
>> > > >
>> > > >> Thanks, Aakash, for updating the KIP.
>> > > >>
>> > > >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish
>> > > >> <arjun.sat...@gmail.com> wrote:
>> > > >>
>> > > >> > Hi Randall,
>> > > >> >
>> > > >> > Thanks for the explanation! Excellent point about guaranteeing
>> > > >> > offsets in the async case.
>> > > >> >
>> > > >> > If we can guarantee that the offsets will be advanced only
>> > > >> > after the bad records are reported, then is there any value in
>> > > >> > the Future<> return type? I feel we can declare the function
>> > > >> > with a void return type:
>> > > >> >
>> > > >> > void report(SinkRecord failedRecord, Throwable error)
>> > > >> >
>> > > >> > that works asynchronously, and advances offsets only after the
>> > > >> > DLQ producer (and other reporters) complete successfully (as
>> > > >> > you explained).
>> > > >> >
>> > > >> > This actually alleviates my concern about what this Future<>
>> > > >> > actually means. Since a failure to report should kill the task,
>> > > >> > there is no reason for the connector to ever wait on the get().
>> > > >>
>> > > >> We should not say "there is no reason", because we don't know all
>> > > >> of the requirements that might exist for sending records to
>> > > >> external systems. The additional guarantee regarding error
>> > > >> records being fully recorded before `preCommit(...)` is called is
>> > > >> a minimal guarantee that Connect provides the sink task, and
>> > > >> returning a Future allows a sink task to have *stronger*
>> > > >> guarantees than what Connect provides by default.
>> > > >>
>> > > >> Once again:
>> > > >> 1. We need an async API to allow the sink task to report problem
>> > > >> records and then immediately continue doing more work.
>> > > >> 2. Connect should guarantee to the sink task that all reported
>> > > >> records will actually be recorded before `preCommit(...)` is
>> > > >> called.
>> > > >> 3. A sink task *might* need stronger guarantees, and may need to
>> > > >> block on the reported records some time before `preCommit(...)`,
>> > > >> and we should allow it to do this.
>> > > >> 4. Future and callbacks are common techniques, but there are
>> > > >> significant runtime risks in using callbacks, whereas Future is a
>> > > >> common, standard pattern that is straightforward to use.
>> > > >>
>> > > >> This *exactly* matches the current KIP, which is why I plan to
>> > > >> vote for this valuable and well-thought-out KIP.
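>> > > >>
>> > > >> To make point 3 concrete, a task that wants the stronger
>> > > >> guarantee might look something like this (a sketch only; the
>> > > >> reporter interface is the hypothetical shape from earlier in the
>> > > >> thread, how it gets wired up is not settled, and writeToSink() is
>> > > >> an invented helper):
>> > > >>
>> > > >> import java.util.Collection;
>> > > >> import java.util.Map;
>> > > >> import java.util.concurrent.Future;
>> > > >>
>> > > >> import org.apache.kafka.connect.sink.SinkRecord;
>> > > >> import org.apache.kafka.connect.sink.SinkTask;
>> > > >>
>> > > >> // Same hypothetical reporter shape sketched earlier in the thread.
>> > > >> interface ErrantRecordReporter {
>> > > >>     Future<Void> report(SinkRecord record, Throwable error);
>> > > >> }
>> > > >>
>> > > >> public class ExampleSinkTask extends SinkTask {
>> > > >>     private ErrantRecordReporter reporter; // wiring is TBD
>> > > >>
>> > > >>     @Override
>> > > >>     public void put(Collection<SinkRecord> records) {
>> > > >>         for (SinkRecord record : records) {
>> > > >>             try {
>> > > >>                 writeToSink(record);
>> > > >>             } catch (Exception e) {
>> > > >>                 // Async by default: report and keep going.
>> > > >>                 // Connect guarantees the report is fully
>> > > >>                 // recorded before preCommit(...) is invoked.
>> > > >>                 Future<Void> reported = reporter.report(record, e);
>> > > >>                 // A task needing a stronger guarantee can block
>> > > >>                 // instead, mindful of the consumer-heartbeat
>> > > >>                 // concern raised earlier in the thread:
>> > > >>                 // reported.get();
>> > > >>             }
>> > > >>         }
>> > > >>     }
>> > > >>
>> > > >>     private void writeToSink(SinkRecord record) {
>> > > >>         // invented placeholder for the actual sink write, which
>> > > >>         // may throw on a bad record
>> > > >>     }
>> > > >>
>> > > >>     @Override
>> > > >>     public String version() {
>> > > >>         return "0.0.0";
>> > > >>     }
>> > > >>
>> > > >>     @Override
>> > > >>     public void start(Map<String, String> props) {
>> > > >>     }
>> > > >>
>> > > >>     @Override
>> > > >>     public void stop() {
>> > > >>     }
>> > > >> }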
>> > > >>
>> > > >> > And if we are guaranteeing that the offsets are only advanced
>> > > >> > when the errors are reported, then this becomes a double win:
>> > > >> >
>> > > >> > 1. Connector developers can literally fire and forget failed
>> > > >> > records.
>> > > >> > 2. Offsets are correctly advanced once errors are reported.
>> > > >> > Failure to report an error will kill the task, and the last
>> > > >> > committed offset will be the correct one.
>> > > >> >
>> > > >> > The main contract would simply be to call report() before
>> > > >> > preCommit() or before put() returns in the task, so the
>> > > >> > framework knows that there are error records reported, and
>> > > >> > those need to finish before the offsets can be advanced.
>> > > >> >
>> > > >> > I think I'd be pretty excited about this API, and if we all
>> > > >> > agree, then let's go ahead with this?
>> > > >> >
>> > > >> > Best,