We are probably off the topic of the original thread. I am moving the Flink part of the discussion to a new thread/subject.
> but the prepared and not yet committed data files are also present in their final place. These data files are also not part of the table yet, and could be removed by the orphan files removal process. this problem would probably exist for any long-running/streaming jobs (like Spark streaming) > Moving them to another place would add an additional move(hdfs - cheap)/copy(S3 - costly) step for every commit, which I'd like to avoid. Agree with Peter on this front. > this generally applies to any concurrent inserts to the orphan file removal. There we need to "guess", how long will it take to commit the new data files. For batch jobs, this guess is easier because data files not committed within the max allowed run time of batch jobs (like 24 hours) are safe to be garbage collected. For long-running streaming jobs, this can be a little more tricky. Maybe the Iceberg sink jobs should exit terminally if it hasn't been able to commit to Iceberg after a threshold (like 24 hours) e.g. due to catalog service outage. This will prevent Flink jobs from producing more data files that can't be committed. On Wed, Feb 28, 2024 at 10:43 PM Péter Váry <peter.vary.apa...@gmail.com> wrote: > I have been thinking about this quite a bit. > > Moving the temporary manifest files could work, but the prepared and not > yet committed data files are also present in their final place. These data > files are also not part of the table yet, and could be removed by the > orphan files removal process. Moving them to another place would add an > additional move(hdfs - cheap)/copy(S3 - costly) step for every commit, > which I'd like to avoid. > > BTW, this generally applies to any concurrent inserts to the orphan file > removal. There we need to "guess", how long will it take to commit the new > data files. It is easier for a process which are running, but Flink could > recover from old state even after several days, so we face the issue more > often. > > Thanks, Peter > > On Wed, Feb 28, 2024, 17:40 Ryan Blue <b...@tabular.io> wrote: > >> > No removed temporary files on Flink failure. (Spark orphan file removal >> needs to be configured to prevent removal of Flink temporary files which >> are needed on recovery) >> >> This sounds like it's a larger problem. Shouldn't Flink store its state >> in a different prefix that won't be cleaned up by orphan files? >> >> On Wed, Feb 28, 2024 at 3:04 AM Péter Váry <peter.vary.apa...@gmail.com> >> wrote: >> >>> Sorry to chime in a bit late to the conversation. >>> >>> I am currently working in implementing Flink in-job maintenance. >>> >>> The main target audience: >>> - Users who can't or don't want to use Spark >>> - Users who need frequent checkpointing (low latency in the Iceberg >>> table) and have many small files >>> - CDC users with frequent updates >>> >>> The planned architecture: >>> - Migrate FlinkSink to SinkV2, so the new Flink PostCommitTopology could >>> be used - Flink changes will be released in the upcoming Flink 1.19.0. >>> Preliminary PR for the migration is here: >>> https://github.com/apache/iceberg/pull/8653 >>> - Create a scheduler for starting maintenance tasks. This scheduler >>> could be placed in the PostCommitTopology and the tasks could be started >>> based on the recent commits in the table (useful if there is no other >>> writers), or a separate task could monitor the new snapshot for a table, >>> and feed the scheduler with the info (useful if more than one job writes to >>> the table, or resource usage for compaction and the job should be separated) >>> - Specific maintenance tasks (Data file rewrite, Manifest rewrite, >>> Snapshot expiration, Orphan file removal, etc). In the beginning we do not >>> aim for the same feature-rich maintenance tasks that we have in Spark, but >>> we would like to cover the basics, and reuse as much as possible. One >>> feature I think is further down the road: is the global ordering, and >>> splitting of the data files which is done in SparkShufflingDataRewriter - >>> which is also mentioned by Ryan above. We could do this using the same >>> shuffling driven by Steven for the writes, but this needs time. >>> >>> Pros: >>> - No Spark is needed if you only need Flink >>> - No removed temporary files on Flink failure. (Spark orphan file >>> removal needs to be configured to prevent removal of Flink temporary files >>> which are needed on recovery) >>> - The work which is deferred on write is finally done >>> - No extra table is needed >>> >>> Cons: >>> - Still missing the global reordering of the records >>> >>> I hope this helps. >>> Peter >>> >> >>