I hope to do even better. If the stream could provide information about the spec/specId for the record we would like to write, then we could refresh the table and switch to a new writer immediately.
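On the writer side, I imagine something roughly like this (illustrative only: the class and helper names are made up for the sketch, not the actual FlinkSink internals):

```
import java.io.IOException;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.TaskWriter;

// Sketch only: assumes the incoming record carries the specId it targets,
// and that a writer factory (createWriter) is available.
class SpecAwareWriter {
  private final Table table;
  private TaskWriter<RowData> writer;
  private int currentSpecId;

  SpecAwareWriter(Table table) {
    this.table = table;
    this.currentSpecId = table.spec().specId();
    this.writer = createWriter(table.spec());
  }

  void write(RowData row, int recordSpecId) throws IOException {
    if (recordSpecId != currentSpecId) {
      table.refresh();                                   // pick up the new spec/schema
      PartitionSpec newSpec = table.specs().get(recordSpecId);
      if (newSpec != null) {
        writer.complete();                               // finish files written with the old spec
        writer = createWriter(newSpec);
        currentSpecId = recordSpecId;
      }
    }
    writer.write(row);
  }

  private TaskWriter<RowData> createWriter(PartitionSpec spec) {
    // writer factory wiring is omitted from the sketch
    throw new UnsupportedOperationException("sketch only");
  }
}
```

The files completed by the old writer would of course still have to be forwarded to the committer in the usual way.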
Teaser - if the user could provide a converter which converts the input data to `DynamicData`, then we could create/update the table/schema etc. and write multiple target tables/schemas/specs with a single FlinkSink:

```
public interface DynamicDataConverter<T> extends Serializable {
  void open(OpenContext openContext) throws Exception;

  DynamicData convert(T t);
}

public class DynamicData {
  TableIdentifier tableIdentifier;
  String branch;
  Schema schema;
  PartitionSpec spec;
  RowData rowData;
}
```

But this will take some time :)

On Tue, Aug 20, 2024, at 16:54, Xianjin YE <xian...@apache.org> wrote:

> Hi Péter,
>
> I have seen requirements for accommodating partitioning scheme changes when the Table has been changed.
>
> This is similar to requests I received from users. It's possible to update/refresh the table spec/schema in the next checkpoint without a Flink job restart. It requires some extra effort though. It would be great if we could support that in the Flink Dynamic Sink.
>
> On Aug 20, 2024, at 14:26, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
> Hi Fokko, Xianjin,
>
> Thanks for both proposals, I will take a deeper look soon! Both seem promising at first glance.
>
> For the use cases:
> - I have seen requirements for converting incoming Avro records with an evolving schema and writing them to a table.
> - I have seen requirements for creating new tables when a new group of records starts to come in.
> - I have seen requirements for accommodating partitioning scheme changes when the Table has been changed.
>
> The other info used for writing is:
> - branch
> - spec
>
> Changing the target branch based on the incoming records seems easy, and I was wondering if there is an easy way to alter the table for the target spec. This would make a fully dynamic sink. I don't have a concrete use case ATM, so if it is not trivial, we could just leave it for later.
> What surprised me is that there is no easy way to convert a Transform to a PartitionSpec update.
>
> Thanks, Peter
>
> On Mon, Aug 19, 2024, 15:16 Xianjin YE <xian...@apache.org> wrote:
>
>> Hey Péter,
>>
>> For evolving the schema, Spark has the ability to mergeSchema <https://github.com/apache/iceberg/blob/d4e0b3f2078ee5ed113ba69b800c55c5994e33b8/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java#L172> based on the new incoming Schema, you may want to take a look at that.
>>
>> For evolving the partition spec, I don't think there's an easy way to evolve to the desired spec directly.
>> And BTW, what's your use case for evolving the partition spec directly in a Flink job? The common request I received was that the partition spec is updated externally and users want the Flink job to pick up the latest spec without a job restart.
>>
>> On Aug 19, 2024, at 19:43, Fokko Driesprong <fo...@apache.org> wrote:
>>
>> Hey Peter,
>>
>> Thanks for raising this since I recently ran into the same issue. The APIs that we have today nicely hide the field IDs from the user, which is great.
>>
>> I do think all the methods are in there to evolve the schema to the desired one; however, we don't have a way to control the field IDs.
>> For evolving the schema, I recently wrote a SchemaWithParentVisitor <https://github.com/delta-io/delta/blob/18f5b4cde2120079e15ad4afc7ec84f7f1f48108/iceberg/src/main/java/shadedForDelta/org/apache/iceberg/EvolveSchemaVisitor.java> that will evolve the schema to a target schema that you supply. This might do the trick for the FlinkDynamicSink. If you want to keep the old fields as well (to avoid breaking downstream consumers), then the UnionByName <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java> visitor might also do the trick.
>>
>> The most important part is: where are you tracking the field IDs? For example, when renaming a field, the Flink job should update the existing field and not perform a drop+add operation.
>>
>> Kind regards,
>> Fokko
>>
>> On Mon, Aug 19, 2024 at 13:26, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I'm playing around with creating a Flink Dynamic Sink which would allow schema changes without the need for a job restart. So when a record with an unknown schema arrives, it would update the Iceberg table to the new schema and continue processing the records.
>>>
>>> Let's say I have the `Schema newSchema` and `PartitionSpec newSpec` at hand, and I have the `Table icebergTable` with a different Schema and PartitionSpec. I know that we have `Table.updateSchema` and `Table.updateSpec` to modify them, but these methods in the API only allow incremental changes (addColumn, updateColumn, or addField, removeField). Do we have an existing API for effectively updating the Iceberg Table schema/spec to a new one, if we have the target schema and spec at hand?
>>>
>>> Thanks,
>>> Peter
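PS: with the APIs available today, the closest I found is something along these lines. `unionByNameWith()` handles the additive schema case in one call, but the spec still has to be changed field by field (the partition field names below are only placeholders):

```
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class SchemaSpecEvolutionSketch {
  // Sketch only: `table` and `newSchema` are assumed to be at hand, and the
  // partition field names are placeholders.
  static void evolveTo(Table table, Schema newSchema) {
    // Additive schema changes can be applied in one call with unionByNameWith().
    table.updateSchema()
        .unionByNameWith(newSchema)
        .commit();

    // There is no "set the spec to this target" call; spec changes are expressed
    // as individual add/remove field steps.
    table.updateSpec()
        .removeField("old_partition_col")
        .addField("bucket_id", Expressions.bucket("id", 16))
        .commit();
  }
}
```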