leonardBang commented on code in PR #3904: URL: https://github.com/apache/flink-cdc/pull/3904#discussion_r2040874530
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml: ########## @@ -0,0 +1,299 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>flink-cdc-pipeline-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>${revision}</version> + </parent> + + <artifactId>flink-cdc-pipeline-connector-iceberg</artifactId> + <packaging>jar</packaging> + + <name>flink-cdc-pipeline-connector-iceberg</name> + + <properties> + <iceberg.version>1.6.1</iceberg.version> + <hadoop.version>2.8.5</hadoop.version> + <hive.version>2.3.9</hive.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.iceberg</groupId> + <artifactId>iceberg-flink-runtime-1.19</artifactId> + <version>${iceberg.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-composer</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>junit</artifactId> + <groupId>junit</groupId> + </exclusion> + <exclusion> + <artifactId>junit-vintage-engine</artifactId> + <groupId>org.junit.vintage</groupId> + </exclusion> + </exclusions> + </dependency> + + + <!-- hadoop dependency --> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> + <artifactId>commons-compress</artifactId> + <groupId>org.apache.commons</groupId> + </exclusion> + <exclusion> + <artifactId>commons-io</artifactId> + <groupId>commons-io</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + 
<exclusion> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> Review Comment: left comment why we need to exclude this artifact? ########## docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md: ########## @@ -0,0 +1,225 @@ +--- +title: "Iceberg" +weight: 9 +type: docs +aliases: +- /connectors/pipeline-connectors/iceberg +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Iceberg Pipeline Connector + +The Iceberg Pipeline connector can be used as the *Data Sink* of the pipeline, and write data to [Iceberg](https://iceberg.apache.org). This document describes how to set up the Iceberg Pipeline connector. + +## What can the connector do? +* Create table automatically if not exist +* Schema change synchronization +* Data synchronization Review Comment: # Iceberg Pipeline Connector The Iceberg Pipeline Connector functions as a Data Sink for data pipelines, enabling data writes to [Apache Iceberg ](https://iceberg.apache.org/)tables. This document explains how to configure the connector. # Key Capabilities * Automatic Table Creation Creates Iceberg tables dynamically when they do not exist * Schema Synchronization Propagates schema changes (e.g., column additions) from source systems to Iceberg * Data Replication Supports both batch and streaming data synchronization ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; +import org.apache.flink.table.data.TimestampData; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; +import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision; + +/** Util class for types in {@link IcebergDataSink}. */ +public class IcebergTypeUtils { + + /** Convert column from Flink CDC framework to Iceberg framework. */ Review Comment: ```suggestion /** Convert column from Flink CDC to Iceberg format. */ ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java: ########## @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.iceberg.sink; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +/** A {@link MetadataApplier} for Apache Iceberg. */ +public class IcebergMetadataApplier implements MetadataApplier { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataApplier.class); + + private transient Catalog catalog; + + private final Map<String, String> catalogOptions; + + // currently, we set table options for all tables using the same options. + private final Map<String, String> tableOptions; + + private final Map<TableId, List<String>> partitionMaps; + + private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes; + + public IcebergMetadataApplier(Map<String, String> catalogOptions) { + this(catalogOptions, new HashMap<>(), new HashMap<>()); + } + + public IcebergMetadataApplier( + Map<String, String> catalogOptions, + Map<String, String> tableOptions, + Map<TableId, List<String>> partitionMaps) { + this.catalogOptions = catalogOptions; + this.tableOptions = tableOptions; + this.partitionMaps = partitionMaps; + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) + throws SchemaEvolveException { + if (catalog == null) { + catalog = Review Comment: The catalog instance has no opportunity to be close which may lead to its hold resources leak IIUC? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/TableSchemaWrapper.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
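Regarding the catalog lifecycle concern above: a minimal sketch of a helper that the metadata applier (and the other components that build a catalog via `CatalogUtil`) could call from a teardown hook. `CatalogResources` and its method name are illustrative and not part of the PR; the pattern relies on the fact that `org.apache.iceberg.catalog.Catalog` itself is not `Closeable`, while common implementations such as `HiveCatalog` are.

```java
import org.apache.iceberg.catalog.Catalog;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper: release resources held by a lazily created Iceberg catalog. */
public final class CatalogResources {

    private CatalogResources() {}

    public static void close(Catalog catalog) {
        // Catalog is not Closeable itself; only close implementations that are.
        if (catalog instanceof Closeable) {
            try {
                ((Closeable) catalog).close();
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to close Iceberg catalog", e);
            }
        }
    }
}
```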
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; + +import java.time.ZoneId; +import java.util.List; + +/** A Wrapper for {@link Schema}. */ +public class TableSchemaWrapper { + + private final Schema schema; + + private final List<RecordData.FieldGetter> fieldGetters; + + public TableSchemaWrapper(Schema schema, ZoneId zoneId) { + this.schema = schema; + this.fieldGetters = IcebergTypeUtils.createFieldGetters(schema, zoneId); + } + + public Schema getSchema() { + return schema; + } + + public RowData convertEventToRowData(DataChangeEvent dataChangeEvent) { Review Comment: Could we define a static method or introduce a converter for this conversion for performance consideration ? ########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml: ########## @@ -41,6 +41,8 @@ limitations under the License. <flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name> <flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror> <maven.plugin.download.version>1.6.8</maven.plugin.download.version> + <iceberg.version>1.6.1</iceberg.version> + <hive.version>2.3.9</hive.version> Review Comment: minor: we can use version property to avoid dIff between test and production codebase ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
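On the converter question above (`TableSchemaWrapper#convertEventToRowData`): a sketch of a dedicated converter that is built once per schema and reused for every event, so the field getters are resolved only once on the hot path. The class name, the `RowKind` derivation from the before/after images, and the wiring are assumptions for illustration, not the PR's implementation.

```java
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import java.time.ZoneId;
import java.util.List;

/** Illustrative converter: created once per (schema, zone), reused for every event. */
public final class DataChangeEventConverter {

    private final List<RecordData.FieldGetter> fieldGetters;

    public DataChangeEventConverter(Schema schema, ZoneId zoneId) {
        this.fieldGetters = IcebergTypeUtils.createFieldGetters(schema, zoneId);
    }

    /** Convert a change event to RowData, deriving the RowKind from before/after images. */
    public RowData convert(DataChangeEvent event) {
        RecordData after = event.after();
        RecordData before = event.before();
        if (after != null) {
            return toRowData(after, before == null ? RowKind.INSERT : RowKind.UPDATE_AFTER);
        }
        return toRowData(before, RowKind.DELETE);
    }

    private RowData toRowData(RecordData recordData, RowKind kind) {
        GenericRowData row = new GenericRowData(kind, fieldGetters.size());
        for (int pos = 0; pos < fieldGetters.size(); pos++) {
            row.setField(pos, fieldGetters.get(pos).getFieldOrNull(recordData));
        }
        return row;
    }
}
```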
+ */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.actions.Actions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** An Operator to add checkpointId to IcebergTable Small file compaction. */ +public class CompactionOperator + extends AbstractStreamOperator<CommittableMessage<WriteResultWrapper>> + implements OneInputStreamOperator< + CommittableMessage<WriteResultWrapper>, CommittableMessage<WriteResultWrapper>> { + protected static final Logger LOGGER = LoggerFactory.getLogger(CompactionOperator.class); + + private final Map<String, String> catalogOptions; + + private Catalog catalog; + + /** store a list of MultiTableCommittable in one checkpoint. 
*/ + private final Map<TableId, Integer> tableCommitTimes; + + private final Set<TableId> compactedTables; + + private final CompactionOptions compactionOptions; + + private volatile Throwable throwable; + + private ExecutorService compactExecutor; + + public CompactionOperator( + Map<String, String> catalogOptions, CompactionOptions compactionOptions) { + this.tableCommitTimes = new HashMap<>(); + this.compactedTables = new HashSet<>(); + this.catalogOptions = catalogOptions; + this.compactionOptions = compactionOptions; + } + + @Override + public void open() throws Exception { + super.open(); + if (compactExecutor == null) { + this.compactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-Cdc-Compaction")); + } + } + + @Override + public void processElement(StreamRecord<CommittableMessage<WriteResultWrapper>> element) { + if (element.getValue() instanceof CommittableWithLineage) { + TableId tableId = + ((CommittableWithLineage<WriteResultWrapper>) element.getValue()) + .getCommittable() + .getTableId(); + tableCommitTimes.put(tableId, tableCommitTimes.getOrDefault(tableId, 0) + 1); + int commitTimes = tableCommitTimes.get(tableId); + if (commitTimes >= compactionOptions.getCommitInterval() + && !compactedTables.contains(tableId)) { + if (throwable != null) { + throw new RuntimeException(throwable); + } + compactedTables.add(tableId); + if (compactExecutor == null) { + compact(tableId); + } else { + compactExecutor.submit(() -> compact(tableId)); + } + } + } + } + + private void compact(TableId tableId) { + if (catalog == null) { + catalog = Review Comment: close you catalog resources as well ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2; + +import org.apache.flink.cdc.common.event.TableId; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.io.WriteResult; + +import java.io.Serializable; + +/** A wrapper class for {@link WriteResult} and {@link TableId}. */ +public class WriteResultWrapper implements Serializable { + + private final WriteResult writeResult; + + private final TableId tableId; + + public WriteResultWrapper(WriteResult writeResult, TableId tableId) { + this.writeResult = writeResult; + this.tableId = tableId; + } + + public WriteResult getWriteResult() { + return writeResult; + } + + public TableId getTableId() { + return tableId; + } + + /** Build a simple description for the write result. 
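For the catalog-closing comment above on `CompactionOperator`: a sketch of a `close()` override that shuts down the compaction executor and releases the lazily built catalog. It assumes the operator's existing `compactExecutor` and `catalog` fields; whether the concrete catalog is `AutoCloseable` depends on the implementation, hence the instanceof check.

```java
@Override
public void close() throws Exception {
    super.close();
    if (compactExecutor != null) {
        // Stop any in-flight compaction task; queued ones are dropped on shutdown.
        compactExecutor.shutdownNow();
        compactExecutor = null;
    }
    if (catalog instanceof AutoCloseable) {
        ((AutoCloseable) catalog).close();
        catalog = null;
    }
}
```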
*/ + public String buildDescription() { + long addCount = 0; + if (writeResult.dataFiles() != null) { + for (DataFile dataFile : writeResult.dataFiles()) { + addCount += dataFile.recordCount(); + } + } + long deleteCount = 0; + if (writeResult.deleteFiles() != null) { + for (DeleteFile dataFile : writeResult.deleteFiles()) { + deleteCount += dataFile.recordCount(); + } + } Review Comment: +1 ########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java: ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.output.ToStringConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.File; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** End-to-end tests for mysql cdc to Iceberg pipeline job. 
*/ +public class MySqlToIcebergE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MySqlToIcebergE2eITCase.class); + + public static final Duration TESTCASE_TIMEOUT = Duration.ofMinutes(3); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + + @org.testcontainers.junit.jupiter.Container + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases("mysql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL, "iceberg_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + private String warehouse; + + @BeforeAll + public static void initializeContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL)).join(); + LOG.info("Containers are started."); + } + + @BeforeEach + public void before() throws Exception { + LOG.info("Starting containers..."); + warehouse = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + jobManagerConsumer = new ToStringConsumer(); + jobManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume)) + .withFileSystemBind(warehouse, warehouse, BindMode.READ_WRITE) + .withLogConsumer(jobManagerConsumer); + Startables.deepStart(Stream.of(jobManager)).join(); + LOG.info("JobManager is started."); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString()); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", warehouse); + + taskManagerConsumer = new ToStringConsumer(); + taskManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withVolumesFrom(jobManager, BindMode.READ_WRITE) + .withFileSystemBind(warehouse, warehouse, BindMode.READ_WRITE) + .withLogConsumer(taskManagerConsumer); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("TaskManager is started."); + runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString()); + runInContainerAsRoot(taskManager, "chmod", "0777", "-R", warehouse); + inventoryDatabase.createAndInitialize(); + } + + @AfterEach + public void after() { + super.after(); + inventoryDatabase.dropDatabase(); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String database = inventoryDatabase.getDatabaseName(); + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " 
tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: iceberg\n" + + " catalog.properties.warehouse: %s\n" + + " catalog.properties.type: hadoop\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: %s", + MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database, warehouse, parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path icebergCdcConnector = TestUtils.getResource("iceberg-cdc-pipeline-connector.jar"); + Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, icebergCdcConnector, mysqlDriverJar, hadoopJar); + waitUntilJobRunning(Duration.ofSeconds(60)); + LOG.info("Pipeline job is running"); + validateSinkResult( + warehouse, + database, + "products", + Arrays.asList( + "101, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null", + "102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null", + "103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null", + "104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null", + "105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null", + "106, Six, Ferris, 9.813, null, null, null", + "107, Seven, Grace, 2.117, null, null, null", + "108, Eight, Hesse, 6.819, null, null, null", + "109, Nine, IINA, 5.223, null, null, null")); + + validateSinkResult( + warehouse, + database, + "customers", + Arrays.asList( + "101, user_1, Shanghai, 123567891234", + "102, user_2, Shanghai, 123567891234", + "103, user_3, Shanghai, 123567891234", + "104, user_4, Shanghai, 123567891234")); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), MYSQL.getDatabasePort(), database); + List<String> recordsInIncrementalPhase; + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + stat.execute( + "INSERT INTO products VALUES (default,'Ten','Jukebox',0.2, null, null, null);"); // 110 + stat.execute("UPDATE products SET description='Fay' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.125' WHERE id=107;"); + + // modify table schema + stat.execute("ALTER TABLE products DROP COLUMN point_c;"); + stat.execute("DELETE FROM products WHERE id=101;"); + + stat.execute( + "INSERT INTO products VALUES (default,'Eleven','Kryo',5.18, null, null);"); // 111 + stat.execute( + "INSERT INTO products VALUES (default,'Twelve', 'Lily', 2.14, null, null);"); // 112 + recordsInIncrementalPhase = createChangesAndValidate(stat); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + List<String> recordsInSnapshotPhase = + new ArrayList<>( + Arrays.asList( + "102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, null, null, null, null, null, null, null, null, null", + "103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, null, null, null, null, null, null, null, null, null", + "104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, null, null, null, null, null, null, null, null, null", + "105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null", + "106, Six, Fay, 9.813, null, null, null, null, null, null, null, null, null, null, null, null", + "107, Seven, Grace, 5.125, null, null, null, 
null, null, null, null, null, null, null, null, null", + "108, Eight, Hesse, 6.819, null, null, null, null, null, null, null, null, null, null, null, null", + "109, Nine, IINA, 5.223, null, null, null, null, null, null, null, null, null, null, null, null", + "110, Ten, Jukebox, 0.2, null, null, null, null, null, null, null, null, null, null, null, null", + "111, Eleven, Kryo, 5.18, null, null, null, null, null, null, null, null, null, null, null, null", + "112, Twelve, Lily, 2.14, null, null, null, null, null, null, null, null, null, null, null, null")); + recordsInSnapshotPhase.addAll(recordsInIncrementalPhase); + recordsInSnapshotPhase = + recordsInSnapshotPhase.stream().sorted().collect(Collectors.toList()); + validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase); + } + + /** + * Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512), + * weight FLOAT, enum_c enum('red', 'white'), json_c JSON. + */ + private List<String> createChangesAndValidate(Statement stat) throws SQLException { + List<String> result = new ArrayList<>(); + StringBuilder sqlFields = new StringBuilder(); + + // Add Column. + for (int addColumnRepeat = 0; addColumnRepeat < 10; addColumnRepeat++) { + stat.execute( + String.format( + "ALTER TABLE products ADD COLUMN point_c_%s VARCHAR(10);", + addColumnRepeat)); + sqlFields.append(", '1'"); + StringBuilder resultFields = new StringBuilder(); + for (int j = 0; j < 10; j++) { Review Comment: minor: we give meaningful variable name to avoid to use `i` and `j`, and the 10 is a magic number, we can define a constant with a meaningful name ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
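For the naming and magic-number comment above on `createChangesAndValidate`: a sketch of how the loop could read with a named constant and descriptive indices. The constant name and the trimmed body are illustrative; the elided part would keep building the expected rows exactly as in the PR.

```java
private static final int ADD_COLUMN_ROUNDS = 10;

private List<String> createChangesAndValidate(Statement statement) throws SQLException {
    List<String> expectedRecords = new ArrayList<>();
    StringBuilder sqlFields = new StringBuilder();
    for (int addColumnRound = 0; addColumnRound < ADD_COLUMN_ROUNDS; addColumnRound++) {
        statement.execute(
                String.format(
                        "ALTER TABLE products ADD COLUMN point_c_%s VARCHAR(10);",
                        addColumnRound));
        sqlFields.append(", '1'");
        // ... build the expected row strings for this round, indexing by addColumnRound
        //     (and an inner expectedFieldIndex) instead of i and j ...
    }
    return expectedRecords;
}
```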
+ */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.sink2.CommittingSinkWriter; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.io.TaskWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** A {@link SinkWriter} for Apache Iceberg. */ +public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWrapper> { + + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergWriter.class); + + public static final String DEFAULT_FILE_FORMAT = "parquet"; + + public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024; + + Map<TableId, RowDataTaskWriterFactory> writerFactoryMap; + + Map<TableId, TaskWriter<RowData>> writerMap; + + Map<TableId, TableSchemaWrapper> schemaMap; + + List<WriteResultWrapper> temporaryWriteResult; + + private final Catalog catalog; + + private final int taskId; + + private final int attemptId; + + private final ZoneId zoneId; + + public IcebergWriter( + Map<String, String> catalogOptions, int taskId, int attemptId, ZoneId zoneId) { + catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-writer-catalog", catalogOptions, new Configuration()); + writerFactoryMap = new HashMap<>(); + writerMap = new HashMap<>(); + schemaMap = new HashMap<>(); + temporaryWriteResult = new ArrayList<>(); + this.taskId = taskId; + this.attemptId = attemptId; + this.zoneId = zoneId; + } + + @Override + public Collection<WriteResultWrapper> prepareCommit() throws IOException, InterruptedException { + List<WriteResultWrapper> list = new ArrayList<>(); + list.addAll(temporaryWriteResult); + list.addAll(getWriteResult()); + temporaryWriteResult.clear(); + return list; + } + + private RowDataTaskWriterFactory getRowDataTaskWriterFactory(TableId tableId) { + Table table = catalog.loadTable(TableIdentifier.parse(tableId.identifier())); + RowType rowType = FlinkSchemaUtil.convert(table.schema()); + RowDataTaskWriterFactory rowDataTaskWriterFactory = + new RowDataTaskWriterFactory( + table, + rowType, + DEFAULT_MAX_FILE_SIZE, + FileFormat.fromString(DEFAULT_FILE_FORMAT), + new HashMap<>(), + new ArrayList<>(table.schema().identifierFieldIds()), + true); + rowDataTaskWriterFactory.initialize(taskId, attemptId); + return rowDataTaskWriterFactory; + } + + @Override + public void write(Event event, Context context) throws IOException, InterruptedException { + if (event instanceof DataChangeEvent) { + DataChangeEvent dataChangeEvent = 
(DataChangeEvent) event; + TableId tableId = dataChangeEvent.tableId(); + writerFactoryMap.computeIfAbsent(tableId, this::getRowDataTaskWriterFactory); + TaskWriter<RowData> writer = + writerMap.computeIfAbsent( + tableId, tableId1 -> writerFactoryMap.get(tableId1).create()); + RowData rowData = schemaMap.get(tableId).convertEventToRowData(dataChangeEvent); + writer.write(rowData); + } else { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; + TableId tableId = schemaChangeEvent.tableId(); + TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId); + + Schema newSchema = + tableSchemaWrapper != null + ? SchemaUtils.applySchemaChangeEvent( + tableSchemaWrapper.getSchema(), schemaChangeEvent) + : SchemaUtils.applySchemaChangeEvent(null, schemaChangeEvent); + schemaMap.put(tableId, new TableSchemaWrapper(newSchema, zoneId)); + } + } + + @Override + public void flush(boolean flush) throws IOException { + // Notice: flush method may be called many times during one checkpoint. + temporaryWriteResult.addAll(getWriteResult()); + } + + private List<WriteResultWrapper> getWriteResult() throws IOException { + List<WriteResultWrapper> writeResults = new ArrayList<>(); + for (Map.Entry<TableId, TaskWriter<RowData>> entry : writerMap.entrySet()) { + WriteResultWrapper writeResultWrapper = + new WriteResultWrapper(entry.getValue().complete(), entry.getKey()); + writeResults.add(writeResultWrapper); + LOGGER.info(writeResultWrapper.buildDescription()); + } + writerMap.clear(); + writerFactoryMap.clear(); + return writeResults; + } + + @Override + public void writeWatermark(Watermark watermark) {} + + @Override + public void close() throws Exception { Review Comment: Plz note the lifecycle of catalog ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
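On the catalog lifecycle note above for `IcebergWriter`: a sketch of what its `close()` could do, releasing any open task writers before the catalog. It assumes the writer's existing `writerMap`, `writerFactoryMap` and `catalog` fields; `TaskWriter` is `Closeable`, while the catalog is only conditionally so.

```java
@Override
public void close() throws Exception {
    for (TaskWriter<RowData> taskWriter : writerMap.values()) {
        // Release open files of writers that were never completed via prepareCommit().
        taskWriter.close();
    }
    writerMap.clear();
    writerFactoryMap.clear();
    if (catalog instanceof java.io.Closeable) {
        ((java.io.Closeable) catalog).close();
    }
}
```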
+ */ + +package org.apache.flink.cdc.connectors.iceberg.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.iceberg.sink.utils.OptionUtils; +import org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction.CompactionOptions; + +import java.time.ZoneId; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES; +import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES; + +/** A {@link DataSinkFactory} for Apache Iceberg. */ +public class IcebergDataSinkFactory implements DataSinkFactory { + + public static final String IDENTIFIER = "iceberg"; + + @Override + public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context) + .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES); + + Map<String, String> allOptions = context.getFactoryConfiguration().toMap(); + OptionUtils.printOptions(IDENTIFIER, allOptions); + + Map<String, String> catalogOptions = new HashMap<>(); + Map<String, String> tableOptions = new HashMap<>(); + allOptions.forEach( + (key, value) -> { + if (key.startsWith(PREFIX_TABLE_PROPERTIES)) { + tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value); + } else if (key.startsWith(IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) { + catalogOptions.put( Review Comment: If we allow pass all catalog and table properties, would this introduce potential unexpected behavior e.g. user set ` cache-enabled` to ture? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
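On the pass-through-properties question above for `IcebergDataSinkFactory`: a sketch of one way to guard options the sink depends on, e.g. pinning `cache-enabled` so a caching catalog cannot serve stale table metadata after schema evolution. The method, the `LOG` field and the choice of guarded key are illustrative, not a statement of the connector's actual behavior.

```java
private static void sanitizeCatalogOptions(Map<String, String> catalogOptions) {
    if (Boolean.parseBoolean(catalogOptions.get("cache-enabled"))) {
        LOG.warn(
                "Ignoring user-provided 'cache-enabled=true'; "
                        + "the Iceberg sink reloads tables after schema changes.");
    }
    // Pin the option so later schema changes are always visible to the writer/applier.
    catalogOptions.put("cache-enabled", "false");
}
```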
+ */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; +import org.apache.flink.table.data.TimestampData; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; +import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision; + +/** Util class for types in {@link IcebergDataSink}. */ +public class IcebergTypeUtils { + + /** Convert column from Flink CDC framework to Iceberg framework. */ + public static Types.NestedField convertCdcColumnToIcebergField( + int index, PhysicalColumn column) { + DataType dataType = column.getType(); + return Types.NestedField.of( + index, + dataType.isNullable(), + column.getName(), + convertCDCTypeToIcebergType(dataType), + column.getComment()); + } + + /** + * Convert data type from CDC framework to Iceberg framework, refer to <a Review Comment: ```suggestion * Convert data type from Flink CDC to Iceberg format, refer to <a ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/TableMetric.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2; + +import org.apache.flink.metrics.MetricGroup; + +import java.util.concurrent.atomic.AtomicInteger; + +/** Metrics of commit message for each table. */ +public class TableMetric { + + /** Number of commit times. */ + public static final String NUM_COMMIT_TIME = "numCommitTime"; Review Comment: `times` and `time` have different semantics, the first one means `commit count`, the later one means `commit duration`, which one do you want to express? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
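On the `numCommitTime` naming question above: if the metric is meant to be a count, a sketch of how `TableMetric` might expose it with an unambiguous name, using Flink's standard `Counter`. The name `numCommits` and the method names are only suggestions.

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/** Metrics of commit messages for each table (illustrative variant). */
public class TableMetric {

    /** Number of commits observed for this table. */
    public static final String NUM_COMMITS = "numCommits";

    private final Counter commitCounter;

    public TableMetric(MetricGroup metricGroup) {
        this.commitCounter = metricGroup.counter(NUM_COMMITS);
    }

    public void recordCommit() {
        commitCounter.inc();
    }
}
```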
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.actions.Actions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** An Operator to add checkpointId to IcebergTable Small file compaction. */ Review Comment: plz improve the java doc, it's unclear for me. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
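For the Javadoc comment above: a possible clearer class-level description for `CompactionOperator`, based on what the operator does in this PR (counting commits per table and rewriting small data files once the configured number of commits is reached). The wording is a suggestion only.

```java
/**
 * Operator that observes committed write results per table and, after the configured
 * number of commits for a table, triggers Iceberg's {@code rewriteDataFiles} action to
 * compact that table's small files. Compaction runs asynchronously on a dedicated
 * single-threaded executor so it does not block the main processing thread.
 */
public class CompactionOperator { /* ... */ }
```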
+ */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.actions.Actions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** An Operator to add checkpointId to IcebergTable Small file compaction. */ +public class CompactionOperator + extends AbstractStreamOperator<CommittableMessage<WriteResultWrapper>> + implements OneInputStreamOperator< + CommittableMessage<WriteResultWrapper>, CommittableMessage<WriteResultWrapper>> { + protected static final Logger LOGGER = LoggerFactory.getLogger(CompactionOperator.class); + + private final Map<String, String> catalogOptions; + + private Catalog catalog; + + /** store a list of MultiTableCommittable in one checkpoint. 
*/ + private final Map<TableId, Integer> tableCommitTimes; + + private final Set<TableId> compactedTables; + + private final CompactionOptions compactionOptions; + + private volatile Throwable throwable; + + private ExecutorService compactExecutor; + + public CompactionOperator( + Map<String, String> catalogOptions, CompactionOptions compactionOptions) { + this.tableCommitTimes = new HashMap<>(); + this.compactedTables = new HashSet<>(); + this.catalogOptions = catalogOptions; + this.compactionOptions = compactionOptions; + } + + @Override + public void open() throws Exception { + super.open(); + if (compactExecutor == null) { + this.compactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-Cdc-Compaction")); + } + } + + @Override + public void processElement(StreamRecord<CommittableMessage<WriteResultWrapper>> element) { + if (element.getValue() instanceof CommittableWithLineage) { + TableId tableId = + ((CommittableWithLineage<WriteResultWrapper>) element.getValue()) + .getCommittable() + .getTableId(); + tableCommitTimes.put(tableId, tableCommitTimes.getOrDefault(tableId, 0) + 1); + int commitTimes = tableCommitTimes.get(tableId); + if (commitTimes >= compactionOptions.getCommitInterval() + && !compactedTables.contains(tableId)) { + if (throwable != null) { + throw new RuntimeException(throwable); + } + compactedTables.add(tableId); + if (compactExecutor == null) { + compact(tableId); + } else { + compactExecutor.submit(() -> compact(tableId)); + } + } + } + } + + private void compact(TableId tableId) { + if (catalog == null) { + catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-compaction-catalog", catalogOptions, new Configuration()); + } + try { + RewriteDataFilesActionResult rewriteResult = + Actions.forTable( + StreamExecutionEnvironment.createLocalEnvironment(), + catalog.loadTable(TableIdentifier.parse(tableId.identifier()))) + .rewriteDataFiles() + .execute(); + LOGGER.info( + "Iceberg small file compact result for {}: {} data files and {} delete files", Review Comment: Iceberg small file compact result for {}: added {} data files and deleted {} files. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java: ########## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific langutinyIntCol governing permissions and Review Comment: langutinyIntCol ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org