Okay, sounds good. So the two options below would help me capture CDC changes:

1) Delta Lake
2) Maintaining a snapshot of records with some indicators and a timestamp

Correct me if I'm wrong.

Thanks,
Sid

On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, <mich.talebza...@gmail.com> wrote:

> There are two ways of doing it.
>
> 1. Through the snapshot offered by Delta Lake, meaning an immutable snapshot
> of the state of the table at a given version. For example, the state
> <https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a Delta
> table <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at
> the version
> <https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
> 2. Creating your own versioning. Taking your example, you define the
> target storage *with two added columns, namely:* op_type INT
> (1-insert, 2-update, 3-delete) and op_time TIMESTAMP <as of ingestion_time>.
> Your example record will be:
>
> id   op_type   op_time
> 1    1         <ingestion_time>
> 1    3         <ingestion_time>
>
> from pyspark.sql.functions import current_timestamp, lit
>
> # Rename the seven tuple columns of rdd, then tag each row as an
> # insert (op_type 1) stamped with the ingestion time.
> df = rdd.toDF(). \
>     withColumnRenamed("_1", "ID"). \
>     withColumnRenamed("_2", "CLUSTERED"). \
>     withColumnRenamed("_3", "SCATTERED"). \
>     withColumnRenamed("_4", "RANDOMISED"). \
>     withColumnRenamed("_5", "RANDOM_STRING"). \
>     withColumnRenamed("_6", "SMALL_VC"). \
>     withColumnRenamed("_7", "PADDING"). \
>     withColumn("op_type", lit(1)). \
>     withColumn("op_time", current_timestamp())
>
> Then you can look at all records that were created and subsequently
> deleted, and at what time:
>
> SELECT ID, op_type, op_time FROM my_table WHERE op_type IN (1, 3)
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
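Mich's second option turns the target into an append-only change log. The sketch below models that log in plain Python instead of Spark, an assumption made only so it runs without a cluster; the `ChangeRecord` shape and the helpers `created_and_deleted` and `current_state` are hypothetical names for illustration. It shows a plain-Python analogue of filtering on `op_type IN (1, 3)`, and how the current state per ID can be derived by replaying the log and dropping IDs whose latest operation is a delete.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# op_type codes as proposed in the reply: 1 = insert, 2 = update, 3 = delete
INSERT, UPDATE, DELETE = 1, 2, 3


@dataclass(frozen=True)
class ChangeRecord:
    """One row of the append-only target table (hypothetical shape)."""
    id: int
    op_type: int
    op_time: datetime
    payload: Optional[str] = None


def created_and_deleted(log):
    """Map each ID that was inserted and later deleted to (insert_time, delete_time).

    Plain-Python analogue of filtering the log on op_type IN (1, 3).
    """
    inserted, deleted = {}, {}
    for rec in log:
        if rec.op_type == INSERT:
            # keep the earliest insert time seen for the ID
            inserted[rec.id] = min(rec.op_time, inserted.get(rec.id, rec.op_time))
        elif rec.op_type == DELETE:
            # keep the latest delete time seen for the ID
            deleted[rec.id] = max(rec.op_time, deleted.get(rec.id, rec.op_time))
    return {
        key: (inserted[key], deleted[key])
        for key in inserted
        if key in deleted and deleted[key] >= inserted[key]
    }


def current_state(log):
    """Replay the log in op_time order; IDs whose latest op is a delete are dropped."""
    latest = {}
    for rec in sorted(log, key=lambda r: r.op_time):
        latest[rec.id] = rec
    return {key: rec.payload for key, rec in latest.items() if rec.op_type != DELETE}


t0 = datetime(2022, 1, 27, 9, 0)
log = [
    ChangeRecord(1, INSERT, t0, "a"),
    ChangeRecord(2, INSERT, t0, "b"),
    ChangeRecord(2, UPDATE, t0.replace(hour=10), "b2"),
    ChangeRecord(1, DELETE, t0.replace(hour=11)),
]
print(current_state(log))  # {2: 'b2'}
print(created_and_deleted(log))
```

Soft-flagging keeps the full history, so deletes stay auditable; the price is that readers have to resolve the latest operation per ID, as `current_state` does, instead of reading the table directly.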
>
> On Thu, 27 Jan 2022 at 17:54, Sid Kal <flinkbyhe...@gmail.com> wrote:
>
>> Hi Sean,
>>
>> So you mean that if I use those file formats, they will do the work of CDC
>> automatically, or would I have to handle it in code?
>>
>> Hi Mich,
>>
>> I'm not sure I understood you. Let me try to explain my scenario. Suppose
>> there is an ID "1" which is inserted today, so I transformed and ingested
>> it. Now suppose this user ID is deleted from the source itself. How
>> can I delete it in my transformed DB?
>>
>> On Thu, 27 Jan 2022, 22:44 Sean Owen, <sro...@gmail.com> wrote:
>>
>>> This is what storage engines like Delta, Hudi, Iceberg are for. No need
>>> to manage it manually or use a DBMS. These formats allow deletes, upserts,
>>> etc. of data, using Spark, on cloud storage.
>>>
>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Where is the ETL data stored?
>>>>
>>>> *But now the main problem is: when a record is deleted at the source,
>>>> it should be deleted in my final transformed table too.*
>>>>
>>>> If your final sink (storage) is a data warehouse, the record should be
>>>> soft-flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
>>>>
>>>> HTH
>>>>
>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> I am using a Spark incremental approach to bring in the latest data
>>>>> every day. Everything works fine.
>>>>>
>>>>> But now the main problem is: when a record is deleted at the source,
>>>>> it should be deleted in my final transformed table too.
>>>>>
>>>>> How do I capture such changes and update my table accordingly?
>>>>>
>>>>> Best regards,
>>>>> Sid
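An incremental load that only pulls new or changed rows never sees a row that has vanished, so one way to catch source-side deletes is Sid's second option: compare a full snapshot of the source with the target and emit change records for the differences. The sketch below is plain Python, with dicts keyed by ID standing in for the source extract and the transformed table (an assumption for illustration); `diff_snapshots` and `apply_changes` are hypothetical names. It classifies each ID with the op_type codes from Mich's reply.

```python
from datetime import datetime, timezone

INSERT, UPDATE, DELETE = 1, 2, 3  # op_type codes from Mich's reply


def diff_snapshots(target, source, op_time):
    """Compare the current target state with a full source snapshot.

    target, source: dicts mapping ID -> row (any value supporting ==).
    Returns (id, op_type, op_time, row) change records; a deleted ID carries
    its last known row so a soft-deleted record stays auditable.
    """
    changes = []
    for key, row in source.items():
        if key not in target:
            changes.append((key, INSERT, op_time, row))
        elif target[key] != row:
            changes.append((key, UPDATE, op_time, row))
    for key, row in target.items():
        if key not in source:
            # present downstream but gone from the source: a source-side delete
            changes.append((key, DELETE, op_time, row))
    return changes


def apply_changes(target, changes):
    """Hard-apply change records to a copy of the target (instead of soft-flagging)."""
    result = dict(target)
    for key, op_type, _, row in changes:
        if op_type == DELETE:
            result.pop(key, None)
        else:
            result[key] = row
    return result


now = datetime(2022, 1, 28, tzinfo=timezone.utc)
target = {1: "alice", 2: "bob"}
source = {2: "bobby", 3: "carol"}  # ID 1 was deleted at the source
changes = diff_snapshots(target, source, now)
for change in changes:
    print(change)
print(apply_changes(target, changes))  # {2: 'bobby', 3: 'carol'}
```

For the soft-flag approach the change records would simply be appended to the target as they are, rather than passed to `apply_changes`. The trade-off of snapshot comparison is that each run needs the full set of source keys, not just the incremental slice; with formats such as Delta, Hudi or Iceberg, which allow deletes and upserts as Sean notes, the resulting deletes can be applied to the table directly.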