Unfortunately both databases will be online during the migration, so
conflicts could occur in either direction. I had previously dug up an
answer about modifying JdbcIO here:
https://stackoverflow.com/questions/56398422/exception-handling-in-apache-beam-pipelines-when-writing-to-database-using-java.
But I wanted to check that there wasn't a more "official" approach that I
just hadn't come across yet.
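For reference, the approach in that StackOverflow answer amounts to doing the insert in your own DoFn, catching the failure, and routing the record to a dead-letter output instead of retrying. A minimal sketch of the failure-classification piece, using only plain JDBC types (class and method names here are hypothetical, and the Beam plumbing is omitted):

```java
import java.sql.SQLException;

public class WriteErrors {
    // SQLState class "23" covers integrity-constraint violations
    // (e.g. "23505" is unique_violation in PostgreSQL). A write that
    // fails with one of these can never succeed on retry, so the
    // record should be routed to a dead-letter output rather than
    // retried; other SQLStates (connection errors etc.) are retryable.
    public static boolean isConstraintViolation(SQLException e) {
        String state = e.getSQLState();
        return state != null && state.startsWith("23");
    }
}
```

In a multi-output DoFn this predicate would decide whether a failed element goes to the dead-letter TupleTag or is rethrown so the runner retries the bundle.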

On Tue, Apr 1, 2025 at 11:53 AM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> Good question. I think it depends on who else is modifying the SQL
> database.
>
> In the easy case (e.g. everything you want to write to your SQL
> database comes from the NoSQL source) you could group (e.g. via a
> GroupByKey) on your identifier, filter out duplicates with a
> subsequent DoFn, and then write to your SQL database with the
> assurance that there will be no duplicate keys.
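The grouping-and-filtering step described above can be sketched without the Beam API; per identifier, keep one element and drop the rest. A plain-Java illustration, assuming a hypothetical row shape of {id, identifier} as a String array:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Dedupe {
    // Mirrors a GroupByKey on identifier followed by a DoFn that
    // emits only the first element of each group: any later rows
    // carrying an identifier already seen are dropped (or, in the
    // real pipeline, sent to a dead-letter output).
    public static List<String[]> firstPerIdentifier(List<String[]> rows) {
        Map<String, String[]> seen = new LinkedHashMap<>();
        for (String[] row : rows) {
            seen.putIfAbsent(row[1], row); // row = {id, identifier}
        }
        return new ArrayList<>(seen.values());
    }
}
```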
>
> If you are concerned that there might be already existing, conflicting
> entries in the SQL database, you can also add a step to check to see
> if the identifier is already inserted (and, following the previous
> step, wouldn't have to worry about another worker from this job
> inserting the same identifier).
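The pre-insert check can be sketched as a filter against a snapshot of identifiers already present in the table (hypothetical names; in a real pipeline the existing-identifier set would come from something like "SELECT identifier FROM target_table" supplied as a side input or queried per bundle):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ExistingFilter {
    // Drops rows whose identifier already exists in the SQL table,
    // so only genuinely new identifiers reach the write step.
    public static List<String[]> notYetInserted(List<String[]> rows,
                                                Set<String> existing) {
        return rows.stream()
                   .filter(row -> !existing.contains(row[1]))
                   .collect(Collectors.toList());
    }
}
```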
>
> If there are external, concurrent processes also modifying the SQL
> database that you have to worry about, that gets a bit trickier, and
> it seems necessary to do the dead letter queue as part of the write
> itself. This may require modifying JdbcIO to return a dead letter
> queue.
>
> - Robert
>
>
>
> On Tue, Apr 1, 2025 at 11:12 AM Jonathan Hope
> <jonathan.douglas.h...@gmail.com> wrote:
> >
> > Hello, I had a question and was hoping this was the right place to ask.
> >
> > Let's say I'm moving data from a NoSQL database to a SQL database. Here
> is an example document in the NoSQL database:
> >
> > {
> >   "id": "1234",
> >   "identifier": "5678"
> > }
> >
> > The id is system generated, and the identifier is user provided. This is
> being moved into a SQL database with two columns:
> >
> > id
> > identifier
> >
> > In the SQL database there is a UNIQUE index on identifier, however the
> same thing cannot be enforced on the NoSQL side. Now I could check for this
> like so:
> >
> > Get source data
> > Check to see if identifier has already been inserted
> > Move duplicates to a dead letter queue
> > Write the data
> > Success
> >
> > But what could happen is:
> >
> > Get source data
> > Check to see if identifier has already been inserted
> > Move duplicates to a dead letter queue
> > Another worker inserts a duplicate identifier
> > Write the data
> > Failure
> >
> >
> > If I were doing this outside of the Beam context I would try the write,
> capture the errors, and then redirect the failures to some kind of dead
> letter queue. However, for the life of me I can't figure out how to do
> this in Beam. In cases where a failing write will never succeed on retry,
> and you can't reliably check for the cause of the failure ahead of time,
> what is the recommended pattern?
> >
> > Thanks!
>
