Okay sounds good.

So, the two options below would help me capture CDC changes:

1) Delta Lake
2) Maintaining a snapshot of records with operation indicators and a timestamp.

Correct me if I'm wrong

Thanks,
Sid

On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, <mich.talebza...@gmail.com>
wrote:

> There are two ways of doing it.
>
>
>    1. Through the snapshot offered by Delta Lake, meaning an immutable
>    snapshot of the state of the table at a given version. For example, the
>    state <https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a
>    Delta table <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog>
>    at the version
>    <https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
>    2. Creating your own versioning. Taking your example, you define the
>    target storage *with two added columns, namely:* op_type INT
>    (1-insert, 2-update, 3-delete) and op_time TIMESTAMP <as of ingestion_time>.
>    Your example record will be
>
>
> id    op_type    op_time
> 1     1          <ingestion_time>
> 1     3          <ingestion_time>
>
>
>        # rename the positional columns and add the CDC indicator
>        # columns (op_type, op_time) at ingestion time
>        from pyspark.sql.functions import lit, current_timestamp
>
>        df = rdd.toDF(). \
>             withColumnRenamed("_1", "ID"). \
>             withColumnRenamed("_2", "CLUSTERED"). \
>             withColumnRenamed("_3", "SCATTERED"). \
>             withColumnRenamed("_4", "RANDOMISED"). \
>             withColumnRenamed("_5", "RANDOM_STRING"). \
>             withColumnRenamed("_6", "SMALL_VC"). \
>             withColumnRenamed("_7", "PADDING"). \
>             withColumn("op_type", lit(1)). \
>             withColumn("op_time", current_timestamp())
>
> Then you can look at all records that were created and subsequently
> deleted, and at what time:
>
>
> SELECT ID, op_time FROM my_table WHERE op_type IN (1, 3)
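>
> For approach 1, a minimal sketch of reading an older snapshot of a Delta
> table by version or timestamp (assuming an active SparkSession spark and a
> Delta table stored at delta_path; the path, version and timestamp are
> illustrative):
>
>        from delta.tables import DeltaTable
>
>        # time travel: read the table as of a specific version
>        df_v5 = spark.read.format("delta").option("versionAsOf", 5).load(delta_path)
>        # or as of a timestamp
>        df_ts = spark.read.format("delta").option("timestampAsOf", "2022-01-27").load(delta_path)
>        # list the versions available for time travel
>        DeltaTable.forPath(spark, delta_path).history().select("version", "timestamp", "operation").show()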
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Jan 2022 at 17:54, Sid Kal <flinkbyhe...@gmail.com> wrote:
>
>> Hi Sean,
>>
>> So do you mean that if I use those file formats, they will do the work of
>> CDC automatically, or would I have to handle it via code?
>>
>> Hi Mich,
>>
>> Not sure if I understood you. Let me try to explain my scenario. Suppose
>> there is an id "1" which is inserted today, so I transformed and ingested
>> it. Now suppose this user id is deleted from the source itself. How can I
>> then delete it in my transformed DB?
>>
>>
>>
>> On Thu, 27 Jan 2022, 22:44 Sean Owen, <sro...@gmail.com> wrote:
>>
>>> This is what storage engines like Delta, Hudi, Iceberg are for. No need
>>> to manage it manually or use a DBMS. These formats allow deletes, upserts,
>>> etc. of data, using Spark, on cloud storage.
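>>>
>>> A minimal sketch of applying source-side deletes with Delta Lake's MERGE
>>> API (assuming the target is a Delta table at target_path and changes_df is
>>> a DataFrame of changed rows carrying an op_type column, 1=insert, 2=update,
>>> 3=delete; all names are illustrative):
>>>
>>>        from delta.tables import DeltaTable
>>>
>>>        target = DeltaTable.forPath(spark, target_path)
>>>        (target.alias("t")
>>>            .merge(changes_df.alias("s"), "t.ID = s.ID")
>>>            # physically remove rows that were deleted at the source
>>>            .whenMatchedDelete(condition="s.op_type = 3")
>>>            # apply updates to rows that still exist
>>>            .whenMatchedUpdateAll(condition="s.op_type = 2")
>>>            # insert rows that are new at the source
>>>            .whenNotMatchedInsertAll(condition="s.op_type = 1")
>>>            .execute())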
>>>
>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Where is the ETL data stored?
>>>>
>>>>
>>>>
>>>> *But now the main problem is when the record at the source is deleted,
>>>> it should be deleted in my final transformed record too.*
>>>>
>>>>
>>>> If your final sink (storage) is a data warehouse, it should be soft
>>>> flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
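>>>>
>>>> A minimal sketch of deriving those soft-delete rows by comparing today's
>>>> full extract with the warehouse table (source_df, target_df and the ID key
>>>> are illustrative names):
>>>>
>>>>        from pyspark.sql.functions import lit, current_timestamp
>>>>
>>>>        # IDs still present in the warehouse but no longer at the source
>>>>        deleted = (target_df.select("ID").distinct()
>>>>                   .join(source_df.select("ID").distinct(), on="ID", how="left_anti")
>>>>                   .withColumn("op_type", lit(3))
>>>>                   .withColumn("op_time", current_timestamp()))
>>>>        # append these rows to the warehouse table as soft-delete markers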
>>>>
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> I am using a Spark incremental approach to bring in the latest data
>>>>> every day. Everything works fine.
>>>>>
>>>>> But now the main problem is when the record at the source is deleted,
>>>>> it should be deleted in my final transformed record too.
>>>>>
>>>>> How do I capture such changes and update my table too?
>>>>>
>>>>> Best regards,
>>>>> Sid
>>>>>
>>>>>
