Hi,

I'm using spark structured streaming to append to iceberg partitioned
table. I am using custom iceberg catalog (gCP biglake iceberg catalog) to
upsert data into iceberg tables that are backed by gcp biglake metastore.

There are multiple ways to append streaming data into partition table. One
that is mentioned in iceberg doc doesn't work as expected . (It could be
catalog impl issue)

Following overwrites some of the records in parquet file when there are
multiple records ingested in different batches that belongs to same
partition.

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("fanout-enabled", "true")
    .option("checkpointLocation", checkpointPath)
    .start()

Does above option ensure exactly once in case reprocessing happens? ALso it
will not work for idempotent updates, right?

My workaround for data issue caused by above is to use custom foreachBatch
function that does batch upserts using merge into query:

e.g MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

so even foreachBatch is at-least once gaurantee `merge into` will never
insert duplicate records. However cost of write could be higher now? Is
there any other option with spark streaming + iceberg to do dedup and
idempotent writes? (in events of reprocessing or just duplicate records0

I see Delta table have some options "txnVersion" and "txnAppId" which allow
it to drop duplicates before writing like following.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion",
batch_id).option("txnAppId", app_id).save(...) # location 1

Is there something similar exist for Iceberg? If not do you see issue with
`foreach` and `merg into.. when not matched..` approach at production scale.

I have posted a question on SO regarding this as well:
https://stackoverflow.com/questions/76726225/spark-structured-streaming-apache-iceberg-how-appends-can-be-idempotent

Thanks!
Nirav

Reply via email to