This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f8266fe35 [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404) f8266fe35 is described below commit f8266fe35c83456eabe356ff27927deafd6342f0 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Mon Nov 7 11:17:39 2022 +0800 [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404) * [INLONG-6402][Sort] Modify the metadata field of oracle connector * [INLONG-6402][Sort] Remove extra punctuation * [INLONG-6402][Sort] Modify the metadata field of oracle connector to be consistent with mysql connector * [INLONG-6402][Sort] Compatible with open source oracle connector * [INLONG-6402][Sort] Remove incorrect information * [INLONG-6402][Sort] Remove unused meta field Co-authored-by: menghuiyu <menghu...@tencent.com> --- .../org/apache/inlong/common/enums/MetaField.java | 5 + .../org/apache/inlong/sort/protocol/Metadata.java | 11 +- .../protocol/node/extract/OracleExtractNode.java | 55 ++++++- .../node/extract/OracleExtractNodeTest.java | 14 +- .../cdc/oracle/table/OracleReadableMetaData.java | 166 ++++++++------------- 5 files changed, 137 insertions(+), 114 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java index f88da9c4d..4a065583d 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java @@ -116,6 +116,11 @@ public enum MetaField { */ MYSQL_TYPE, + /** + * The table structure. It is only used for Oracle database + */ + ORACLE_TYPE, + /** * Primary key field name. Currently, it is used for MySQL database. */ diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java index 71c37e4ae..6c2ac4d2d 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java @@ -54,14 +54,6 @@ public interface Metadata { case OP_TS: metadataKey = "op_ts"; break; - case DATA: - case DATA_BYTES: - metadataKey = "meta.data"; - break; - case DATA_CANAL: - case DATA_BYTES_CANAL: - metadataKey = "meta.data_canal"; - break; default: throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", @@ -106,6 +98,9 @@ public interface Metadata { case MYSQL_TYPE: metadataType = "MAP<STRING, STRING>"; break; + case ORACLE_TYPE: + metadataType = "MAP<STRING, STRING>"; + break; case PK_NAMES: metadataType = "ARRAY<STRING>"; break; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java index a1bad2523..594ccdd60 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java @@ -150,7 +150,58 @@ public class OracleExtractNode extends ExtractNode implements InlongMetric, Meta @Override public Set<MetaField> supportedMetaFields() { return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATABASE_NAME, - MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA, MetaField.DATA_CANAL, - MetaField.DATA_BYTES, MetaField.DATA_BYTES_CANAL); + MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.OP_TYPE, MetaField.DATA, MetaField.DATA_BYTES, + MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL, MetaField.IS_DDL, MetaField.TS, + MetaField.SQL_TYPE, MetaField.ORACLE_TYPE, MetaField.PK_NAMES); } + + @Override + public String getMetadataKey(MetaField metaField) { + String metadataKey; + switch (metaField) { + case TABLE_NAME: + metadataKey = "meta.table_name"; + break; + case DATABASE_NAME: + metadataKey = "meta.database_name"; + break; + case SCHEMA_NAME: + metadataKey = "meta.schema_name"; + break; + case OP_TS: + metadataKey = "meta.op_ts"; + break; + case OP_TYPE: + metadataKey = "meta.op_type"; + break; + case DATA: + case DATA_BYTES: + metadataKey = "meta.data"; + break; + case DATA_CANAL: + case DATA_BYTES_CANAL: + metadataKey = "meta.data_canal"; + break; + case IS_DDL: + metadataKey = "meta.is_ddl"; + break; + case TS: + metadataKey = "meta.ts"; + break; + case SQL_TYPE: + metadataKey = "meta.sql_type"; + break; + case ORACLE_TYPE: + metadataKey = "meta.oracle_type"; + break; + case PK_NAMES: + metadataKey = "meta.pk_names"; + break; + default: + throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + this.getClass().getSimpleName(), metaField)); + } + return metadataKey; + } + } diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java index 98394467b..3ddd10c85 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java @@ -50,14 +50,20 @@ public class OracleExtractNodeTest extends SerializeBaseTest<OracleExtractNode> public void testMetaFields() { Map<MetaField, String> formatMap = new HashMap<>(); formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()"); - formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'table_name' VIRTUAL"); - formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'database_name' VIRTUAL"); - formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL"); - formatMap.put(MetaField.SCHEMA_NAME, "STRING METADATA FROM 'schema_name' VIRTUAL"); + formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'meta.table_name' VIRTUAL"); + formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'meta.database_name' VIRTUAL"); + formatMap.put(MetaField.SCHEMA_NAME, "STRING METADATA FROM 'meta.schema_name' VIRTUAL"); + formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL"); + formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'meta.op_type' VIRTUAL"); formatMap.put(MetaField.DATA, "STRING METADATA FROM 'meta.data' VIRTUAL"); formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL"); formatMap.put(MetaField.DATA_BYTES, "BYTES METADATA FROM 'meta.data' VIRTUAL"); formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL"); + formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL"); + formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL"); + formatMap.put(MetaField.SQL_TYPE, "MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL"); + formatMap.put(MetaField.ORACLE_TYPE, "MAP<STRING, STRING> METADATA FROM 'meta.oracle_type' VIRTUAL"); + formatMap.put(MetaField.PK_NAMES, "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL"); OracleExtractNode node = getTestObject(); boolean formatEquals = true; for (MetaField metaField : node.supportedMetaFields()) { diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java index d40758b45..1b48a5870 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java @@ -47,7 +47,9 @@ import org.apache.kafka.connect.source.SourceRecord; /** Defines the supported metadata columns for {@link OracleTableSource}. */ public enum OracleReadableMetaData { - /** Name of the table that contain the row. */ + /** + * Name of the table that contain the row. + */ TABLE_NAME( "table_name", DataTypes.STRING().notNull(), @@ -59,7 +61,10 @@ public enum OracleReadableMetaData { return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY)); } }), - /** Name of the schema that contain the row. */ + + /** + * Name of the schema that contain the row. + */ SCHEMA_NAME( "schema_name", DataTypes.STRING().notNull(), @@ -72,7 +77,9 @@ public enum OracleReadableMetaData { } }), - /** Name of the database that contain the row. */ + /** + * Name of the database that contain the row. + */ DATABASE_NAME( "database_name", DataTypes.STRING().notNull(), @@ -86,72 +93,50 @@ public enum OracleReadableMetaData { }), /** - * It indicates the time that the change was made in the database. If the record is read from - * snapshot of the table instead of the change stream, the value is always 0. + * It indicates the time that the change was made in the database. */ OP_TS( "op_ts", - DataTypes.TIMESTAMP_LTZ(3).notNull(), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), new MetadataConverter() { private static final long serialVersionUID = 1L; @Override public Object read(SourceRecord record) { Struct messageStruct = (Struct) record.value(); - Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE); + Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE); return TimestampData.fromEpochMillis( (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY)); } }), - DATA( - "meta.data", - DataTypes.STRING(), - new MetadataConverter() { - private static final long serialVersionUID = 1L; - - @Override - public Object read(SourceRecord record) { - return null; - } - - @Override - public Object read(SourceRecord record, - @Nullable TableChanges.TableChange tableSchema, RowData rowData) { - return getCanalData(record, tableSchema, (GenericRowData) rowData); - } - }), - - DATA_CANAL( - "meta.data_canal", - DataTypes.STRING(), + /** + * Name of the table that contain the row. + */ + META_TABLE_NAME( + "meta.table_name", + DataTypes.STRING().notNull(), new MetadataConverter() { private static final long serialVersionUID = 1L; @Override public Object read(SourceRecord record) { - return null; - } - - @Override - public Object read(SourceRecord record, - @Nullable TableChanges.TableChange tableSchema, RowData rowData) { - return getCanalData(record, tableSchema, (GenericRowData) rowData); + return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY)); } }), /** - * Name of the table that contain the row. . + * Name of the schema that contain the row. */ - META_TABLE_NAME( - "meta.table_name", + META_SCHEMA_NAME( + "meta.schema_name", DataTypes.STRING().notNull(), new MetadataConverter() { private static final long serialVersionUID = 1L; @Override public Object read(SourceRecord record) { - return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY)); + return StringData.fromString(getMetaData(record, AbstractSourceInfo.SCHEMA_NAME_KEY)); } }), @@ -171,8 +156,7 @@ public enum OracleReadableMetaData { }), /** - * It indicates the time that the change was made in the database. If the record is read from - * snapshot of the table instead of the binlog, the value is always 0. + * It indicates the time that the change was made in the database. */ META_OP_TS( "meta.op_ts", @@ -189,79 +173,74 @@ public enum OracleReadableMetaData { } }), - /** - * Operation type, INSERT/UPDATE/DELETE. - */ - OP_TYPE( - "meta.op_type", - DataTypes.STRING().notNull(), + DATA( + "meta.data", + DataTypes.STRING(), new MetadataConverter() { private static final long serialVersionUID = 1L; @Override public Object read(SourceRecord record) { - return StringData.fromString(getOpType(record)); + return null; + } + + @Override + public Object read(SourceRecord record, + @Nullable TableChanges.TableChange tableSchema, RowData rowData) { + return getCanalData(record, tableSchema, (GenericRowData) rowData); } }), - /** - * Not important, a simple increment counter. - */ - BATCH_ID( - "meta.batch_id", - DataTypes.BIGINT().nullable(), + DATA_CANAL( + "meta.data_canal", + DataTypes.STRING(), new MetadataConverter() { private static final long serialVersionUID = 1L; - private long id = 0; - @Override public Object read(SourceRecord record) { - return id++; + return null; + } + + @Override + public Object read(SourceRecord record, + @Nullable TableChanges.TableChange tableSchema, RowData rowData) { + return getCanalData(record, tableSchema, (GenericRowData) rowData); } }), /** - * Source does not emit ddl data. + * Operation type, INSERT/UPDATE/DELETE. */ - IS_DDL( - "meta.is_ddl", - DataTypes.BOOLEAN().notNull(), + OP_TYPE( + "meta.op_type", + DataTypes.STRING().notNull(), new MetadataConverter() { private static final long serialVersionUID = 1L; @Override public Object read(SourceRecord record) { - return false; + return StringData.fromString(getOpType(record)); } }), /** - * The update-before data for UPDATE record. + * Source does not emit ddl data. */ - OLD( - "meta.update_before", - DataTypes.ARRAY( - DataTypes.MAP( - DataTypes.STRING().nullable(), - DataTypes.STRING().nullable()) - .nullable()) - .nullable(), + IS_DDL( + "meta.is_ddl", + DataTypes.BOOLEAN().notNull(), new MetadataConverter() { private static final long serialVersionUID = 1L; @Override public Object read(SourceRecord record) { - final Envelope.Operation op = Envelope.operationFor(record); - if (op != Envelope.Operation.UPDATE) { - return null; - } - return record; + return false; } }), - MYSQL_TYPE( - "meta.mysql_type", + ORACLE_TYPE( + "meta.oracle_type", DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(), new MetadataConverter() { private static final long serialVersionUID = 1L; @@ -274,24 +253,11 @@ public enum OracleReadableMetaData { @Override public Object read( SourceRecord record, @Nullable TableChanges.TableChange tableSchema) { - if (tableSchema == null) { + Map<String, String> oracleType = getOracleType(tableSchema); + if (oracleType == null) { return null; } - Map<StringData, StringData> mysqlType = new HashMap<>(); - final Table table = tableSchema.getTable(); - table.columns() - .forEach( - column -> { - mysqlType.put( - StringData.fromString(column.name()), - StringData.fromString( - String.format( - "%s(%d)", - column.typeName(), - column.length()))); - }); - - return new GenericMapData(mysqlType); + return new GenericMapData(oracleType); } }), @@ -348,17 +314,17 @@ public enum OracleReadableMetaData { if (tableSchema == null) { return null; } - Map<StringData, Integer> mysqlType = new HashMap<>(); + Map<StringData, Integer> sqlType = new HashMap<>(); final Table table = tableSchema.getTable(); table.columns() .forEach( column -> { - mysqlType.put( + sqlType.put( StringData.fromString(column.name()), column.jdbcType()); }); - return new GenericMapData(mysqlType); + return new GenericMapData(sqlType); } }), @@ -479,16 +445,16 @@ public enum OracleReadableMetaData { if (tableSchema == null) { return null; } - Map<String, Integer> mysqlType = new LinkedHashMap<>(); + Map<String, Integer> sqlType = new LinkedHashMap<>(); final Table table = tableSchema.getTable(); table.columns() .forEach( column -> { - mysqlType.put( + sqlType.put( column.name(), column.jdbcType()); }); - return mysqlType; + return sqlType; } private static String getMetaData(SourceRecord record, String tableNameKey) {