Hi Chen Song,

If you want to test format v2 in your environment, you can follow this comment https://github.com/apache/iceberg/pull/2410#issuecomment-812463051 to upgrade your Iceberg table to format v2.
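For reference, the upgrade described there amounts to committing new table metadata whose format version is bumped to 2 through the Java API. The following is only a rough sketch, assuming TableMetadata#upgradeToFormatVersion is available in your Iceberg build (the exact code in the linked comment may differ):

    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableOperations;

    public class UpgradeTableToV2 {
      // Sketch only: commits a new metadata file identical to the current one
      // except that its format version is raised to 2.
      public static void upgrade(Table table) {
        TableOperations ops = ((BaseTable) table).operations();
        TableMetadata current = ops.current();
        ops.commit(current, current.upgradeToFormatVersion(2));
      }
    }

After that commit, new snapshots on the table may include delete files, since the v1 restriction no longer applies.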
The TableProperties.FORMAT_VERSION constant was introduced in a separate PoC PR, so you will not find that static variable in the current Apache Iceberg master branch.

On Wed, Apr 7, 2021 at 3:28 AM Chen Song <chen.song...@gmail.com> wrote:

> Hey, I want to quickly follow up on this thread.
>
> I cannot seem to find any pull request to expose the v2 format version on
> table creation, specifically for the line below referenced in your
> previous email:
>
> TableProperties.FORMAT_VERSION
>
> Can you suggest anything? I want to create a v2 table to test some
> row-level upserts/deletes.
>
> Chen
>
> On Sun, Dec 27, 2020 at 9:33 PM OpenInx <open...@gmail.com> wrote:
>
>> > you can apply this patch in your own repository
>>
>> The patch is: https://github.com/apache/iceberg/pull/1978
>>
>> On Mon, Dec 28, 2020 at 10:32 AM OpenInx <open...@gmail.com> wrote:
>>
>>> Hi liubo07199,
>>>
>>> Thanks for testing the Iceberg row-level delete. I skimmed the code; it
>>> seems you were trying the equality-delete feature. Iceberg users should
>>> not have to write this kind of Iceberg-internal code to make it work;
>>> that isn't friendly for users. Instead, we usually use the
>>> equality-delete feature (CDC event ingestion or Flink aggregation upsert
>>> streams) through the compute engine. Currently, we've supported the
>>> Flink CDC-events integration: the Flink DataStream integration has been
>>> merged [1], while the Flink SQL integration depends on when we are ready
>>> to expose Iceberg format v2 [2].
>>>
>>> For when format v2 will be exposed to users, you may want to read this
>>> mail [3].
>>>
>>> If you just want a basic test of writing CDC events with Flink, you can
>>> apply this patch in your own repository, and then create an Iceberg
>>> table with an extra option like the following:
>>>
>>> public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
>>>   PartitionSpec spec;
>>>   if (partitioned) {
>>>     spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
>>>   } else {
>>>     spec = PartitionSpec.unpartitioned();
>>>   }
>>>   properties.put(TableProperties.FORMAT_VERSION, "2");
>>>   return new HadoopTables().create(SCHEMA, spec, properties, path);
>>> }
>>>
>>> Then use the Flink DataStream API or Flink SQL to write the CDC events
>>> into an Apache Iceberg table. For a DataStream job sinking CDC events,
>>> I suggest using an approach similar to [4].
>>>
>>> I'd like to help if you have further feedback.
>>>
>>> Thanks.
>>>
>>> [1]. https://github.com/apache/iceberg/pull/1974
>>> [2]. https://github.com/apache/iceberg/pull/1978
>>> [3]. https://mail-archives.apache.org/mod_mbox/iceberg-dev/202012.mbox/%3CCACc8XkGt%2B5kxr-XRMgY1eUKjd70mej38KFbhDuV2MH3AVMON2g%40mail.gmail.com%3E
>>> [4]. https://github.com/apache/iceberg/pull/1974/files#diff-13e2e5b52d0effe51e1b470df77cb08b5ec8cc4f3a7f0fd4e51ee212fc83f76aR143
>>>
>>> On Sat, Dec 26, 2020 at 7:14 PM 1 <liubo1022...@126.com> wrote:
>>>
>>>> Hi, all:
>>>>
>>>> I want to try row-level delete, but I get the exception:
>>>> IllegalArgumentException: Cannot write delete files in a v1 table.
>>>> I looked over https://iceberg.apache.org/spec/#table-metadata for
>>>> format-version; it says "An integer version number for the format.
>>>> Currently, this is always 1. Implementations must throw an exception
>>>> if a table's version is higher than the supported version." So what
>>>> can I do to test row-level deletes?
>>>> How do I create a v2 table?
>>>>
>>>> Thanks.
>>>>
>>>> Code is:
>>>>
>>>> private static void deleteRead() throws IOException {
>>>>   Schema deleteRowSchema = table.schema().select("id");
>>>>   Record dataDelete = GenericRecord.create(deleteRowSchema);
>>>>   List<Record> dataDeletes = Lists.newArrayList(
>>>>       dataDelete.copy("id", 11), // id = 29
>>>>       dataDelete.copy("id", 12), // id = 89
>>>>       dataDelete.copy("id", 13)  // id = 122
>>>>   );
>>>>
>>>>   DeleteFile eqDeletes = writeDeleteFile(table, Files.localOutput(tmpFile), dataDeletes, deleteRowSchema);
>>>>
>>>>   table.newRowDelta()
>>>>       .addDeletes(eqDeletes)
>>>>       .commit();
>>>> }
>>>>
>>>> private static DeleteFile writeDeleteFile(Table table, OutputFile out,
>>>>                                           List<Record> deletes, Schema deleteRowSchema) throws IOException {
>>>>   EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
>>>>       .forTable(table)
>>>>       .withPartition(Row.of("20201221"))
>>>>       .rowSchema(deleteRowSchema)
>>>>       .createWriterFunc(GenericParquetWriter::buildWriter)
>>>>       .overwrite()
>>>>       .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
>>>>       .buildEqualityWriter();
>>>>
>>>>   try (Closeable toClose = writer) {
>>>>     writer.deleteAll(deletes);
>>>>   }
>>>>
>>>>   return writer.toDeleteFile();
>>>> }
>>>>
>>>> liubo07199
>>>> liubo07...@hellobike.com
>>>
>
> --
> Chen Song
>
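P.S. On the quoted test code at the bottom of this thread: once the table is on format v2 and the row delta with the equality-delete file has been committed, one simple way to check that the deletes took effect is to read the table back with the generics API. This is only a rough sketch, assuming the iceberg-data module is on the classpath and `table` is the same Table instance used in the quoted code:

    import java.io.IOException;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.IcebergGenerics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;

    public class VerifyDeletes {
      // Scans the table after the row-delta commit; rows whose "id" was
      // written to the equality-delete file (11, 12, 13 in the quoted code)
      // should no longer appear in the output.
      public static void printRemainingRows(Table table) throws IOException {
        try (CloseableIterable<Record> rows = IcebergGenerics.read(table).build()) {
          for (Record row : rows) {
            System.out.println(row);
          }
        }
      }
    }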