Yes, we will need to expose ManifestWriter, but only the methods that work with DataFile because we only need to support append.
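For illustration, a minimal sketch of what that could look like, assuming roughly the manifest API Iceberg later exposed (ManifestFiles.write, ManifestWriter.add, AppendFiles.appendManifest); at the time of this thread ManifestWriter was still internal, and the class and helper names below are hypothetical, not a committed design:

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestWriter;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.OutputFile;

    public class ManifestAppendSketch {
      // Runs in each task: write one manifest for a group of data files.
      static ManifestFile writeManifest(Table table, String manifestLocation,
                                        List<DataFile> files) throws IOException {
        OutputFile out = table.io().newOutputFile(manifestLocation);
        // Only the DataFile-appending side of ManifestWriter is needed here.
        ManifestWriter<DataFile> writer = ManifestFiles.write(table.spec(), out);
        try {
          for (DataFile file : files) {
            writer.add(file);
          }
        } finally {
          writer.close();
        }
        return writer.toManifestFile();
      }

      // Runs on the driver: one atomic commit for all task-produced manifests.
      // The snapshot ID is only known at commit time, so the commit rewrites
      // these manifests with the final snapshot ID.
      static void commitManifests(Table table, List<ManifestFile> manifests) {
        AppendFiles append = table.newAppend();
        for (ManifestFile manifest : manifests) {
          append.appendManifest(manifest);
        }
        append.commit();
      }
    }

The tasks only need a writable output location for the manifests (e.g. under the table's metadata location); the resulting ManifestFile metadata is small enough to collect back to the driver for the single commit.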
Unfortunately, these manifests will need to be rewritten because they don't
have the correct snapshot ID in the file metadata; the snapshot ID is only
set in the final commit.

Iceberg doesn't support Spark bucketing because they use different bucket
functions. We will need to extend Spark to allow using a custom partition
function to get bucketed joins working.

On Mon, Jun 3, 2019 at 12:18 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:

> If we are to support appending manifest files, do we expect to expose
> ManifestWriter?
>
> Also, one more question about migrating bucketed Spark tables. Am I
> correct it won’t work because of [1]? The bucketing field won’t be present
> in the partition values map, as bucket ids are encoded in file names, right?
>
> [1] -
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala#L107
>
> On 20 May 2019, at 11:17, Anton Okolnychyi <aokolnyc...@apple.com> wrote:
>
> A few comments from me inline:
>
> I think it is reasonable to make this a Spark job. The number of files in
> tables we convert typically requires it. This would only be too much for
> the driver if all of the files are collected at one time. We commit 500,000
> files per batch, which seems to work well. That trades atomicity, though.
>
> Makes sense to me, just wanted to double-check that the “proper” API
> should still be based on a Spark job.
>
> - What should be the scope? Do we want the migration to modify the
> catalog?
>
> What do you mean modify the catalog?
>
> We've built two SQL commands using these helper classes and methods:
> SNAPSHOT TABLE that creates a new table copy, and MIGRATE TABLE that
> basically builds a snapshot, but also renames the new table into place.
>
> By modifying the catalog I mean creating/dropping tables in it. My question
> is whether this should be part of the migration tool/API or not. Right now,
> it isn’t. Instead, the catalog is modified by the SNAPSHOT TABLE and MIGRATE
> TABLE commands that are using SparkTableUtil. I think it is reasonable to
> keep it this way, meaning that the scope of the migration tool is to find
> files and append them to an existing Iceberg table.
>
> - If we stick to having Dataset[DataFiles], we will have one append per
> dataset partition. How do we want to guarantee that either all files are
> appended or none? One way is to rely on temporary tables but this requires
> looking into the catalog. Another approach is to say that if the
> migration isn’t successful, users will need to remove the metadata and
> retry.
>
> I don't recommend committing from tasks. If Spark runs speculative
> attempts, you can get duplicate data.
>
> That’s exactly my point. In addition, if one task fails, then we end up
> with partial results. However, it seems to be the only possible way right
> now. We can mitigate the effects by having temporary tables but this
> doesn’t solve the problem completely.
>
> Another option is to add a way to append manifest files, not just data
> files. That way, each task produces a manifest and we can create a single
> commit to add all of the manifests. This requires some rewriting, but I
> think it would be a good way to distribute the majority of the work and
> still have an atomic commit.
>
> This sounds promising but requires quite some changes to the public API. I
> am wondering if we can group all files per Dataset partition and
> leverage the ReplacePartitions/OverwriteFiles APIs in Iceberg. That way, the
> migration won’t be atomic but idempotent.
> Thanks,
> Anton
>
> On 15 May 2019, at 19:12, Ryan Blue <rb...@netflix.com> wrote:
>
> Replies inline:
>
> On Tue, May 14, 2019 at 3:21 AM Anton Okolnychyi <aokolnyc...@apple.com>
> wrote:
>
> I would like to resume this topic. How do we see the proper API for
> migration?
>
> I have a couple of questions in mind:
> - Now, it is based on a Spark job. Do we want to keep it that way because
> the number of files might be huge? Will it be too much for the driver?
>
> I think it is reasonable to make this a Spark job. The number of files in
> tables we convert typically requires it. This would only be too much for
> the driver if all of the files are collected at one time. We commit 500,000
> files per batch, which seems to work well. That trades atomicity, though.
>
> - What should be the scope? Do we want the migration to modify the
> catalog?
>
> What do you mean modify the catalog?
>
> We've built two SQL commands using these helper classes and methods:
> SNAPSHOT TABLE that creates a new table copy, and MIGRATE TABLE that
> basically builds a snapshot, but also renames the new table into place.
>
> - If we stick to having Dataset[DataFiles], we will have one append per
> dataset partition. How do we want to guarantee that either all files are
> appended or none? One way is to rely on temporary tables but this requires
> looking into the catalog. Another approach is to say that if the
> migration isn’t successful, users will need to remove the metadata and
> retry.
>
> I don't recommend committing from tasks. If Spark runs speculative
> attempts, you can get duplicate data.
>
> Another option is to add a way to append manifest files, not just data
> files. That way, each task produces a manifest and we can create a single
> commit to add all of the manifests. This requires some rewriting, but I
> think it would be a good way to distribute the majority of the work and
> still have an atomic commit.
>
> --
> Ryan Blue
> Software Engineer
> Netflix

--
Ryan Blue
Software Engineer
Netflix
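For the per-partition, idempotent alternative mentioned earlier in the thread (grouping all files per Dataset partition and using the ReplacePartitions/OverwriteFiles APIs), a minimal sketch assuming Iceberg's Table.newReplacePartitions(); the grouping itself and the class name are illustrative:

    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ReplacePartitions;
    import org.apache.iceberg.Table;

    public class IdempotentMigrationSketch {
      // One commit per group of files. Not atomic across the whole migration,
      // but safe to retry as long as each group contains all of the files for
      // the table partitions it touches: ReplacePartitions is a dynamic
      // overwrite, so re-running a group replaces what it wrote before instead
      // of appending duplicates.
      static void commitGroup(Table table, List<DataFile> filesForGroup) {
        ReplacePartitions replace = table.newReplacePartitions();
        for (DataFile file : filesForGroup) {
          replace.addFile(file);
        }
        replace.commit();
      }
    }

Because each group's commit is an independent snapshot, a failed migration can be re-run for the remaining groups, which is the idempotent-but-not-atomic trade-off described above.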