Iceberg supports a write-audit-publish (WAP) workflow that does essentially what you want at the batch level, either with individual snapshots or with branches, though the snapshot-based flow at least is not well documented by Iceberg at present. I don't have the bandwidth for a more detailed write-up right now, but some helpful resources are:

https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
https://iceberg.apache.org/docs/latest/branching/
https://tabular.io/blog/integrated-audits/
https://www.dremio.com/blog/streamlining-data-quality-in-apache-iceberg-with-write-audit-publish-branching/
https://www.dremio.com/wp-content/uploads/2022/05/Sam-Redai-The-Write-Audit-Publish-Pattern-via-Apache-Iceberg.pdf
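To give a rough idea of the branch-based flow, here is a minimal Spark sketch. The catalog name my_catalog, table db.logs, branch name audit_branch, and the duplicate-id check are placeholders I made up for illustration; the spark.wap.branch conf, the write.wap.enabled table property, the "branch" read option, and the fast_forward procedure come from the Iceberg Spark integration:

// Write-audit-publish via a branch: stage writes on audit_branch, validate,
// then fast-forward main. Table and branch names here are illustrative.

// Enable WAP on the table and route this session's writes to the staging branch.
spark.sql("ALTER TABLE my_catalog.db.logs SET TBLPROPERTIES ('write.wap.enabled'='true')")
spark.conf.set("spark.wap.branch", "audit_branch")

// 1. Write: appends (e.g. from a foreachBatch sink) land on audit_branch, not main.
newDedupedLogs.writeTo("my_catalog.db.logs").append()

// 2. Audit: read the staging branch and run whatever checks you need.
val staged = spark.read
  .option("branch", "audit_branch")
  .table("my_catalog.db.logs")
val dupes = staged.groupBy("uniqueId").count().where("`count` > 1").count()
require(dupes == 0, s"audit failed: $dupes duplicated uniqueIds on audit_branch")

// 3. Publish: fast-forward main to the audited branch so readers see the data.
spark.sql("CALL my_catalog.system.fast_forward('db.logs', 'main', 'audit_branch')")

The snapshot-based variant is similar in shape: set spark.wap.id instead of spark.wap.branch so the write is staged as an unpublished snapshot, then publish it with the cherrypick_snapshot procedure after your audit passes.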
On Thu, Jul 20, 2023 at 12:34 PM Nirav Patel <nira...@gmail.com> wrote:
> Hi,
>
> I'm using Spark Structured Streaming to append to an Iceberg partitioned
> table. I am using a custom Iceberg catalog (GCP BigLake Iceberg catalog) to
> upsert data into Iceberg tables that are backed by the GCP BigLake metastore.
>
> There are multiple ways to append streaming data into a partitioned table. The one
> that is mentioned in the Iceberg doc doesn't work as expected. (It could be a
> catalog impl issue.)
>
> The following overwrites some of the records in a parquet file when there are
> multiple records ingested in different batches that belong to the same
> partition.
>
> val tableIdentifier: String = ...
> data.writeStream
>     .format("iceberg")
>     .outputMode("append")
>     .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
>     .option("path", tableIdentifier)
>     .option("fanout-enabled", "true")
>     .option("checkpointLocation", checkpointPath)
>     .start()
>
> Does the above option ensure exactly-once in case reprocessing happens? Also,
> it will not work for idempotent updates, right?
>
> My workaround for the data issue caused by the above is to use a custom foreachBatch
> function that does batch upserts using a MERGE INTO query, e.g.:
>
> MERGE INTO logs
> USING newDedupedLogs
> ON logs.uniqueId = newDedupedLogs.uniqueId
> WHEN NOT MATCHED
>   THEN INSERT *
>
> So even though foreachBatch is an at-least-once guarantee, `MERGE INTO` will never
> insert duplicate records. However, the cost of the write could be higher now? Is
> there any other option with Spark streaming + Iceberg to do dedup and
> idempotent writes (in the event of reprocessing or just duplicate records)?
>
> I see Delta tables have the options "txnVersion" and "txnAppId", which
> allow them to drop duplicates before writing, like the following:
>
> def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
>     batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...)  # location 1
>
> Is there something similar for Iceberg? If not, do you see issues with the
> foreachBatch + `MERGE INTO ... WHEN NOT MATCHED ...` approach at production scale?
>
> I have posted a question on SO regarding this as well:
>
> https://stackoverflow.com/questions/76726225/spark-structured-streaming-apache-iceberg-how-appends-can-be-idempotent
>
> Thanks!
> Nirav