Unfortunately, both databases will be online during the migration, so conflicts could occur in either direction. I had previously dug up an answer about modifying JdbcIO here: https://stackoverflow.com/questions/56398422/exception-handling-in-apache-beam-pipelines-when-writing-to-database-using-java. But I just wanted to check that there wasn't a more "official" approach that I hadn't come across yet.
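For what it's worth, here is a minimal, Beam-agnostic sketch of the "try the write, capture the failure" pattern from that answer. All names are illustrative, and the in-memory set stands in for the database's UNIQUE index; in an actual pipeline the try/catch around the JDBC insert would live inside a DoFn, with failures emitted to a dead-letter PCollection via a TupleTag side output rather than thrown.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the dead-letter write pattern, outside of Beam.
// A HashSet simulates a table with a UNIQUE index on "identifier";
// a failed add() stands in for catching
// SQLIntegrityConstraintViolationException from the real insert.
public class DeadLetterWriteSketch {
    private final Set<String> uniqueIndex = new HashSet<>();
    public final List<String> deadLetters = new ArrayList<>();

    // Attempt the insert; on a constraint violation, route the record
    // to the dead-letter collection instead of failing the whole bundle.
    public void write(String identifier) {
        if (!uniqueIndex.add(identifier)) {
            deadLetters.add(identifier);
        }
    }

    public static void main(String[] args) {
        DeadLetterWriteSketch sink = new DeadLetterWriteSketch();
        sink.write("5678");
        sink.write("9999");
        sink.write("5678"); // duplicate identifier -> dead letter
        System.out.println(sink.deadLetters);
    }
}
```

The key property is that a duplicate never surfaces as an exception to the runner (which would trigger retries that can never succeed); it is just redirected to a second output.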
On Tue, Apr 1, 2025 at 11:53 AM Robert Bradshaw via user <user@beam.apache.org> wrote:
> Good question. I think it depends on who else is modifying the SQL database.
>
> In the easy case (e.g. everything you want to write to your SQL database comes from the NoSQL source) you could group (e.g. via a GroupByKey) on your identifier, filter out duplicates with a subsequent DoFn, and then write to your SQL database with the assurance that there will be no duplicate keys.
>
> If you are concerned that there might be already existing, conflicting entries in the SQL database, you can also add a step to check to see if the identifier is already inserted (and, following the previous step, wouldn't have to worry about another worker from this job inserting the same identifier).
>
> If there are external, concurrent processes also modifying the SQL database that you have to worry about, that gets a bit trickier, and it seems necessary to do the dead letter queue as part of the write itself. This may require modifying JdbcIO itself to return a dead letter queue.
>
> - Robert
>
> On Tue, Apr 1, 2025 at 11:12 AM Jonathan Hope <jonathan.douglas.h...@gmail.com> wrote:
> >
> > Hello, I had a question and was hoping this was the right place to ask.
> >
> > Let's say I'm moving data from a NoSQL database to a SQL database. Here is an example document in the NoSQL database:
> >
> > {
> >   "id": "1234",
> >   "identifier": "5678"
> > }
> >
> > The id is system generated, and the identifier is user provided. This is being moved into a SQL database with two columns:
> >
> > id
> > identifier
> >
> > In the SQL database there is a UNIQUE index on identifier; however, the same thing cannot be enforced on the NoSQL side.
> > Now I could check for this like so:
> >
> > Get source data
> > Check to see if identifier has already been inserted
> > Move duplicates to a dead letter queue
> > Write the data
> > Success
> >
> > But what could happen is:
> >
> > Get source data
> > Check to see if identifier has already been inserted
> > Move duplicates to a dead letter queue
> > Another worker inserts a duplicate identifier
> > Write the data
> > Failure
> >
> > If I was doing this outside of the Beam context I would try the write, capture the errors, and then redirect the failures to some kind of dead letter queue. However, for the life of me I can't figure out how to do this in Beam. When failing writes will never succeed on retry, and the cause of the failure can't be reliably checked ahead of time, what is the recommended pattern?
> >
> > Thanks!
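As a footnote on Robert's first suggestion (group on the identifier, then filter duplicates before writing), here is a Beam-agnostic sketch of just that dedup step. The record and method names are illustrative; in a real pipeline this would be a GroupByKey (or Distinct) followed by a DoFn that emits one record per identifier.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of deduplicating by identifier before the SQL write, so the
// write itself can never collide with another record from the same job.
public class DedupSketch {
    // Mirrors the example document from the thread; names are illustrative.
    public record Doc(String id, String identifier) {}

    // Keep only the first document seen for each identifier,
    // preserving input order (analogous to GroupByKey + "take one").
    public static List<Doc> dedupByIdentifier(List<Doc> docs) {
        Map<String, Doc> byIdentifier = new LinkedHashMap<>();
        for (Doc d : docs) {
            byIdentifier.putIfAbsent(d.identifier(), d);
        }
        return List.copyOf(byIdentifier.values());
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(
            new Doc("1234", "5678"),
            new Doc("4321", "5678"), // duplicate identifier, dropped
            new Doc("1111", "2222"));
        System.out.println(dedupByIdentifier(docs));
    }
}
```

Note this only removes duplicates within the job's own input; as Robert points out, it does nothing about external writers, which is why the dead-letter-on-write approach is still needed when both databases stay online.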