Hi liubo07199,

Thanks for testing the Iceberg row-level delete. I skimmed the code, and it seems you were trying the equality-delete feature. Iceberg users shouldn't have to write that kind of Iceberg-internal code to make this work; it isn't user friendly. Instead, equality deletes are normally produced through the compute-engine integrations (for example, when ingesting CDC events or writing Flink aggregation upsert streams). Currently we've supported the Flink CDC-events integration: the Flink DataStream integration has been merged [1], while the Flink SQL integration depends on when we are ready to expose Iceberg format v2 [2].
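To give a concrete picture of the DataStream path, here is a rough sketch of a job that sinks CDC events with the builder from [1]. Treat it as an illustration only: the table path and the "id" equality column are placeholders, and you should double-check the exact builder methods against [1] and [4].

import java.util.Collections;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// A tiny hand-made CDC stream; in a real job this would come from a CDC source.
// The RowKind of each RowData tells the sink whether the row is an insert,
// a delete, or the before/after image of an update.
DataStream<RowData> cdcEvents = env.fromElements(
    (RowData) GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("aaa")),
    GenericRowData.ofKind(RowKind.DELETE, 1, StringData.fromString("aaa")));

// Point the sink at the format v2 table (created as shown below).
TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg-v2-table");

FlinkSink.forRowData(cdcEvents)
    .tableLoader(tableLoader)
    // DELETE / UPDATE_BEFORE rows are turned into equality deletes on these columns.
    .equalityFieldColumns(Collections.singletonList("id"))
    .build();

env.execute("write-cdc-events-into-iceberg");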
About when format v2 will be exposed to users, you may want to read this mail [3].

If you just want to run a basic test of writing CDC events with Flink, you can apply that patch in your own repository and then create an Iceberg table with an extra property, like the following:

public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
  PartitionSpec spec;
  if (partitioned) {
    spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
  } else {
    spec = PartitionSpec.unpartitioned();
  }
  // Setting format-version=2 is what enables row-level delete files on the table.
  properties.put(TableProperties.FORMAT_VERSION, "2");
  return new HadoopTables().create(SCHEMA, spec, properties, path);
}

Then use the Flink DataStream API or Flink SQL to write the CDC events into the Apache Iceberg table. For a DataStream job that sinks CDC events, I suggest following the approach used here [4] (the sketch above follows the same pattern). I'd be happy to help if you have further feedback.

Thanks.

[1] https://github.com/apache/iceberg/pull/1974
[2] https://github.com/apache/iceberg/pull/1978
[3] https://mail-archives.apache.org/mod_mbox/iceberg-dev/202012.mbox/%3CCACc8XkGt%2B5kxr-XRMgY1eUKjd70mej38KFbhDuV2MH3AVMON2g%40mail.gmail.com%3E
[4] https://github.com/apache/iceberg/pull/1974/files#diff-13e2e5b52d0effe51e1b470df77cb08b5ec8cc4f3a7f0fd4e51ee212fc83f76aR143

On Sat, Dec 26, 2020 at 7:14 PM 1 <liubo1022...@126.com> wrote:

> Hi, all:
>
> I want to try row-level delete, but I get the exception:
> IllegalArgumentException: Cannot write delete files in a v1 table.
> I looked over https://iceberg.apache.org/spec/#table-metadata for
> format-version; it says: "An integer version number for the format.
> Currently, this is always 1. Implementations must throw an exception if a
> table's version is higher than the supported version." So what can I do to
> try row-level deletes? How do I create a v2 table?
>
> thx
>
> Code is:
>
> private static void deleteRead() throws IOException {
>   // Equality deletes match rows by the listed field values, here "id".
>   Schema deleteRowSchema = table.schema().select("id");
>   Record dataDelete = GenericRecord.create(deleteRowSchema);
>   List<Record> dataDeletes = Lists.newArrayList(
>       dataDelete.copy("id", 11),
>       dataDelete.copy("id", 12),
>       dataDelete.copy("id", 13)
>   );
>
>   DeleteFile eqDeletes = writeDeleteFile(table, Files.localOutput(tmpFile),
>       dataDeletes, deleteRowSchema);
>
>   table.newRowDelta()
>       .addDeletes(eqDeletes)
>       .commit();
> }
>
> private static DeleteFile writeDeleteFile(Table table, OutputFile out,
>                                           List<Record> deletes,
>                                           Schema deleteRowSchema) throws IOException {
>   EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
>       .forTable(table)
>       .withPartition(Row.of("20201221"))
>       .rowSchema(deleteRowSchema)
>       .createWriterFunc(GenericParquetWriter::buildWriter)
>       .overwrite()
>       .equalityFieldIds(deleteRowSchema.columns().stream()
>           .mapToInt(Types.NestedField::fieldId).toArray())
>       .buildEqualityWriter();
>
>   try (Closeable toClose = writer) {
>     writer.deleteAll(deletes);
>   }
>
>   return writer.toDeleteFile();
> }
>
> liubo07199
> liubo07...@hellobike.com
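P.S. On the snippet in your mail: once the row delta commits, a quick way to check that the equality deletes actually take effect is to scan the table back with the generics API from the iceberg-data module, roughly like this (assuming the same "table" handle as in your code, and a build where the generic reader already applies v2 deletes):

import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

// Rows matching the committed equality deletes (id = 11, 12, 13)
// should no longer appear in the scan.
try (CloseableIterable<Record> rows = IcebergGenerics.read(table).build()) {
  for (Record row : rows) {
    System.out.println(row);
  }
}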