This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new f49b263e65 [Improve][Connector-V2] Remove hard code iceberg table format version (#7500)
f49b263e65 is described below

commit f49b263e6517cee13a7c853a83472ce863f4b9d1
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Aug 29 22:41:19 2024 +0800

    [Improve][Connector-V2] Remove hard code iceberg table format version (#7500)
---
 docs/en/faq.md                                     | 21 -----------
 docs/zh/faq.md                                     | 21 -----------
 .../seatunnel/iceberg/IcebergCatalogLoader.java    | 11 ++----
 .../seatunnel/iceberg/catalog/IcebergCatalog.java  | 37 +++++++++----------
 .../seatunnel/iceberg/data/RowConverter.java       | 22 ++++-------
 .../seatunnel/iceberg/sink/IcebergSink.java        |  6 +--
 .../seatunnel/iceberg/sink/IcebergSinkWriter.java  | 10 ++---
 .../iceberg/sink/writer/IcebergRecordWriter.java   | 12 +-----
 .../iceberg/sink/writer/IcebergWriterFactory.java  |  4 --
 .../seatunnel/iceberg/source/IcebergSource.java    | 10 ++---
 .../seatunnel/iceberg/utils/SchemaUtils.java       | 43 +++++-----------------
 11 files changed, 51 insertions(+), 146 deletions(-)

diff --git a/docs/en/faq.md b/docs/en/faq.md
index 2e50c9d461..02c125ad4f 100644
--- a/docs/en/faq.md
+++ b/docs/en/faq.md
@@ -203,23 +203,6 @@ spark {
 }
 ```

-## How do I specify a different JDK version for SeaTunnel on YARN?
-
-For example, if you want to set the JDK version to JDK8, there are two cases:
-
-- The YARN cluster has deployed JDK8, but the default JDK is not JDK8. Add two configurations to the SeaTunnel config file:
-
-  ```
-  env {
-   ...
-   spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
-   spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
-   ...
-  }
-  ```
-- YARN cluster does not deploy JDK8. At this time, start SeaTunnel attached with JDK8. For detailed operations, see:
-  https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
-
 ## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?

 If you run in local mode, you need to modify the `start-seatunnel.sh` startup script. After `spark-submit`, add a parameter `--driver-memory 4g` . Under normal circumstances, local mode is not used in the production environment. Therefore, this parameter generally does not need to be set during On YARN. See: [Application Properties](https://spark.apache.org/docs/latest/configuration.html#application-properties) for details.
@@ -334,10 +317,6 @@ spark-submit --verbose
     ...
 ```

-## How do I use SeaTunnel to synchronize data across HDFS clusters?
-
-Just configure hdfs-site.xml properly. Refer to: https://www.cnblogs.com/suanec/p/7828139.html.
-
 ## I want to learn the source code of SeaTunnel. Where should I start?

 SeaTunnel has a completely abstract and structured code implementation, and many people have chosen SeaTunnel As a way to learn Spark. You can learn the source code from the main program entry: SeaTunnel.java
diff --git a/docs/zh/faq.md b/docs/zh/faq.md
index 3be6ce38e5..4fc24e6a3a 100644
--- a/docs/zh/faq.md
+++ b/docs/zh/faq.md
@@ -204,23 +204,6 @@ spark {
 }
 ```

-## 如何为 YARN 上的 SeaTunnel 指定不同的 JDK 版本?
-
-例如要设置JDK版本为JDK8，有两种情况：
-
-- YARN集群已部署JDK8，但默认JDK不是JDK8。 在 SeaTunnel 配置文件中添加两个配置：
-
-  ```
-  env {
-   ...
-   spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
-   spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
-   ...
-  }
-  ```
-- YARN集群未部署JDK8。 此时，启动附带JDK8的SeaTunnel。 详细操作参见：
-  https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
-
 ## Spark local[*]模式运行SeaTunnel时总是出现OOM怎么办?

 如果以本地模式运行，则需要修改`start-seatunnel.sh`启动脚本。 在 `spark-submit` 之后添加参数 `--driver-memory 4g` 。 一般情况下，生产环境中不使用本地模式。 因此，On YARN时一般不需要设置该参数。 有关详细信息，请参阅：[应用程序属性](https://spark.apache.org/docs/latest/configuration.html#application-properties)。
@@ -335,10 +318,6 @@ spark-submit --verbose
     ...
 ```

-## 如何使用SeaTunnel跨HDFS集群同步数据?
-
-只需正确配置 hdfs-site.xml 即可。 参考：https://www.cnblogs.com/suanec/p/7828139.html。
-
 ## 我想学习SeaTunnel的源代码。 我应该从哪里开始?

 SeaTunnel 拥有完全抽象、结构化的代码实现，很多人都选择 SeaTunnel 作为学习 Spark 的方式。 您可以从主程序入口了解源代码：SeaTunnel.java
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
index 0f4610783a..bbb590502c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
@@ -50,25 +50,20 @@ public class IcebergCatalogLoader implements Serializable {
     private static final long serialVersionUID = -6003040601422350869L;
     private static final List<String> HADOOP_CONF_FILES =
             ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
-    private CommonConfig config;
+    private final CommonConfig config;

     public IcebergCatalogLoader(CommonConfig config) {
         this.config = config;
     }

     public Catalog loadCatalog() {
-        // When using the seatunel engine, set the current class loader to prevent loading failures
+        // When using the SeaTunnel engine, set the current class loader to prevent loading failures
         Thread.currentThread().setContextClassLoader(IcebergCatalogLoader.class.getClassLoader());
         return CatalogUtil.buildIcebergCatalog(
                 config.getCatalogName(), config.getCatalogProps(), loadHadoopConfig(config));
     }

-    /**
-     * Loading Hadoop configuration through reflection
-     *
-     * @param config
-     * @return
-     */
+    /** Loading Hadoop configuration through reflection */
     public Object loadHadoopConfig(CommonConfig config) {
         Class<?> configClass =
                 DynClasses.builder()
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index 520f9bdbac..fc28001b2c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -58,9 +58,9 @@ import static org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtil

 @Slf4j
 public class IcebergCatalog implements Catalog {
-    private String catalogName;
-    private ReadonlyConfig readonlyConfig;
-    private IcebergCatalogLoader icebergCatalogLoader;
+    private final String catalogName;
+    private final ReadonlyConfig readonlyConfig;
+    private final IcebergCatalogLoader icebergCatalogLoader;
     private org.apache.iceberg.catalog.Catalog catalog;

     public IcebergCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
@@ -224,22 +224,21 @@ public class IcebergCatalog {
     public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) {
         List<Types.NestedField> columns = icebergTable.schema().columns();
         TableSchema.Builder builder = TableSchema.builder();
-        columns.stream()
-                .forEach(
-                        nestedField -> {
-                            String name = nestedField.name();
-                            SeaTunnelDataType<?> seaTunnelType =
-                                    SchemaUtils.toSeaTunnelType(name, nestedField.type());
-                            PhysicalColumn physicalColumn =
-                                    PhysicalColumn.of(
-                                            name,
-                                            seaTunnelType,
-                                            (Long) null,
-                                            true,
-                                            null,
-                                            nestedField.doc());
-                            builder.column(physicalColumn);
-                        });
+        columns.forEach(
+                nestedField -> {
+                    String name = nestedField.name();
+                    SeaTunnelDataType<?> seaTunnelType =
+                            SchemaUtils.toSeaTunnelType(name, nestedField.type());
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    name,
+                                    seaTunnelType,
+                                    (Long) null,
+                                    true,
+                                    null,
+                                    nestedField.doc());
+                    builder.column(physicalColumn);
+                });

         List<String> partitionKeys =
                 icebergTable.spec().fields().stream()
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
index 8c699b3440..f46928456f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
@@ -92,17 +92,17 @@ public class RowConverter {
         return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
     }

-    public Record convert(Object row, SeaTunnelDataType rowType) {
+    public Record convert(Object row, SeaTunnelDataType<?> rowType) {
         return convertStructValue(row, rowType, tableSchema.asStruct(), -1, null);
     }

-    public Record convert(Object row, SeaTunnelDataType rowType, SchemaChangeWrapper wrapper) {
+    public Record convert(Object row, SeaTunnelDataType<?> rowType, SchemaChangeWrapper wrapper) {
         return convertStructValue(row, rowType, tableSchema.asStruct(), -1, wrapper);
     }

     protected GenericRecord convertStructValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Types.StructType schema,
             int parentFieldId,
             SchemaChangeWrapper wrapper) {
@@ -120,15 +120,7 @@ public class RowConverter {
         }
     }

-    /**
-     * Convert RowType
-     *
-     * @param row
-     * @param fromType
-     * @param schema
-     * @param structFieldId
-     * @return
-     */
+    /** Convert RowType */
     private GenericRecord convertToStruct(
             SeaTunnelRow row,
             SeaTunnelRowType fromType,
@@ -179,7 +171,7 @@ public class RowConverter {

     public Object convertValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Type type,
             int fieldId,
             SchemaChangeWrapper wrapper) {
@@ -252,7 +244,7 @@ public class RowConverter {

     protected List<Object> convertListValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Types.ListType type,
             SchemaChangeWrapper wrapper) {
         Preconditions.checkArgument(value.getClass().isArray());
@@ -269,7 +261,7 @@ public class RowConverter {

     protected Map<Object, Object> convertMapValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Types.MapType type,
             SchemaChangeWrapper wrapper) {
         Preconditions.checkArgument(value instanceof Map);
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index 65bccbdb89..a1d43d6acf 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -61,9 +61,9 @@ public class IcebergSink
                 SupportSaveMode,
                 SupportMultiTableSink {
     private static String PLUGIN_NAME = "Iceberg";
-    private SinkConfig config;
-    private ReadonlyConfig readonlyConfig;
-    private CatalogTable catalogTable;
+    private final SinkConfig config;
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;

     public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.readonlyConfig = pluginConfig;
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
index aed6522ca8..3a5e22b93b 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
@@ -54,13 +54,12 @@ public class IcebergSinkWriter
         implements SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState>,
                 SupportMultiTableSinkWriter<Void> {
     private SeaTunnelRowType rowType;
-    private SinkConfig config;
-    private IcebergTableLoader icebergTableLoader;
+    private final SinkConfig config;
+    private final IcebergTableLoader icebergTableLoader;
     private RecordWriter writer;
-    private IcebergFilesCommitter filesCommitter;
-    private List<WriteResult> results = Lists.newArrayList();
+    private final IcebergFilesCommitter filesCommitter;
+    private final List<WriteResult> results = Lists.newArrayList();
     private String commitUser = UUID.randomUUID().toString();
-    private long checkpointId;

     private final DataTypeChangeEventHandler dataTypeChangeEventHandler;

@@ -77,7 +76,6 @@ public class IcebergSinkWriter
         tryCreateRecordWriter();
         if (Objects.nonNull(states) && !states.isEmpty()) {
             this.commitUser = states.get(0).getCommitUser();
-            this.checkpointId = states.get(0).getCheckpointId();
             preCommit(states);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 2be206ebb6..06b48591df 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -54,7 +54,7 @@ public class IcebergRecordWriter implements RecordWriter {
     private final List<WriteResult> writerResults;
     private TaskWriter<Record> writer;
     private RowConverter recordConverter;
-    private IcebergWriterFactory writerFactory;
+    private final IcebergWriterFactory writerFactory;

     public IcebergRecordWriter(Table table, IcebergWriterFactory writerFactory, SinkConfig config) {
         this.config = config;
@@ -122,12 +122,7 @@ public class IcebergRecordWriter implements RecordWriter {
         }
     }

-    /**
-     * apply schema update
-     *
-     * @param updates
-     * @return
-     */
+    /** apply schema update */
     private void applySchemaUpdate(SchemaChangeWrapper updates) {
         // complete the current file
         flush();
@@ -169,7 +164,4 @@ public class IcebergRecordWriter implements RecordWriter {
                         table.spec().partitionType()));
         writer = null;
     }
-
-    @Override
-    public void close() {}
 }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
index 67809088ef..2ee7c3d6d7 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
@@ -40,9 +40,6 @@ import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import lombok.extern.slf4j.Slf4j;

 import java.util.List;
@@ -58,7 +55,6 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE

 @Slf4j
 public class IcebergWriterFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class);
     private final IcebergTableLoader tableLoader;
     private final SinkConfig config;

diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
index 7a2fdf9d4f..c56f3f2f00 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
@@ -65,12 +65,12 @@ public class IcebergSource

     private static final long serialVersionUID = 4343414808223919870L;

-    private SourceConfig sourceConfig;
-    private Schema tableSchema;
-    private Schema projectedSchema;
-    private SeaTunnelRowType seaTunnelRowType;
+    private final SourceConfig sourceConfig;
+    private final Schema tableSchema;
+    private final Schema projectedSchema;
+    private final SeaTunnelRowType seaTunnelRowType;
     private JobContext jobContext;
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;

     public IcebergSource(ReadonlyConfig config, CatalogTable catalogTable) {
         this.sourceConfig = SourceConfig.loadConfig(config);
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 6c99eb409c..01343a119f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -40,7 +40,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -106,21 +105,11 @@ public class SchemaUtils {
         SinkConfig config = new SinkConfig(readonlyConfig);
         // build auto create table
         Map<String, String> options = new HashMap<>(table.getOptions());
-        options.put(TableProperties.FORMAT_VERSION, "2");
         // override
         options.putAll(config.getAutoCreateProps());
         return createTable(catalog, toIcebergTableIdentifier(tablePath), config, schema, options);
     }

-    /**
-     * For local test
-     *
-     * @param catalog
-     * @param tableIdentifier
-     * @param config
-     * @param rowType
-     * @return
-     */
     public static Table autoCreateTable(
             Catalog catalog,
             TableIdentifier tableIdentifier,
@@ -180,7 +169,7 @@ public class SchemaUtils {
             Optional<Integer> pkId =
                     structType.fields().stream()
                             .filter(nestedField -> nestedField.name().equals(pk))
-                            .map(nestedField -> nestedField.fieldId())
+                            .map(Types.NestedField::fieldId)
                             .findFirst();
             if (!pkId.isPresent()) {
                 throw new IllegalArgumentException(
@@ -196,23 +185,14 @@ public class SchemaUtils {
         structType
                 .fields()
                 .forEach(
-                        field -> {
-                            fields.add(
-                                    identifierFieldIds.contains(field.fieldId())
-                                            ? field.asRequired()
-                                            : field.asOptional());
-                        });
+                        field ->
+                                fields.add(
+                                        identifierFieldIds.contains(field.fieldId())
+                                                ? field.asRequired()
+                                                : field.asOptional()));
         return new Schema(fields, identifierFieldIds);
     }

-    public static TableIdentifier toIcebergTableIdentifierFromCatalogTable(
-            CatalogTable catalogTable) {
-        org.apache.seatunnel.api.table.catalog.TableIdentifier tableIdentifier =
-                catalogTable.getTableId();
-        return TableIdentifier.of(
-                tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
-    }
-
     public static TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
         return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
     }
@@ -221,12 +201,7 @@ public class SchemaUtils {
         return TablePath.of(tableIdentifier.namespace().toString(), tableIdentifier.name());
     }

-    /**
-     * Commit table schema updates
-     *
-     * @param table
-     * @param wrapper
-     */
+    /** Commit table schema updates */
     private static void commitSchemaUpdates(Table table, SchemaChangeWrapper wrapper) {
         // get the latest schema in case another process updated it
         table.refresh();
@@ -249,7 +224,7 @@ public class SchemaUtils {
                         .collect(toList());

         // Rename column name
-        List<SchemaChangeColumn> changeColumns = wrapper.changeColumns().stream().collect(toList());
+        List<SchemaChangeColumn> changeColumns = new ArrayList<>(wrapper.changeColumns());

         if (addColumns.isEmpty()
                 && modifyColumns.isEmpty()
@@ -294,7 +269,7 @@ public class SchemaUtils {
         return IcebergTypeMapper.mapping(fieldName, type);
     }

-    public static Type toIcebergType(SeaTunnelDataType rowType) {
+    public static Type toIcebergType(SeaTunnelDataType<?> rowType) {
         return IcebergTypeMapper.toIcebergType(rowType);
     }
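Note: with the hard-coded options.put(TableProperties.FORMAT_VERSION, "2") removed from SchemaUtils, auto-created tables only receive a format version when the user supplies one through the sink's auto-create properties (SinkConfig.getAutoCreateProps()); otherwise the Iceberg catalog default applies. The following is a minimal, self-contained Java sketch of that precedence; the class name and the autoCreateProps map are illustrative stand-ins for the connector's own wiring, not code from this patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.TableProperties;

public class FormatVersionPrecedenceSketch {
    public static void main(String[] args) {
        // Options seeded from the table template / catalog; nothing forces "format-version" anymore.
        Map<String, String> options = new HashMap<>();

        // Stand-in for the user's auto-create properties. Before this commit the connector
        // pre-populated "format-version" = "2" before applying them, so v2 was the default.
        Map<String, String> autoCreateProps = new HashMap<>();
        autoCreateProps.put(TableProperties.FORMAT_VERSION, "2"); // opt in explicitly if v2 is required

        // After this commit only user-provided properties are layered on top of the defaults.
        options.putAll(autoCreateProps);

        System.out.println(
                options.getOrDefault(TableProperties.FORMAT_VERSION, "<catalog default>"));
    }
}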