Iceberg supports a write-audit-publish (WAP) workflow that does essentially
what you want at the batch level, using either individual snapshots or
branches, although the snapshot-based variant at least is not well
documented by Iceberg at present. I don't have the bandwidth for a more
detailed write-up right now, but some helpful resources are:
https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
https://iceberg.apache.org/docs/latest/branching/
https://tabular.io/blog/integrated-audits/
https://www.dremio.com/blog/streamlining-data-quality-in-apache-iceberg-with-write-audit-publish-branching/
https://www.dremio.com/wp-content/uploads/2022/05/Sam-Redai-The-Write-Audit-Publish-Pattern-via-Apache-Iceberg.pdf
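
As a rough illustration of the branch-based variant only, here is a minimal
PySpark sketch. It assumes a SparkSession with the Iceberg SQL extensions
enabled and a catalog that supports branches and the fast_forward procedure
(I have not checked whether the BigLake catalog does). The function name
write_audit_publish, its parameters, and the audit callback are hypothetical
names made up for this example, not part of Iceberg.

```python
from typing import Callable


def write_audit_publish(spark, batch_df, catalog: str, table: str,
                        branch: str, audit: Callable) -> bool:
    """Stage batch_df on an audit branch, audit it, publish only if it passes.

    `table` is the identifier inside the catalog (e.g. "db.logs").
    Returns True if the branch was published to main.
    """
    full_name = f"{catalog}.{table}"

    # Allow writes to be redirected to a WAP branch, then create that branch.
    spark.sql(f"ALTER TABLE {full_name} SET TBLPROPERTIES ('write.wap.enabled'='true')")
    spark.sql(f"ALTER TABLE {full_name} CREATE BRANCH `{branch}`")

    # While spark.wap.branch is set, writes in this session land on the branch.
    spark.conf.set("spark.wap.branch", branch)
    try:
        batch_df.writeTo(full_name).append()
    finally:
        spark.conf.unset("spark.wap.branch")

    # Audit: read the staged data from the branch and apply the caller's check.
    staged = spark.sql(f"SELECT * FROM {full_name} VERSION AS OF '{branch}'")
    if not audit(staged):
        return False  # main is untouched; the branch can be inspected or dropped

    # Publish: fast-forward main to the audited branch.
    spark.sql(f"CALL {catalog}.system.fast_forward('{table}', 'main', '{branch}')")
    return True
```

With streaming, something like this would presumably be called from inside a
foreachBatch function, with a branch name derived from the batch id.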


On Thu, Jul 20, 2023 at 12:34 PM Nirav Patel <nira...@gmail.com> wrote:

> Hi,
>
> I'm using Spark Structured Streaming to append to an Iceberg partitioned
> table. I am using a custom Iceberg catalog (GCP BigLake Iceberg catalog) to
> upsert data into Iceberg tables that are backed by the GCP BigLake metastore.
>
> There are multiple ways to append streaming data into a partitioned table.
> The one that is mentioned in the Iceberg docs doesn't work as expected. (It
> could be a catalog implementation issue.)
>
> The following overwrites some of the records in the Parquet file when
> multiple records ingested in different batches belong to the same
> partition.
>
> import java.util.concurrent.TimeUnit
> import org.apache.spark.sql.streaming.Trigger
>
> val tableIdentifier: String = ...
> data.writeStream
>     .format("iceberg")
>     .outputMode("append")
>     .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
>     .option("path", tableIdentifier)
>     .option("fanout-enabled", "true")
>     .option("checkpointLocation", checkpointPath)
>     .start()
>
> Does the above option ensure exactly-once semantics in case reprocessing
> happens? Also, it will not work for idempotent updates, right?
>
> My workaround for the data issue caused by the above is to use a custom
> foreachBatch function that does batch upserts using a MERGE INTO query, e.g.:
>
> MERGE INTO logs
> USING newDedupedLogs
> ON logs.uniqueId = newDedupedLogs.uniqueId
> WHEN NOT MATCHED
>   THEN INSERT *
>
> So even though foreachBatch only gives an at-least-once guarantee, `merge
> into` will never insert duplicate records. However, could the cost of
> writes be higher now? Is there any other option with Spark streaming +
> Iceberg to do dedup and idempotent writes (in the event of reprocessing or
> just duplicate records)?
>
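
The foreachBatch plus MERGE INTO approach described above could be sketched
in PySpark roughly as follows. This is only a sketch under assumptions: it
assumes PySpark 3.3 or later (for the DataFrame.sparkSession property) and
the Iceberg SQL extensions for MERGE INTO. The helper name make_merge_upsert
and the view name new_deduped_batch are hypothetical.

```python
def make_merge_upsert(target_table: str, key: str, view: str = "new_deduped_batch"):
    """Build a foreachBatch handler that inserts only rows whose key is new."""
    merge_sql = (
        f"MERGE INTO {target_table} t "
        f"USING {view} s "
        f"ON t.{key} = s.{key} "
        "WHEN NOT MATCHED THEN INSERT *"
    )

    def upsert_batch(batch_df, batch_id):
        # Drop duplicates inside the micro-batch so MERGE sees one row per key.
        deduped = batch_df.dropDuplicates([key])
        # Register the view on the batch's own session, then merge through it.
        deduped.createOrReplaceTempView(view)
        batch_df.sparkSession.sql(merge_sql)

    return upsert_batch
```

It would be wired in with something like
data.writeStream.foreachBatch(make_merge_upsert(tableIdentifier, "uniqueId")).
If a batch is replayed, every key already in the table matches and nothing
is inserted, which is what makes the write idempotent.
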
> I see Delta tables have some options, "txnVersion" and "txnAppId", which
> allow them to drop duplicates before writing, like the following.
>
> def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
>   batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
>
> Does something similar exist for Iceberg? If not, do you see any issue with
> the `foreachBatch` and `merge into ... when not matched ...` approach at
> production scale?
>
> I have posted a question on SO regarding this as well:
>
> https://stackoverflow.com/questions/76726225/spark-structured-streaming-apache-iceberg-how-appends-can-be-idempotent
>
> Thanks!
> Nirav
>
>
