Hi Fokko, Xianjin,

Thanks for both proposals, I will take a deeper look soon! Both seem promising at first glance.

For the use cases:
- I have seen requirements for converting incoming Avro records with an evolving schema and writing them to a table.
- I have seen requirements for creating new tables when a new group of records starts to come in.
- I have seen requirements for accommodating partitioning scheme changes when the table has been changed.

The other info used for writing is:
- branch
- spec

Changing the target branch based on the incoming records seems easy, and I was wondering if there is an easy way to alter the table for the target spec. This would make a fully dynamic sink. I don't have a concrete use case ATM, so if it is not trivial, we could just leave it for later.

What surprised me is that there is no easy way to convert a Transform to a PartitionSpec update.

Thanks,
Peter

On Mon, Aug 19, 2024, 15:16 Xianjin YE <xian...@apache.org> wrote:

> Hey Péter,
>
> For evolving the schema, Spark has the ability to mergeSchema
> <https://github.com/apache/iceberg/blob/d4e0b3f2078ee5ed113ba69b800c55c5994e33b8/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java#L172>
> based on the new incoming Schema; you may want to take a look at that.
>
> For evolving the partition spec, I don't think there's an easy way to
> evolve to the desired spec directly.
> And BTW, what's your use case for evolving the partition spec directly in a
> Flink job? The common request I received was that the partition spec is
> updated externally and users want the Flink job to pick up the latest spec
> without a job restart.
>
> On Aug 19, 2024, at 19:43, Fokko Driesprong <fo...@apache.org> wrote:
>
> Hey Peter,
>
> Thanks for raising this since I recently ran into the same issue. The APIs
> that we have today nicely hide the field IDs from the user, which is great.
>
> I do think all the methods are in there to evolve the schema to the
> desired one; however, we don't have a way to control the field IDs.
> For evolving the schema, I recently wrote a SchemaWithParentVisitor
> <https://github.com/delta-io/delta/blob/18f5b4cde2120079e15ad4afc7ec84f7f1f48108/iceberg/src/main/java/shadedForDelta/org/apache/iceberg/EvolveSchemaVisitor.java>
> that will evolve the schema to a target schema that you supply. This
> might do the trick for the FlinkDynamicSink. If you want to keep the old
> fields as well (to avoid breaking downstream consumers), then the
> UnionByName
> <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java>
> visitor might also do the trick.
>
> The most important part is: where are you tracking the field IDs? For
> example, when renaming a field, the Flink job should update the existing
> field and not perform a drop+add operation.
>
> Kind regards,
> Fokko
>
> On Mon, Aug 19, 2024 at 13:26, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I'm playing around with creating a Flink Dynamic Sink which would allow
>> schema changes without the need for a job restart. So when a record with an
>> unknown schema arrives, it would update the Iceberg table to the new
>> schema and continue processing the records.
>>
>> Let's say I have the `Schema newSchema` and `PartitionSpec newSpec` at
>> hand, and I have the `Table icebergTable` with a different Schema and
>> PartitionSpec. I know that we have `Table.updateSchema` and
>> `Table.updateSpec` to modify them, but these methods in the API only allow
>> for incremental changes (addColumn, updateColumn, or addField,
>> removeField). Do we have an existing API for effectively updating the
>> Iceberg Table schema/spec to a new one, if we have the target schema and
>> spec at hand?
>>
>> Thanks,
>> Peter
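
To make the field-ID concern in this thread concrete, here is a minimal Python sketch of what deriving incremental changes from a target schema by name could look like. It is a toy model and not Iceberg code: `Field`, `plan_schema_changes`, and `apply_changes` are hypothetical names, schemas are flat, types are plain strings, and the operation labels are only borrowed from the calls mentioned above. It assumes the incoming schema carries no trustworthy field IDs, so the target is given as (name, type) pairs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    field_id: int  # stable identity of the column (hypothetical toy model)
    name: str
    type: str


def plan_schema_changes(current, target):
    """Diff a current schema against a target, matching columns by name.

    current: list of Field; target: list of (name, type) pairs.
    Returns a list of incremental operations as tuples.
    """
    current_by_name = {f.name: f for f in current}
    target_names = {name for name, _ in target}
    ops = []
    for name, type_ in target:
        existing = current_by_name.get(name)
        if existing is None:
            ops.append(("addColumn", name, type_))
        elif existing.type != type_:
            ops.append(("updateColumn", name, type_))
    for existing in current:
        if existing.name not in target_names:
            ops.append(("deleteColumn", existing.name))
    return ops


def apply_changes(current, ops):
    """Apply the planned operations; added columns get fresh field IDs."""
    next_id = max((f.field_id for f in current), default=0) + 1
    fields = list(current)
    for op in ops:
        if op[0] == "addColumn":
            fields.append(Field(next_id, op[1], op[2]))
            next_id += 1
        elif op[0] == "updateColumn":
            fields = [
                Field(f.field_id, f.name, op[2]) if f.name == op[1] else f
                for f in fields
            ]
        elif op[0] == "deleteColumn":
            fields = [f for f in fields if f.name != op[1]]
    return fields


# "name" was renamed to "full_name" upstream, and "amount" was widened.
current = [Field(1, "id", "long"), Field(2, "name", "string"), Field(3, "amount", "int")]
target = [("id", "long"), ("full_name", "string"), ("amount", "long")]

ops = plan_schema_changes(current, target)
evolved = apply_changes(current, ops)
```

In this toy run the rename surfaces as an add plus a delete, so `full_name` ends up with a new field ID (4) instead of keeping ID 2. That is the drop+add behaviour Fokko warns about: a by-name diff alone cannot tell a rename from an unrelated new column, so the sink would need some other source of truth for field IDs.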