I hope to do even better. If the stream could provide the spec/specId for
the record we are about to write, then we could refresh and switch to a new
writer immediately.
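
Very roughly, something like the sketch below - `RecordWithSpec` and
`createWriter()` are made-up names, and this is not the actual
IcebergStreamWriter code, just the shape of the idea:

```
// Hypothetical fragment: swap the writer as soon as a record targets a
// different specId, instead of waiting for a restart. Only the Table and
// TaskWriter calls are real Iceberg API; the rest is placeholder.
class SpecAwareWriter {
  private final Table table;
  private TaskWriter<RowData> currentWriter;
  private int currentSpecId = -1;

  SpecAwareWriter(Table table) {
    this.table = table;
  }

  void write(RecordWithSpec record) throws IOException {
    if (record.specId() != currentSpecId) {
      if (currentWriter != null) {
        currentWriter.close();
      }
      table.refresh(); // pick up the latest spec/schema from the catalog
      currentWriter = createWriter(table.specs().get(record.specId()));
      currentSpecId = record.specId();
    }
    currentWriter.write(record.rowData());
  }

  private TaskWriter<RowData> createWriter(PartitionSpec spec) {
    // writer construction omitted on purpose - this is only a sketch
    throw new UnsupportedOperationException("sketch only");
  }
}
```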

Teaser - if the user could provide a converter which turns the input data
into `DynamicData`, then we could create/update the table/schema etc. and
write to multiple target tables/schemas/specs with a single FlinkSink:

```
public interface DynamicDataConverter<T> extends Serializable {
  void open(OpenContext openContext) throws Exception;

  DynamicData convert(T t);
}

public class DynamicData {
  TableIdentifier tableIdentifier; // target table
  String branch;                   // target branch
  Schema schema;                   // schema the record should be written with
  PartitionSpec spec;              // spec the record should be written with
  RowData rowData;                 // the payload itself
}
```
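
For example, a converter for an input type which already carries its own
Iceberg schema could be as small as this - `MyEvent` and everything inside
the body are made-up names, only meant to show the shape of it:

```
// Hypothetical converter: routes every event to its own table on the main
// branch, using the schema carried by the event itself.
public class RoutingConverter implements DynamicDataConverter<MyEvent> {
  @Override
  public void open(OpenContext openContext) {
    // nothing to initialize in this sketch
  }

  @Override
  public DynamicData convert(MyEvent event) {
    DynamicData data = new DynamicData();
    data.tableIdentifier = TableIdentifier.of("db", event.tableName());
    data.branch = "main";
    data.schema = event.icebergSchema();
    data.spec = PartitionSpec.unpartitioned();
    data.rowData = event.rowData();
    return data;
  }
}
```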

But this will take some time :)

Xianjin YE <xian...@apache.org> wrote (on Tue, Aug 20, 2024 at
16:54):

> Hi Péter,
> >  I have seen requirements for accommodating partitioning scheme changes
> when the Table has been changed.
>
> This is similar to the request I received from users. It’s possible to
> update/refresh the table spec/schema in the next checkpoint without a Flink
> Job restart. It requires some extra effort though. It would be great if we
> could support that in the Flink Dynamic Sink.
>
> On Aug 20, 2024, at 14:26, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
> Hi Fokko, Xianjin,
>
> Thanks for both proposals, I will take a deeper look soon! Both seem
> promising at first glance.
>
> For the use cases,
> - I have seen requirements for converting incoming Avro records with
> evolving schema and writing them to a table.
> - I have seen requirements for creating new tables when a new group of
> records starts to come in.
> - I have seen requirements for accommodating partitioning scheme changes
> when the Table has been changed.
>
> The other info used for writing is:
> - branch
> - spec
>
> Changing the target branch based on the incoming records seems easy, and I
> was wondering if there is an easy way to alter the table for the target
> spec. This would make a fully dynamic sink. I don't have a concrete use
> case ATM, so if it is not trivial, we could just leave it for later.
> What surprised me is that there is no easy way to convert a Transform to a
> PartitionSpec update.
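>
> For illustration, the kind of incremental call I mean - a rough sketch with
> made-up column names; the target transforms have to be spelled out as
> expression Terms by hand, there is no direct Transform -> update conversion:
>
> ```
> table.updateSpec()
>     .addField(Expressions.bucket("id", 16))
>     .removeField("category")
>     .commit();
> ```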
>
> Thanks, Peter
>
> On Mon, Aug 19, 2024, 15:16 Xianjin YE <xian...@apache.org> wrote:
>
>> Hey Péter,
>>
>> For evolving the schema, Spark has the ability to mergeSchema
>> <https://github.com/apache/iceberg/blob/d4e0b3f2078ee5ed113ba69b800c55c5994e33b8/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java#L172>
>> based on the new incoming Schema, you may want to take a look at that.
>>
>> For evolving the partition spec, I don’t think there’s an easy way to
>> evolve to the desired spec directly.
>> And BTW, what’s your use case for evolving the partition spec directly in a
>> Flink job? The common request I received was that the partition spec is
>> updated externally and users want the Flink job to pick up the latest spec
>> without a job restart.
>>
>> On Aug 19, 2024, at 19:43, Fokko Driesprong <fo...@apache.org> wrote:
>>
>> Hey Peter,
>>
>> Thanks for raising this since I recently ran into the same issue. The
>> APIs that we have today nicely hide the field IDs from the user, which is
>> great.
>>
>> I do think all the methods are in there to evolve the schema to the
>> desired one; however, we don't have a way to control the field IDs. For
>> evolving the schema, I recently wrote a SchemaWithParentVisitor
>> <https://github.com/delta-io/delta/blob/18f5b4cde2120079e15ad4afc7ec84f7f1f48108/iceberg/src/main/java/shadedForDelta/org/apache/iceberg/EvolveSchemaVisitor.java>
>> that will evolve the schema to a target schema that you supply. This
>> might do the trick for the FlinkDynamicSink. If you want to keep the old
>> fields as well (to avoid breaking downstream consumers), then the
>> UnionByName
>> <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java>
>> visitor might also do the trick.
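>>
>> For the additive case there is already a public API shortcut on top of that
>> visitor, so a minimal sketch (assuming the target Schema is at hand) is just:
>>
>> ```
>> // adds the fields from newSchema that are missing from the table schema;
>> // it does not drop or rename existing fields
>> table.updateSchema()
>>     .unionByNameWith(newSchema)
>>     .commit();
>> ```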
>>
>> The most important part is: where are you tracking the field IDs? For
>> example, when renaming a field, the Flink job should update the existing
>> field and not perform a drop+add operation.
>>
>> Kind regards,
>> Fokko
>>
>> On Mon, Aug 19, 2024 at 13:26 Péter Váry <
>> peter.vary.apa...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I'm playing around with creating a Flink Dynamic Sink which would allow
>>> schema changes without the need for job restart. So when a record with an
>>> unknown schema arrives, it would update the Iceberg table to the new
>>> schema and continue processing the records.
>>>
>>> Let's say I have the `Schema newSchema` and `PartitionSpec newSpec` at
>>> hand, and I have the `Table icebergTable` with a different Schema and
>>> PartitionSpec. I know that we have the `Table.updateSchema` and
>>> `Table.updateSpec` to modify them, but these methods in the API only allow
>>> for incremental changes (addColumn, updateColumn, or addField,
>>> removeField). Do we have an existing API for effectively updating the
>>> Iceberg Table schema/spec to a new one, if we have the target schema and
>>> spec at hand?
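>>>
>>> By incremental I mean calls along these lines - a minimal example with
>>> made-up column names:
>>>
>>> ```
>>> icebergTable.updateSchema()
>>>     .addColumn("new_col", Types.StringType.get())
>>>     .commit();
>>> icebergTable.updateSpec()
>>>     .addField("new_col")
>>>     .commit();
>>> ```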
>>>
>>> Thanks,
>>> Peter
>>>
>>
>>
>
