您好,

Iceberg 为 Flink 实现的 connector 同时支持 DataStream API 和 Table API[1]。其 DataStream 
API 提供 Append(默认行为)、Overwrite、Upsert 三种可选的模式,您可以使用下面的 Java 代码片段实现:

首先创建对应数据行 Schema 格式的反序列化器,例如,可以使用 RowDataDebeziumDeserializeSchema 的生成器来快速构造一个:


private RowDataDebeziumDeserializeSchema getDeserializer(
        DataType dataType) {
    LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
    InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
    return RowDataDebeziumDeserializeSchema.newBuilder()
            .setPhysicalRowType((RowType) dataType.getLogicalType())
            .setResultTypeInfo(typeInfo)
            .build();
}

然后,您可以使用该反序列化器创建 MySQL 数据源:

MySqlSource<RowData> mySqlSource =
        MySqlSource.<RowData>builder()
                // 其他参数配置略
                .deserializer(getDeserializer({{ ROW_DATA_TYPE_HERE }}))
                .build();

并创建一个 Iceberg 数据源:

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(mysqlSource)
    .tableLoader(tableLoader)
// 此处可以追加 .overwrite(true) 或 .upsert(true)
// 来配置 Overwrite 或 Upsert 行为
    .append();

P.S. 在接下来的 Flink CDC 版本中,预计会为 3.0 版本新增的 Pipeline 作业[2]提供写入 Iceberg 
的能力,使用起来更方便快捷。如果能够满足您的需求,也请多多尝试。

祝好!

Regards,
yux

[1] https://iceberg.apache.org/docs/1.5.2/flink-writes/#writing-with-datastream
[2] 
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/get-started/introduction/



发件人: zapjone <zapj...@163.com>
日期: 星期二, 2024年6月4日 18:34
收件人: user-zh@flink.apache.org <user-zh@flink.apache.org>
主题: Flink Datastream实现删除操作
各位大佬好:
    想请教下,在使用mysql-cdc到iceberg,通过sql方式可以实现自动更新和删除功能。但在使用datastream 
api进行处理后,注册成临时表,怎么实现类似于sql方式的自动更新和删除呢?

回复