Hey, I want to quickly follow up on this thread. I cannot seem to find any pull request that exposes the V2 format version on table creation, specifically for the line below, referenced in your previous email.
TableProperties.FORMAT_VERSION

Can you suggest anything? I want to create a V2 table to test some row-level upserts/deletes.

Chen

On Sun, Dec 27, 2020 at 9:33 PM OpenInx <open...@gmail.com> wrote:
>
> > you can apply this patch in your own repository
>
> The patch is: https://github.com/apache/iceberg/pull/1978
>
> On Mon, Dec 28, 2020 at 10:32 AM OpenInx <open...@gmail.com> wrote:
>
>> Hi liubo07199,
>>
>> Thanks for testing iceberg row-level deletes. I skimmed the code, and it
>> seems you were trying the equality-delete feature. Iceberg users shouldn't
>> have to write this kind of iceberg-internal code to make it work; it isn't
>> user-friendly. Instead, the equality-delete feature (for CDC event
>> ingestion or flink aggregation upsert streams) is normally used through a
>> compute engine. Currently, we support the flink cdc-events integration:
>> the Flink DataStream integration has been merged [1], while the Flink SQL
>> integration depends on when we are ready to expose iceberg format v2 [2].
>>
>> About when format v2 will be exposed to users, you may want to read this
>> mail [3].
>>
>> If you just want a basic test of writing CDC events with flink, you can
>> apply this patch in your own repository and then create an iceberg table
>> with an extra option, like the following:
>>
>> public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
>>   PartitionSpec spec;
>>   if (partitioned) {
>>     spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
>>   } else {
>>     spec = PartitionSpec.unpartitioned();
>>   }
>>   // Setting the format version to 2 is what allows delete files to be written.
>>   properties.put(TableProperties.FORMAT_VERSION, "2");
>>   return new HadoopTables().create(SCHEMA, spec, properties, path);
>> }
>>
>> Then use the flink DataStream API or flink SQL to write the cdc events
>> into an apache iceberg table. For a DataStream job sinking cdc events, I
>> suggest an approach similar to the one here [4].
>>
>> I'd like to help if you have further feedback.
>>
>> Thanks.
>>
>> [1]. https://github.com/apache/iceberg/pull/1974
>> [2]. https://github.com/apache/iceberg/pull/1978
>> [3]. https://mail-archives.apache.org/mod_mbox/iceberg-dev/202012.mbox/%3CCACc8XkGt%2B5kxr-XRMgY1eUKjd70mej38KFbhDuV2MH3AVMON2g%40mail.gmail.com%3E
>> [4]. https://github.com/apache/iceberg/pull/1974/files#diff-13e2e5b52d0effe51e1b470df77cb08b5ec8cc4f3a7f0fd4e51ee212fc83f76aR143
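As a quick sanity check, here is a rough sketch of how that createTable helper might be used to confirm the new table really is v2. The path is made up, and the BaseTable cast plus the formatVersion() check assume the patch from [2] is applied:

import java.util.HashMap;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;

public static void main(String[] args) {
  // Hypothetical location; any Hadoop-accessible path works for HadoopTables.
  Table table = createTable("file:///tmp/iceberg-v2-test", new HashMap<>(), true);

  // TableMetadata records the table's format version; with the patch applied
  // and the "format-version" property set above, this should print 2.
  int version = ((BaseTable) table).operations().current().formatVersion();
  System.out.println("format version: " + version);
}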
>> On Sat, Dec 26, 2020 at 7:14 PM 1 <liubo1022...@126.com> wrote:
>>
>>> Hi, all:
>>>
>>> I want to try row-level deletes, but I get this exception:
>>> IllegalArgumentException: Cannot write delete files in a v1 table.
>>> I looked over https://iceberg.apache.org/spec/#table-metadata for
>>> format-version, and it says: "An integer version number for the format.
>>> Currently, this is always 1. Implementations must throw an exception if a
>>> table's version is higher than the supported version." So what can I do to
>>> try row-level deletes? How do I create a v2 table?
>>>
>>> thx
>>>
>>> Code is:
>>>
>>> private static void deleteRead() throws IOException {
>>>   Schema deleteRowSchema = table.schema().select("id");
>>>   Record dataDelete = GenericRecord.create(deleteRowSchema);
>>>   List<Record> dataDeletes = Lists.newArrayList(
>>>       dataDelete.copy("id", 11), // delete the row with id = 11
>>>       dataDelete.copy("id", 12), // delete the row with id = 12
>>>       dataDelete.copy("id", 13)  // delete the row with id = 13
>>>   );
>>>
>>>   DeleteFile eqDeletes = writeDeleteFile(table, Files.localOutput(tmpFile), dataDeletes, deleteRowSchema);
>>>
>>>   table.newRowDelta()
>>>       .addDeletes(eqDeletes)
>>>       .commit();
>>> }
>>>
>>> private static DeleteFile writeDeleteFile(Table table, OutputFile out,
>>>                                           List<Record> deletes, Schema deleteRowSchema) throws IOException {
>>>   EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
>>>       .forTable(table)
>>>       .withPartition(Row.of("20201221"))
>>>       .rowSchema(deleteRowSchema)
>>>       .createWriterFunc(GenericParquetWriter::buildWriter)
>>>       .overwrite()
>>>       .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
>>>       .buildEqualityWriter();
>>>
>>>   try (Closeable toClose = writer) {
>>>     writer.deleteAll(deletes);
>>>   }
>>>
>>>   return writer.toDeleteFile();
>>> }
>>>
>>> liubo07199
>>> liubo07...@hellobike.com

--
Chen Song
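Once the table is v2 and the row delta above has committed, here is a minimal sketch of one way to check that the equality deletes took effect. It assumes the static table field from the snippet above is in scope and that the iceberg-data generics reader in your build already applies delete files on read:

import java.io.IOException;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

private static void verifyDeletes() throws IOException {
  // Scan the table with the generic reader; rows matching the committed
  // equality deletes (id = 11, 12, 13) should no longer appear.
  try (CloseableIterable<Record> rows = IcebergGenerics.read(table).build()) {
    for (Record row : rows) {
      System.out.println(row);
    }
  }
}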