Hey Péter,

For evolving the schema, Spark's mergeSchema handling 
<https://github.com/apache/iceberg/blob/d4e0b3f2078ee5ed113ba69b800c55c5994e33b8/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java#L172>
 merges the new incoming schema into the table schema; you may want to take a look at that.
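
To illustrate what a name-based merge means for field IDs, here is a toy sketch in plain Java. It is not Iceberg code: the class `UnionByNameSketch`, the method `unionByName`, and the name-to-ID map are all made up for illustration. As far as I know, the corresponding Iceberg call is `table.updateSchema().unionByNameWith(newSchema).commit()`, but please verify that against the linked source.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names): a schema is a map of column name -> field ID.
public class UnionByNameSketch {

    /**
     * Merges incoming column names into the current schema by name.
     * Existing columns keep their field IDs, unknown columns get fresh IDs,
     * and nothing is dropped.
     */
    static Map<String, Integer> unionByName(Map<String, Integer> current, List<String> incoming) {
        Map<String, Integer> merged = new LinkedHashMap<>(current);

        // Assumption for this toy: the next ID is max(existing) + 1.
        // A real table format has to remember the last assigned ID so that
        // IDs of dropped columns are never reused.
        int nextId = current.values().stream().mapToInt(Integer::intValue).max().orElse(0) + 1;

        for (String name : incoming) {
            if (!merged.containsKey(name)) {
                merged.put(name, nextId++);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> current = new LinkedHashMap<>();
        current.put("id", 1);
        current.put("name", 2);

        // "name" is missing from the incoming schema but is kept; "email" is new.
        System.out.println(unionByName(current, List.of("id", "email")));
    }
}
```

Note that a merge like this can only add columns. A rename in the incoming schema would show up as a brand-new column with a new ID, which is the field ID tracking problem Fokko describes below.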

For evolving the partition spec, I don’t think there’s an easy way to evolve to 
the desired spec directly. 
And BTW, what’s your use case for evolving the partition spec directly in a Flink 
job? The most common request I have received is that the partition spec is updated 
externally and users want the Flink job to pick up the latest spec without a 
job restart.

> On Aug 19, 2024, at 19:43, Fokko Driesprong <fo...@apache.org> wrote:
> 
> Hey Peter,
> 
> Thanks for raising this since I recently ran into the same issue. The APIs 
> that we have today nicely hide the field IDs from the user, which is great.
> 
> I do think all the methods are in there to evolve the schema to the desired 
> one; however, we don't have a way to control the field IDs. For evolving the 
> schema, I recently wrote a SchemaWithParentVisitor 
> <https://github.com/delta-io/delta/blob/18f5b4cde2120079e15ad4afc7ec84f7f1f48108/iceberg/src/main/java/shadedForDelta/org/apache/iceberg/EvolveSchemaVisitor.java>
>  that will evolve the schema to a target schema that you supply. This might 
> do the trick for the FlinkDynamicSink. If you want to keep the old fields as 
> well (to avoid breaking downstream consumers), then the UnionByName 
> <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java>
>  visitor might also do the trick.
> 
> The most important part is: where are you tracking the field IDs? For 
> example, when renaming a field, the Flink job should update the existing 
> field and not perform a drop+add operation.
> 
> Kind regards,
> Fokko
> 
> On Mon, Aug 19, 2024, at 13:26, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>> Hi Team,
>> 
>> I'm playing around with creating a Flink Dynamic Sink which would allow 
>> schema changes without the need for a job restart. When a record with an 
>> unknown schema arrives, the sink would update the Iceberg table to the new 
>> schema and continue processing the records.
>> 
>> Let's say I have the `Schema newSchema` and `PartitionSpec newSpec` at 
>> hand, and I have the `Table icebergTable` with a different Schema and 
>> PartitionSpec. I know that we have `Table.updateSchema` and 
>> `Table.updateSpec` to modify them, but these methods in the API only allow 
>> for incremental changes (addColumn, updateColumn, or addField, removeField). 
>> Do we have an existing API for effectively updating the Iceberg Table 
>> schema/spec to a new one, if we have the target schema and spec at hand?
>> 
>> Thanks,
>> Peter
