Hey Greg,

Thinking more, I do like the idea of a source-side equivalent of the
ErrantRecordReporter interface!
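
To make that concrete, here's a rough sketch of the shape I have in mind. This is purely hypothetical--no such API exists in Connect today (and Connect is Java; I'm using Python-style pseudocode for brevity, and every name below is invented):

```python
# Hypothetical sketch only: no such API exists in Connect today, and all
# names here are invented. The sink-side ErrantRecordReporter takes a full
# SinkRecord; on the source side we may only have the source partition and
# offset maps, since the raw external data itself may not be serializable.
from typing import Any, Mapping


class ErrantSourceRecordReporter:
    """Collects failed source partition/offset pairs for a DLQ topic."""

    def __init__(self):
        self.reported = []

    def report(self,
               source_partition: Mapping[str, Any],
               source_offset: Mapping[str, Any],
               error: Exception) -> None:
        # In a real implementation, the framework would serialize this
        # entry and produce it to the connector's DLQ topic.
        self.reported.append({
            "source_partition": dict(source_partition),
            "source_offset": dict(source_offset),
            "error": str(error),
        })


reporter = ErrantSourceRecordReporter()
reporter.report({"table": "users"}, {"id": 42},
                ValueError("could not read value"))
print(reporter.reported[0]["error"])  # could not read value
```

The key design point is that the reporter accepts the partition/offset maps directly rather than a fully-formed record, since those maps are the one piece of the failed record we can hopefully still serialize.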

However, I also suspect we may have to reason more carefully about what
users could do with this kind of information in a DLQ topic. Yes, it's an
option to reset the connector (or a copy of it) to the earliest unprocessed
partition/offset in the DLQ topic and start processing data anew from
there, but this might lead to a flood of duplicates. Even if we were
willing to tolerate duplicates, it wouldn't be quite that simple: users
would have to reset the connector to the offset just before the one
recorded in the DLQ, since resetting to the failed offset itself would
make it appear to the connector as if the record with that offset had
been produced and committed successfully.
Maybe we could include both the offset for the failed record, and the
offset for the last record before that one with the same source
partition--basically saying to the user "This is the one that failed, and
this is the one that needs to be reset to if you want to try again".
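
As a purely illustrative example (every field name below is invented, and again this is Python-style pseudocode rather than anything Connect-shaped), a DLQ entry along those lines might look like:

```python
# Hypothetical sketch only: these field names are invented for
# illustration and don't correspond to any existing Connect schema.
# The entry records both the offset of the failed record and the last
# committed offset for the same source partition, so a user knows which
# offset to reset to before retrying.
dlq_entry = {
    "source_partition": {"table": "users"},
    "failed_offset": {"id": 42},   # the record that could not be processed
    "reset_offset": {"id": 41},    # "reset here if you want to try again"
    "error": "Failed to serialize record value",
}

# Resetting to the failed offset itself would make the connector believe
# that record had already been committed, so manual intervention should
# target the reset offset instead.
reset_target = dlq_entry["reset_offset"]
print(reset_target)  # {'id': 41}
```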

I'm starting to wonder if, for now, we could try to design something that's
useful for metadata and for manual intervention, but perhaps not for direct
usage with a connector (e.g., we wouldn't expect people to take the
contents of the DLQ topic and use it to start up a second connector for the
sole purpose of slurping up previously-dropped records). Thoughts?

Cheers,

Chris

On Tue, Mar 5, 2024 at 6:19 PM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hey Chris,
>
> That's a cool idea! That can certainly be applied for failures other
> than poll(), and could be useful when combined with the Offsets
> modification API.
>
> Perhaps failures inside of poll() can be handled by an extra
> mechanism, similar to the ErrantRecordReporter, which allows reporting
> affected source partition/source offsets when a meaningful key or
> value cannot be read.
>
> Thanks,
> Greg
>
> On Tue, Mar 5, 2024 at 3:03 PM Chris Egerton <fearthecel...@gmail.com>
> wrote:
> >
> > Hi Greg,
> >
> > This was my understanding as well--if we can't turn a record into a byte
> > array on the source side, it's difficult to know exactly what to write
> > to a DLQ topic.
> >
> > One idea I've toyed with recently is that we could write the source
> > partition and offset for the failed record (assuming, hopefully safely,
> > that these can at least be serialized). This may not cover all bases, is
> > highly dependent on how user-friendly the offsets published by the
> > connector are, and does come with the risk of data loss (if the upstream
> > system is wiped before skipped records can be recovered), but could be
> > useful in some scenarios.
> >
> > Thoughts?
> >
> > Chris
> >
> > On Tue, Mar 5, 2024 at 5:49 PM Greg Harris <greg.har...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Yeikel,
> > >
> > > Thanks for your question. It certainly isn't clear from the original
> > > KIP-298, the attached discussion, or the follow-up KIP-610 as to why
> > > the situation is asymmetric.
> > >
> > > The reason as I understand it is: Source connectors are responsible
> > > for importing data to Kafka. If an error occurs during this process,
> > > then writing useful information to a dead letter queue about the
> > > failure is at least as difficult as importing the record correctly.
> > >
> > > For some examples:
> > > * If an error occurs during poll(), the external data has not yet been
> > > transformed into a SourceRecord that the framework can transform or
> > > serialize.
> > > * If an error occurs during conversion/serialization, the external
> > > data cannot be reasonably serialized to be forwarded to the DLQ.
> > > * If a record cannot be written to Kafka, such as due to being too
> > > large, the same failure is likely to happen with writing to the DLQ as
> > > well.
> > >
> > > For the Sink side, we already know that the data was properly
> > > serializable and appeared as a ConsumerRecord<byte[],byte[]>. That can
> > > be forwarded to the DLQ as-is with a reasonable expectation for
> > > success, with the same data formatting as the source topic.
> > >
> > > If you have a vision for how this can be improved and are interested,
> > > please consider opening a KIP! The situation can certainly be made
> > > better than it is today.
> > >
> > > Thanks!
> > > Greg
> > >
> > > On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana <em...@yeikel.com>
> > > wrote:
> > > >
> > > > Hi all,
> > > >
> > > > Sink connectors support Dead Letter Queues[1], but Source connectors
> > > > don't seem to.
> > > >
> > > > What was the reason for that decision?
> > > >
> > > > In my data pipeline, I'd like to apply some transformations to the
> > > > messages before they are sunk, but that leaves me vulnerable to
> > > > failures, as I need to either fail the connector or rely on logging
> > > > to track source failures.
> > > >
> > > > It seems that for now, I'll need to apply the transformations in a
> > > > sink connector and possibly reinsert the results back into Kafka for
> > > > downstream consumption, but that sounds unnecessary.
> > > >
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065
> > >
>
