[inlong] branch master updated: [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new ca24d2ede [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323) ca24d2ede is described below commit ca24d2edea046611f8c77d70b1465b550f146aaa Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Mon Oct 31 17:19:57 2022 +0800 [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323) * [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario * [INLONG-6322][Sort] Fix flushing update error * [INLONG-6322][Sort] Optimize flush handle * [INLONG-6322][Sort] Fix comment error --- .../table/DorisDynamicSchemaOutputFormat.java | 68 +- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java index c2db85789..9e8142ae7 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java @@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat} @@ -76,11 +77,16 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat { private final String 
tablePattern; private final String dynamicSchemaFormat; private final boolean ignoreSingleTableErrors; -private final transient Map flushExceptionMap = new HashMap<>(); +private final Map flushExceptionMap = new HashMap<>(); +private final AtomicLong readInNum = new AtomicLong(0); +private final AtomicLong writeOutNum = new AtomicLong(0); +private final AtomicLong errorNum = new AtomicLong(0); +private final AtomicLong ddlNum = new AtomicLong(0); private long batchBytes = 0L; private int size; private DorisStreamLoad dorisStreamLoad; private transient volatile boolean closed = false; +private transient volatile boolean flushing = false; private transient ScheduledExecutorService scheduler; private transient ScheduledFuture scheduledFuture; private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat; @@ -131,10 +137,8 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat { this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("doris-streamload-output-format")); this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> { -synchronized (DorisDynamicSchemaOutputFormat.this) { -if (!closed) { -flush(); -} +if (!closed && !flushing) { +flush(); } }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS); } @@ -158,7 +162,7 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat { addBatch(row); boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize()) || batchBytes >= executionOptions.getMaxBatchBytes(); -if (valid) { +if (valid && !flushing) { flush(); } } @@ -168,8 +172,10 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat { if (row instanceof RowData) { RowData rowData = (RowData) row; JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0)); +readInNum.incrementAndGet(); boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode); if (isDDL) { 
+ddlNum.incrementAndGet(); // Ignore ddl change for now return; } @@ -226,6 +232,7 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat { } break; default: +errorNum.incrementAndGet(); throw n
[inlong] branch master updated (70bf3e0aa -> 93c2dc311)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 70bf3e0aa [INLONG-6361][Manager] Fix the deserialization exception with Jackson's version below 2.13 (#6362) add 93c2dc311 [INLONG-6353][Sort] Fix delete event handle error for doris connector (#6358) No new revisions were added by this update. Summary of changes: .../base/format/CanalJsonDynamicSchemaFormat.java | 3 + .../format/DebeziumJsonDynamicSchemaFormat.java| 3 + .../table/DorisDynamicSchemaOutputFormat.java | 145 +++-- .../inlong/sort/doris/table/DorisStreamLoad.java | 4 +- 4 files changed, 113 insertions(+), 42 deletions(-)
[inlong] branch master updated (e97a2c269 -> 4c20d9504)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from e97a2c269 [INLONG-6369][DataProxy] Use the defined message attribute Key value in InLong-Common (#6386) add 4c20d9504 [INLONG-6356][Sort] Oracle connector supports precision and binary type (#6357) No new revisions were added by this update. Summary of changes: .../table/RowDataDebeziumDeserializeSchema.java| 3 ++ .../cdc/oracle/table/OracleReadableMetaData.java | 41 +++--- 2 files changed, 31 insertions(+), 13 deletions(-)
[inlong] branch master updated: [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new f8266fe35 [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404) f8266fe35 is described below commit f8266fe35c83456eabe356ff27927deafd6342f0 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Mon Nov 7 11:17:39 2022 +0800 [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404) * [INLONG-6402][Sort] Modify the metadata field of oracle connector * [INLONG-6402][Sort] Remove extra punctuation * [INLONG-6402][Sort] Modify the metadata field of oracle connector to be consistent with mysql connector * [INLONG-6402][Sort] Compatible with open source oracle connector * [INLONG-6402][Sort] Remove incorrect information * [INLONG-6402][Sort] Remove unused meta field Co-authored-by: menghuiyu --- .../org/apache/inlong/common/enums/MetaField.java | 5 + .../org/apache/inlong/sort/protocol/Metadata.java | 11 +- .../protocol/node/extract/OracleExtractNode.java | 55 ++- .../node/extract/OracleExtractNodeTest.java| 14 +- .../cdc/oracle/table/OracleReadableMetaData.java | 166 - 5 files changed, 137 insertions(+), 114 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java index f88da9c4d..4a065583d 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java @@ -116,6 +116,11 @@ public enum MetaField { */ MYSQL_TYPE, +/** + * The table structure. It is only used for Oracle database + */ +ORACLE_TYPE, + /** * Primary key field name. Currently, it is used for MySQL database. 
*/ diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java index 71c37e4ae..6c2ac4d2d 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java @@ -54,14 +54,6 @@ public interface Metadata { case OP_TS: metadataKey = "op_ts"; break; -case DATA: -case DATA_BYTES: -metadataKey = "meta.data"; -break; -case DATA_CANAL: -case DATA_BYTES_CANAL: -metadataKey = "meta.data_canal"; -break; default: throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", @@ -106,6 +98,9 @@ public interface Metadata { case MYSQL_TYPE: metadataType = "MAP"; break; +case ORACLE_TYPE: +metadataType = "MAP"; +break; case PK_NAMES: metadataType = "ARRAY"; break; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java index a1bad2523..594ccdd60 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java @@ -150,7 +150,58 @@ public class OracleExtractNode extends ExtractNode implements InlongMetric, Meta @Override public Set supportedMetaFields() { return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATABASE_NAME, -MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA, MetaField.DATA_CANAL, -MetaField.DATA_BYTES, MetaField.DATA_BYTES_CANAL); +MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.OP_TYPE, MetaField.DATA, MetaField.DATA_BYTES, +MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL, MetaField.IS_DDL, MetaField.TS, +MetaField.SQL_TYPE, 
MetaField.ORACLE_TYPE, MetaField.PK_NAMES); } + +@Override +public String getMetadataKey(MetaField metaField) { +String metadataKey; +switch (metaField) { +case TABLE_NAME: +metadataKey = "meta.table_name"; +break; +case DATABASE_NAME: +metadataKey = "meta.database_name"; +
[inlong] branch master updated: [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6383)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new a970fa616 [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6383) a970fa616 is described below commit a970fa6167790cfd9d0797e92b5ea26d5f4712e3 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Mon Nov 7 11:22:30 2022 +0800 [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6383) * [INLONG-6379][Sort] Bugfix:iceberg miss metric data in multiple sink (#6381) * [INLONG-6382][Sort] Bugfix:Iceberg miss data when source table do not have primary key in multiple sink scences * [INLONG-6382][Sort] Import auto generated primary key config in multiple sink scences Co-authored-by: thesumery <158971...@qq.com> --- .../org/apache/inlong/sort/base/Constants.java | 9 .../inlong/sort/base/sink/MultipleSinkOption.java | 26 +- .../sort/iceberg/FlinkDynamicTableFactory.java | 2 ++ .../inlong/sort/iceberg/IcebergTableSink.java | 2 ++ .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 3 +-- .../sink/multiple/IcebergMultipleStreamWriter.java | 8 +-- 6 files changed, 40 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 19c58e9c1..c0c7f8808 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -163,4 +163,13 @@ public final class Constants { .booleanType() .defaultValue(true) .withDescription("Whether ignore the single table erros when multiple sink writing scenario."); + 
+public static final ConfigOption SINK_MULTIPLE_PK_AUTO_GENERATED = +ConfigOptions.key("sink.multiple.pk-auto-generated") +.booleanType() +.defaultValue(false) +.withDescription("Whether generated pk fields as whole data when source table does not have a " ++ "primary key."); + + } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java index 77c924b95..69ce3fc68 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java @@ -35,22 +35,26 @@ public class MultipleSinkOption implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(MultipleSinkOption.class); -private String format; +private final String format; -private SchemaUpdateExceptionPolicy schemaUpdatePolicy; +private final SchemaUpdateExceptionPolicy schemaUpdatePolicy; -private String databasePattern; +private final String databasePattern; -private String tablePattern; +private final String tablePattern; + +private final boolean pkAutoGenerated; public MultipleSinkOption(String format, SchemaUpdateExceptionPolicy schemaUpdatePolicy, String databasePattern, -String tablePattern) { +String tablePattern, +boolean pkAutoGenerated) { this.format = format; this.schemaUpdatePolicy = schemaUpdatePolicy; this.databasePattern = databasePattern; this.tablePattern = tablePattern; +this.pkAutoGenerated = pkAutoGenerated; } public String getFormat() { @@ -69,6 +73,10 @@ public class MultipleSinkOption implements Serializable { return tablePattern; } +public boolean isPkAutoGenerated() { +return pkAutoGenerated; +} + public static Builder builder() { return new Builder(); } @@ -78,6 +86,7 @@ public class 
MultipleSinkOption implements Serializable { private SchemaUpdateExceptionPolicy schemaUpdatePolicy; private String databasePattern; private String tablePattern; +private boolean pkAutoGenerated; public MultipleSinkOption.Builder withFormat(String format) { this.format = format; @@ -99,8 +108,13 @@ public class MultipleSinkOption implements Serializable {
[inlong-website] branch master updated: [INLONG-579][Doc] Add doc for oracle connector for all migrate (#584)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong-website.git The following commit(s) were added to refs/heads/master by this push: new f59f2a7186 [INLONG-579][Doc] Add doc for oracle connector for all migrate (#584) f59f2a7186 is described below commit f59f2a71867a85f6c447b7ea37fea67f12a64e06 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Tue Nov 8 14:29:35 2022 +0800 [INLONG-579][Doc] Add doc for oracle connector for all migrate (#584) * [INLONG-579][Doc] Add doc for oracle connector for all migrate --- docs/data_node/extract_node/oracle-cdc.md | 109 ++- .../current/data_node/extract_node/oracle-cdc.md | 117 +++-- 2 files changed, 216 insertions(+), 10 deletions(-) diff --git a/docs/data_node/extract_node/oracle-cdc.md b/docs/data_node/extract_node/oracle-cdc.md index f861091bcf..e330965912 100644 --- a/docs/data_node/extract_node/oracle-cdc.md +++ b/docs/data_node/extract_node/oracle-cdc.md @@ -294,7 +294,7 @@ TODO: It will be supported in the future. required (none) String - Table name of the Oracle database to monitor. + Table name of the Oracle database to monitor. The value is of the form <schema_name>.<table_name> port @@ -328,6 +328,13 @@ TODO: It will be supported in the future. String Inlong metric label, format of value is groupId=xxgroup&streamId=xxstream&nodeId=xxnode. + + source.multiple.enable + optional + false + Boolean + Whether to enable multiple schema and table migration. If it is' true ', Oracle Extract Node will compress the physical field of the table into a special meta field 'data_canal' in the format of 'canal json'. + @@ -378,6 +385,61 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database. 
If the record is read from snapshot of the table instead of the change stream, the value is always 0. + + meta.table_name + STRING NOT NULL + Name of the table that contain the row. + + + meta.schema_name + STRING NOT NULL + Name of the schema that contain the row. + + + meta.database_name + STRING NOT NULL + Name of the database that contain the row. + + + meta.op_ts + TIMESTAMP_LTZ(3) NOT NULL + It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0. + + + meta.op_type + STRING + Type of database operation, such as INSERT/DELETE, etc. + + + meta.data_canal + STRING/BYTES + Data for rows in `canal-json` format only exists when the `source.multiple.enable` option is 'true'. + + + meta.is_ddl + BOOLEAN + Whether the DDL statement. + + + meta.ts + TIMESTAMP_LTZ(3) NOT NULL + The current time when the row was received and processed. + + + meta.sql_type + MAP + Mapping of sql_type table fields to java data type IDs. + + + meta.oracle_type + MAP + Structure of the table. + + + meta.pk_names + ARRAY + Primay key name of the table. 
+ @@ -387,7 +449,18 @@ CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, schema_name STRING METADATA FROM 'schema_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, -operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, +op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, +meta_db_name STRING METADATA FROM 'meta.database_name' VIRTUAL, +meta_schema_name STRING METADATA FROM 'meta.schema_name' VIRTUAL, +meta_table_name STRING METADATA FROM 'meta.table_name' VIRTUAL, +meat_op_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL, +meta_op_type STRING METADATA FROM 'meta.op_type' VIRTUAL, +meta_data_canal STRING METADATA FROM 'meta.data_canal' VIRTUAL, +meta_is_ddl BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL, +meta_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL, +meta_sql_type MAP METADATA FROM 'meta.sql_type' VIRTUAL, +meat_oracle_type MAP METADATA FROM 'meta.oracle_type' VIRTUAL, +meta_pk_names ARRAY METADATA FROM 'meta.pk_names' VIRTUAL ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, @@ -401,7 +474,7 @@ CREATE TABLE products ( 'password' =
[inlong] branch master updated: [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 924df34b9 [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474) 924df34b9 is described below commit 924df34b9362c276a5183c7ef18c69da125e38ef Author: Xin Gong AuthorDate: Wed Nov 9 10:35:53 2022 +0800 [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474) --- .../org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java | 4 1 file changed, 4 insertions(+) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java index 80d6812f2..5ade85306 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java @@ -80,6 +80,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; @@ -438,6 +440,8 @@ public class DebeziumSourceFunction extends RichSourceFunction MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(inlongAudit) +.withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L) +.withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) {
[inlong] branch master updated: [INLONG-6379][Sort] Complement iceberg multiple sink metric data compute (#6472)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new c8864f8f4 [INLONG-6379][Sort] Complement iceberg multiple sink metric data compute (#6472) c8864f8f4 is described below commit c8864f8f43b1b1a42a2b647723eed1a2b460c803 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Nov 9 10:39:06 2022 +0800 [INLONG-6379][Sort] Complement iceberg multiple sink metric data compute (#6472) --- .../inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java index 2c67cbd51..56aa77303 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java @@ -210,6 +210,9 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction
[inlong] branch master updated: [INLONG-6529] Fix microtime time zone error in mysql connector (#6530)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new e223c34c5 [INLONG-6529] Fix microtime time zone error in mysql connector (#6530) e223c34c5 is described below commit e223c34c54ca3c6cae251c9e340b138993d36568 Author: Schnapps AuthorDate: Tue Nov 15 15:40:34 2022 +0800 [INLONG-6529] Fix microtime time zone error in mysql connector (#6530) --- .../sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java index a7231a83e..96777c5ee 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java @@ -630,7 +630,7 @@ public final class RowDataDebeziumDeserializeSchema switch (schemaName) { case MicroTime.SCHEMA_NAME: Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000); -fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, serverTimeZone)); +fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, ZONE_UTC)); break; case Date.SCHEMA_NAME: fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
[inlong] branch master updated: [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 489529262 [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549) 489529262 is described below commit 4895292623c22a1d268bc8ce25366143bcd18005 Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Wed Nov 16 11:08:21 2022 +0800 [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549) --- .../protocol/node/extract/KafkaExtractNode.java| 2 +- .../sort/protocol/node/load/KafkaLoadNode.java | 8 +- .../node/extract/KafkaExtractNodeTest.java | 3 +- .../sort/protocol/node/load/KafkaLoadNodeTest.java | 2 +- .../canal/CanalJsonEnhancedDecodingFormat.java | 19 ++ .../CanalJsonEnhancedDeserializationSchema.java| 260 - .../canal/CanalJsonEnhancedEncodingFormat.java | 19 ++ .../CanalJsonEnhancedSerializationSchema.java | 52 ++--- 8 files changed, 217 insertions(+), 148 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java index 02da2bef3..fb6693548 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java @@ -215,7 +215,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad metadataKey = "value.event-timestamp"; break; case OP_TYPE: -metadataKey = "value.op-type"; +metadataKey = "value.type"; break; case IS_DDL: metadataKey = "value.is-ddl"; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java index be326b478..26fa424ca 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java @@ -224,7 +224,7 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S metadataKey = "value.event-timestamp"; break; case OP_TYPE: -metadataKey = "value.op-type"; +metadataKey = "value.type"; break; case DATA: case DATA_CANAL: @@ -257,8 +257,8 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S @Override public Set supportedMetaFields() { return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE, -MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS, -MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID, -MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA); +MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS, +MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID, +MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA); } } diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java index 661d6f547..1d41c96fd 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java @@ -34,7 +34,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - import static org.junit.Assert.assertEquals; /** @@ -86,7 +85,7 @@ public class KafkaExtractNodeTest 
extends SerializeBaseTest { formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()"); formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'value.table'"); formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'value.database'"); -formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.op-type'"); +formatMap.put(MetaFi
[inlong] branch master updated (16ddf5f5b -> 17f01a468)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 16ddf5f5b [INLONG-6636][Sort] Keep metric computing consistent for source MySQL and sink HBase (#6637) add 17f01a468 [INLONG-6615][Sort] Fix upsert kafka will throw NPE when get delete record (#6616) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java | 4 +++- .../inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java | 7 --- 2 files changed, 7 insertions(+), 4 deletions(-)
[inlong] branch master updated: [INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 21bfeb155 [INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655) 21bfeb155 is described below commit 21bfeb1555f17a49c40a771f3fc52e44c048a20d Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Tue Nov 29 15:46:47 2022 +0800 [INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655) --- inlong-sort/sort-connectors/base/pom.xml | 5 + .../sort/base/dirty/sink/DirtySinkFactory.java | 7 +- .../sort/base/dirty/sink/log/LogDirtySink.java | 4 +- .../base/dirty/sink/log/LogDirtySinkFactory.java | 10 +- .../sort/base/dirty/sink/s3/S3DirtySink.java | 279 + .../base/dirty/sink/s3/S3DirtySinkFactory.java | 148 +++ .../inlong/sort/base/dirty/sink/s3/S3Helper.java | 100 .../inlong/sort/base/dirty/sink/s3/S3Options.java | 241 ++ .../org.apache.flink.table.factories.Factory | 3 +- licenses/inlong-sort-connectors/LICENSE| 2 +- pom.xml| 7 + 11 files changed, 795 insertions(+), 11 deletions(-) diff --git a/inlong-sort/sort-connectors/base/pom.xml b/inlong-sort/sort-connectors/base/pom.xml index 8e4d2701e..675190c51 100644 --- a/inlong-sort/sort-connectors/base/pom.xml +++ b/inlong-sort/sort-connectors/base/pom.xml @@ -42,6 +42,11 @@ ${project.version} provided + + +com.amazonaws +aws-java-sdk-s3 + diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java index b6725ddd8..07784b443 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java @@ -17,12 
+17,13 @@ package org.apache.inlong.sort.base.dirty.sink; -import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableFactory.Context; +import org.apache.flink.table.factories.Factory; /** * Dirty sink factory class, it is used to create dirty sink */ -public interface DirtySinkFactory extends DynamicTableFactory { +public interface DirtySinkFactory extends Factory { /** * Create dirty sink @@ -31,6 +32,6 @@ public interface DirtySinkFactory extends DynamicTableFactory { * @param The data mode that is handled by the dirty sink * @return A dirty sink */ - DirtySink createDirtySink(DynamicTableFactory.Context context); + DirtySink createDirtySink(Context context); } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java index bf5a4f135..a57c981fe 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java @@ -46,7 +46,7 @@ public class LogDirtySink implements DirtySink { private static final long serialVersionUID = 1L; -private static final Logger LOG = LoggerFactory.getLogger(LogDirtySink.class); +private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class); private final RowData.FieldGetter[] fieldGetters; private final String format; @@ -85,7 +85,7 @@ public class LogDirtySink implements DirtySink { // Only support csv format when the row is not a 'RowData' and 'JsonNode' value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter); } -LOG.info("[{}] {}", dirtyData.getLogTag(), value); +LOGGER.info("[{}] {}", dirtyData.getLogTag(), value); } private String format(RowData data, Map labels) throws JsonProcessingException { diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java index 93a12f584..c3720a93d 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirt
[inlong] branch master updated (0671c3a45 -> 40e888598)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 0671c3a45 [INLONG-6678][Dashboard] Add sinkType parameter in sinkFieldList (#6679) add 40e888598 [INLONG-6584][Sort] Add read phase metric and table level metric for MySQL (#6670) No new revisions were added by this update. Summary of changes: .../org/apache/inlong/sort/base/Constants.java | 32 +++ .../apache/inlong/sort/base/enums/ReadPhase.java | 80 +++ .../inlong/sort/base/metric/MetricOption.java | 21 +- .../inlong/sort/base/metric/MetricState.java | 11 + .../inlong/sort/base/metric/SourceMetricData.java | 4 +- .../base/metric/phase/ReadPhaseMetricData.java | 106 + .../sort/base/metric/sub/SourceSubMetricData.java} | 51 ++--- .../base/metric/sub/SourceTableMetricData.java | 240 + .../inlong/sort/base/util/MetricStateUtils.java| 73 +++ .../sort/cdc/debezium/DebeziumSourceFunction.java | 31 ++- .../apache/inlong/sort/cdc/mysql/MySqlSource.java | 9 +- .../sort/cdc/mysql/table/MySqlTableSource.java | 1 + 12 files changed, 621 insertions(+), 38 deletions(-) create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/enums/ReadPhase.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/phase/ReadPhaseMetricData.java copy inlong-sort/{sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryTypeInfo.java => sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceSubMetricData.java} (56%) create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
[inlong] branch master updated: [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 09bae48fe [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688) 09bae48fe is described below commit 09bae48fe06662bc8cd795a9272573a7ccd9135a Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Thu Dec 1 14:47:54 2022 +0800 [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688) --- .../base/dirty/sink/log/LogDirtySinkFactory.java | 9 +- .../base/dirty/sink/s3/S3DirtySinkFactory.java | 8 +- .../base/dirty/utils/DirtySinkFactoryUtils.java| 48 ++ .../kafka/DynamicKafkaSerializationSchema.java | 161 - .../apache/inlong/sort/kafka/KafkaDynamicSink.java | 18 ++- .../table/DynamicKafkaDeserializationSchema.java | 59 +++- .../sort/kafka/table/KafkaDynamicSource.java | 22 ++- .../sort/kafka/table/KafkaDynamicTableFactory.java | 49 +-- .../table/UpsertKafkaDynamicTableFactory.java | 25 +++- .../inlong/sort/parser/KafkaLoadSqlParseTest.java | 150 +++ 10 files changed, 481 insertions(+), 68 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java index c3720a93d..b07b581f1 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java @@ -18,6 +18,8 @@ package org.apache.inlong.sort.base.dirty.sink.log; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; import 
org.apache.flink.table.factories.DynamicTableFactory.Context; import org.apache.flink.table.factories.FactoryUtil; import org.apache.inlong.sort.base.dirty.sink.DirtySink; @@ -38,9 +40,10 @@ public class LogDirtySinkFactory implements DirtySinkFactory { @Override public DirtySink createDirtySink(Context context) { -FactoryUtil.validateFactoryOptions(this, context.getConfiguration()); -String format = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT); -String fieldDelimiter = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER); +ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions()); +FactoryUtil.validateFactoryOptions(this, config); +String format = config.get(DIRTY_SIDE_OUTPUT_FORMAT); +String fieldDelimiter = config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER); return new LogDirtySink<>(format, fieldDelimiter, context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType()); } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java index d9ec26434..16310926c 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java @@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.dirty.sink.s3; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.factories.DynamicTableFactory.Context; @@ -79,9 +80,10 @@ public class S3DirtySinkFactory implements DirtySinkFactory { @Override public DirtySink createDirtySink(Context 
context) { -FactoryUtil.validateFactoryOptions(this, context.getConfiguration()); -validate(context.getConfiguration()); -return new S3DirtySink<>(getS3Options(context.getConfiguration()), +ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions()); +FactoryUtil.validateFactoryOptions(this, config); +validate(config); +return new S3DirtySink<>(getS3Options(config), context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType()); } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org
[inlong] branch master updated (bffb911d8 -> c1a07c38c)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from bffb911d8 [INLONG-6732][Manager] Fix wrong SortSDK topic properties (#6733) add c1a07c38c [INLONG-6724][Sort] Supports dirty data side-output for doris sink (#6725) No new revisions were added by this update. Summary of changes: .../apache/inlong/sort/base/dirty/DirtyType.java | 17 +- .../table/DorisDynamicSchemaOutputFormat.java | 339 + .../sort/doris/table/DorisDynamicTableFactory.java | 12 +- .../sort/doris/table/DorisDynamicTableSink.java| 21 +- .../DorisExtractNodeToDorisLoadNodeTest.java | 13 +- .../inlong/sort/parser/DorisMultipleSinkTest.java | 18 +- 6 files changed, 278 insertions(+), 142 deletions(-)
[inlong] branch master updated: [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new b7c6f49d7 [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726) b7c6f49d7 is described below commit b7c6f49d7fd1cd6c86419f991bb74426ea04252f Author: Xin Gong AuthorDate: Tue Dec 6 11:51:31 2022 +0800 [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726) --- .../filesystem/stream/AbstractStreamingWriter.java | 46 +- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java index 568b639f1..e5e74e62a 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java @@ -42,6 +42,10 @@ import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; +import java.nio.charset.StandardCharsets; + +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; @@ -78,7 +82,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe private transient long currentWatermark; -private SinkMetricData metricData; +private Long dataSize = 0L; +private Long rowSize = 
0L; public AbstractStreamingWriter( long bucketCheckInterval, @@ -118,6 +123,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe .withInlongAudit(inlongAudit) .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) +.withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) +.withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -129,7 +136,20 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe * Commit up to this checkpoint id. */ protected void commitUpToCheckpoint(long checkpointId) throws Exception { -helper.commitUpToCheckpoint(checkpointId); +try { +helper.commitUpToCheckpoint(checkpointId); +if (sinkMetricData != null) { +sinkMetricData.invoke(rowSize, dataSize); +} +} catch (Exception e) { +if (sinkMetricData != null) { +sinkMetricData.invokeDirty(rowSize, dataSize); +} +LOG.error("fileSystem sink commitUpToCheckpoint.", e); +} finally { +rowSize = 0L; +dataSize = 0L; +} } @Override @@ -192,14 +212,20 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe } @Override -public void processElement(StreamRecord element) throws Exception { -helper.onElement( -element.getValue(), -getProcessingTimeService().getCurrentProcessingTime(), -element.hasTimestamp() ? element.getTimestamp() : null, -currentWatermark); -if (metricData != null) { -metricData.invokeWithEstimate(element.getValue()); +public void processElement(StreamRecord element) { +try { +helper.onElement( +element.getValue(), +getProcessingTimeService().getCurrentProcessingTime(), +element.hasTimestamp() ? 
element.getTimestamp() : null, +currentWatermark); +rowSize = rowSize + 1; +dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length; +} catch (Exception e) { +if (sinkMetricData != null) { +sinkMetricData.invokeDirty(1L, element.getValue().toString().getBytes(StandardCharsets.UTF_8).length); +} +LOG.error("fileSystem sink processElement.", e); } }
[inlong] branch master updated: [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 3126faaf5 [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808) 3126faaf5 is described below commit 3126faaf5e8851aa62a8772eeb4096bf47034613 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Tue Dec 13 16:49:37 2022 +0800 [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808) --- .../base/metric/sub/SourceTableMetricData.java | 56 --- .../sort/cdc/base/util/CallbackCollector.java | 47 .../inlong/sort/cdc/oracle/OracleSource.java | 9 +++- .../oracle/debezium/DebeziumSourceFunction.java| 63 -- .../sort/cdc/oracle/table/OracleTableSource.java | 3 +- 5 files changed, 152 insertions(+), 26 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java index 112000903..a11d73711 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java @@ -122,9 +122,14 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()) .collect(Collectors.joining(DELIMITER)); StringBuilder labelBuilder = new StringBuilder(metricGroupLabels); - labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) - .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]); - +if (schemaInfoArray.length == 2) { + 
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) + .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]); +} else if (schemaInfoArray.length == 3) { + labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) + .append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1]) + .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]); +} MetricOption metricOption = MetricOption.builder() .withInitRecords(subMetricState != null ? subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L) .withInitBytes(subMetricState != null ? subMetricState.getMetricValue(NUM_BYTES_IN) : 0L) @@ -135,14 +140,18 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub } /** - * build record schema identify,in the form of database.table + * build record schema identify,in the form of database.schema.table or database.table * * @param database the database name of record + * @param schema the schema name of record * @param table the table name of record * @return the record schema identify */ -public String buildSchemaIdentify(String database, String table) { -return database + Constants.SEMICOLON + table; +public String buildSchemaIdentify(String database, String schema, String table) { +if (schema == null) { +return database + Constants.SEMICOLON + table; +} +return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + table; } /** @@ -168,7 +177,7 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub outputMetricsWithEstimate(data); return; } -String identify = buildSchemaIdentify(database, table); +String identify = buildSchemaIdentify(database, null, table); SourceMetricData subSourceMetricData; if (subSourceMetricMap.containsKey(identify)) { subSourceMetricData = subSourceMetricMap.get(identify); @@ -186,6 +195,39 @@ public class 
SourceTableMetricData extends SourceMetricData implements SourceSub outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : ReadPhase.INCREASE_PHASE); } +/** + * output metrics with estimate + * + * @param database the database name of record + * @param schema the schema name of record + * @param table the table name of record +
[inlong] branch master updated: [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 99947c98d [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870) 99947c98d is described below commit 99947c98d6437a694ccd3ffd1177f88add12dd31 Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Wed Dec 14 15:23:38 2022 +0800 [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870) --- .../inlong/sort/base/dirty/DirtySinkHelper.java| 108 ++ .../apache/inlong/sort/base/dirty/DirtyType.java | 16 +++ .../sort/elasticsearch6/ElasticsearchSink.java | 20 +++- .../table/Elasticsearch6DynamicSink.java | 16 ++- .../table/Elasticsearch6DynamicSinkFactory.java| 14 ++- .../sort/elasticsearch7/ElasticsearchSink.java | 19 +++- .../table/Elasticsearch7DynamicSink.java | 16 ++- .../table/Elasticsearch7DynamicSinkFactory.java| 14 ++- .../sort/elasticsearch/ElasticsearchSinkBase.java | 122 +++- .../table/RowElasticsearchSinkFunction.java| 124 ++--- .../sort/parser/ElasticsearchSqlParseTest.java | 10 +- 11 files changed, 369 insertions(+), 110 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java new file mode 100644 index 0..a962b974e --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.Serializable; + +/** + * Dirty sink helper, it helps dirty data sink for {@link DirtySink} + * @param + */ +public class DirtySinkHelper implements Serializable { + +private static final long serialVersionUID = 1L; +private static final Logger LOGGER = LoggerFactory.getLogger(DirtySinkHelper.class); + +private DirtyOptions dirtyOptions; +private final @Nullable DirtySink dirtySink; + +public DirtySinkHelper(DirtyOptions dirtyOptions, @Nullable DirtySink dirtySink) { +this.dirtyOptions = Preconditions.checkNotNull(dirtyOptions, "dirtyOptions is null"); +this.dirtySink = dirtySink; +} + +/** + * Open for dirty sink + * + * @param configuration The configuration that is used for dirty sink + */ +public void open(Configuration configuration) { +if (dirtySink != null) { +try { +dirtySink.open(configuration); +} catch (Exception e) { +throw new RuntimeException(e); +} +} +} + +/** + * Dirty data sink + * @param dirtyData The dirty data + * @param dirtyType The dirty type {@link DirtyType} + * @param e The cause of dirty data + */ +public void invoke(T dirtyData, DirtyType 
dirtyType, Throwable e) { +if (!dirtyOptions.ignoreDirty()) { +RuntimeException ex; +if (e instanceof RuntimeException) { +ex = (RuntimeException) e; +} else { +ex = new RuntimeException(e); +} +throw ex; +} +if (dirtySink != null) { +DirtyData.Builder builder = DirtyData.builder(); +try { +builder.setData(dirtyData) +.setDirtyType(dirtyType) +.setLabels(dirtyOptions.getLabels()) +.setLogTag(dirtyOptions.getLogTag()) +.setDirtyMessage(e.getMessage()) +
[inlong] branch master updated (59c83c226 -> 8d0397700)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 59c83c226 [INLONG-6890][DataProxy] Fix tubemq MQ_TYPE enum and optimize sink code and config file (#6891) add 8d0397700 [INLONG-6884][Sort] Add dirty message for kafka connector (#6885) No new revisions were added by this update. Summary of changes: .../org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java | 3 +++ .../inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-)
[inlong] branch master updated: [INLONG-7186][Sort] Fix time zone incorrect (#7187)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new d7a639b5b [INLONG-7186][Sort] Fix time zone incorrect (#7187) d7a639b5b is described below commit d7a639b5b215e4df95ff6b4c717654557a5be247 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Mon Jan 9 10:57:06 2023 +0800 [INLONG-7186][Sort] Fix time zone incorrect (#7187) * [INLONG-7186][Sort] Fix time zone incorrect * [INLONG-7186][Sort] Remove unused imports --- .../apache/inlong/sort/base/format/JsonToRowDataConverters.java| 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java index b30e707ec..5d8b8206b 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.base.format; +import java.sql.Timestamp; +import java.time.ZoneId; import org.apache.flink.annotation.Internal; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -49,7 +51,6 @@ import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; -import java.time.ZoneOffset; import java.time.format.DateTimeParseException; import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalQueries; @@ -253,7 +254,7 @@ public class JsonToRowDataConverters implements Serializable { LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime()); LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); -return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime)); +return TimestampData.fromEpochMillis(Timestamp.valueOf(LocalDateTime.of(localDate, localTime)).getTime()); } private TimestampData convertToTimestampWithLocalZone(JsonNode jsonNode) { @@ -277,7 +278,7 @@ public class JsonToRowDataConverters implements Serializable { LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate()); return TimestampData.fromInstant( -LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC)); +LocalDateTime.of(localDate, localTime).atZone(ZoneId.systemDefault()).toInstant()); } private StringData convertToString(JsonNode jsonNode) {
[inlong] branch master updated: [INLONG-7335][Sort] Fix Hbase connector lost spi file when shade (#7336)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 1385e370c [INLONG-7335][Sort] Fix Hbase connector lost spi file when shade (#7336) 1385e370c is described below commit 1385e370ce521108ed87fe206c32a91eacaf73bd Author: Xin Gong AuthorDate: Wed Feb 8 16:09:45 2023 +0800 [INLONG-7335][Sort] Fix Hbase connector lost spi file when shade (#7336) --- inlong-sort/sort-connectors/hbase/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/inlong-sort/sort-connectors/hbase/pom.xml b/inlong-sort/sort-connectors/hbase/pom.xml index fd26a1485..019ae7819 100644 --- a/inlong-sort/sort-connectors/hbase/pom.xml +++ b/inlong-sort/sort-connectors/hbase/pom.xml @@ -91,6 +91,7 @@ hbase-default.xml hbase-default.xml + false
[inlong] branch master updated: [INLONG-7377][Sort] Fix sort-dist protobuf conflicts with protobuf in sort-connectors (#7378)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new a8a65810e [INLONG-7377][Sort] Fix sort-dist protobuf conflicts with protobuf in sort-connectors (#7378) a8a65810e is described below commit a8a65810e7639292e021b0a039a7b95b7629a4d1 Author: Schnapps AuthorDate: Thu Feb 16 15:27:48 2023 +0800 [INLONG-7377][Sort] Fix sort-dist protobuf conflicts with protobuf in sort-connectors (#7378) --- inlong-sort/sort-connectors/filesystem/pom.xml| 1 + inlong-sort/sort-connectors/hbase/pom.xml | 1 + inlong-sort/sort-connectors/hudi/pom.xml | 1 + inlong-sort/sort-connectors/iceberg/pom.xml | 1 + inlong-sort/sort-connectors/kafka/pom.xml | 1 + inlong-sort/sort-connectors/mongodb-cdc/pom.xml | 1 + inlong-sort/sort-connectors/mysql-cdc/pom.xml | 1 + inlong-sort/sort-connectors/oracle-cdc/pom.xml| 1 + inlong-sort/sort-connectors/postgres-cdc/pom.xml | 1 + inlong-sort/sort-connectors/pulsar/pom.xml| 1 + inlong-sort/sort-connectors/sqlserver-cdc/pom.xml | 1 + inlong-sort/sort-dist/pom.xml | 4 ++-- 12 files changed, 13 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-connectors/filesystem/pom.xml b/inlong-sort/sort-connectors/filesystem/pom.xml index 54545dab1..57735743a 100644 --- a/inlong-sort/sort-connectors/filesystem/pom.xml +++ b/inlong-sort/sort-connectors/filesystem/pom.xml @@ -53,6 +53,7 @@ org.apache.inlong:* +com.google.protobuf:* diff --git a/inlong-sort/sort-connectors/hbase/pom.xml b/inlong-sort/sort-connectors/hbase/pom.xml index 019ae7819..823992ac0 100644 --- a/inlong-sort/sort-connectors/hbase/pom.xml +++ b/inlong-sort/sort-connectors/hbase/pom.xml @@ -108,6 +108,7 @@ org.apache.commons:commons-crypto org.apache.commons:commons-lang3 io.netty:netty-all +com.google.protobuf:* io.dropwizard.metrics:metrics-core diff --git a/inlong-sort/sort-connectors/hudi/pom.xml 
b/inlong-sort/sort-connectors/hudi/pom.xml index 660e17113..e62f42f8d 100644 --- a/inlong-sort/sort-connectors/hudi/pom.xml +++ b/inlong-sort/sort-connectors/hudi/pom.xml @@ -111,6 +111,7 @@ com.fasterxml.woodstox:* org.codehaus.woodstox:* com.google.guava:* +com.google.protobuf:* diff --git a/inlong-sort/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-connectors/iceberg/pom.xml index c3cb35176..7d33b4a6e 100644 --- a/inlong-sort/sort-connectors/iceberg/pom.xml +++ b/inlong-sort/sort-connectors/iceberg/pom.xml @@ -87,6 +87,7 @@ org.apache.iceberg:* org.apache.hive:hive-exec org.apache.thrift:libfb303 +com.google.protobuf:* diff --git a/inlong-sort/sort-connectors/kafka/pom.xml b/inlong-sort/sort-connectors/kafka/pom.xml index 1f645cf91..cb77f730e 100644 --- a/inlong-sort/sort-connectors/kafka/pom.xml +++ b/inlong-sort/sort-connectors/kafka/pom.xml @@ -65,6 +65,7 @@ org.apache.inlong:* org.apache.kafka:* +com.google.protobuf:* org.apache.flink:flink-connector-kafka_${scala.binary.version} diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml index c3019e20c..38bb08f93 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml +++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml @@ -99,6 +99,7 @@ com.google.guava:* org.apache.flink:flink-shaded-guava +com.google.protobuf:* diff --git a/inlong-sort/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort
[inlong-website] branch master updated: [INLONG-687][Doc] Optimize the standalone deployment guide (#688)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong-website.git The following commit(s) were added to refs/heads/master by this push: new 1ac61284f3 [INLONG-687][Doc] Optimize the standalone deployment guide (#688) 1ac61284f3 is described below commit 1ac61284f383daa77e5277f256f0a290811647ab Author: Charles Zhang AuthorDate: Thu Feb 23 17:23:41 2023 +0800 [INLONG-687][Doc] Optimize the standalone deployment guide (#688) --- docs/deployment/docker.md | 9 - docs/deployment/standalone.md | 40 -- .../current/deployment/docker.md | 4 +-- .../current/deployment/standalone.md | 40 -- 4 files changed, 70 insertions(+), 23 deletions(-) diff --git a/docs/deployment/docker.md b/docs/deployment/docker.md index 25ffaf2141..186b880bae 100644 --- a/docs/deployment/docker.md +++ b/docs/deployment/docker.md @@ -23,6 +23,13 @@ cd docker/docker-compose docker-compose up -d ``` +## Cluster Initialize +When all containers are successfully started, you can access the Inlong dashboard address `http://localhost`, and use the following default account to log in: +``` +User: admin +Password: inlong +``` + ### Create Cluster Tag Click [Clusters]->[ClusterTags]->[Create] on the page to specify the cluster label name and person in charge:  @@ -41,7 +48,7 @@ The ClusterTags selects the newly created `default_cluster`, the Pulsar cluster Service URL is `pulsar://pulsar:6650`, Admin URL is `http://pulsar:8080`. ::: -### Create Data Stream +## Use You can refer [Pulsar Example](quick_start/pulsar_example.md) to create Data Stream. 
## Destroy diff --git a/docs/deployment/standalone.md b/docs/deployment/standalone.md index 7a08b04ffd..170c7a4f99 100644 --- a/docs/deployment/standalone.md +++ b/docs/deployment/standalone.md @@ -16,7 +16,9 @@ InLong Support the following Message Queue services now, you can choose one of t ## Download the Binary Package You can get binary package from [Download Page](https://inlong.apache.org/download) ,or you can build the InLong refer to [How to Build](quick_start/how_to_build.md). +:::note Extract `apache-inlong-[version]-bin.tar.gz` and `apache-inlong-[version]-sort-connectors.tar.gz`, and make sure the `inlong-sort/connectors/` directory contains `sort-connector-[type]-[version].jar`. +::: ## DB Dependencies - If the backend database is MySQL, please download [mysql-connector-java-8.0.27.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) and put it into the following directories: @@ -33,12 +35,14 @@ In `conf/inlong.conf`, configure the parameters according to the actual situatio ```shell # local IP local_ip= -# Configure MySQL +# Configure Database, MySQL or PostgreSQL spring_datasource_hostname= spring_datasource_port=3306 spring_datasource_username=root spring_datasource_password=inlong -# Configure Pulsar or TubeMQ Address +# Configure Pulsar Address if use Pulsar for Audit +pulsar_service_url=pulsar://172.17.0.2:6650 +pulsar_admin_url=http://172.17.0.2:8080 # the REST server address for Flink flink_rest_address= # the REST server Port for Flink @@ -50,12 +54,28 @@ flink_rest_port=8081 bin/inlong-daemon start standalone ``` -## Register Message Queue -You can register message queue for Manger according to [Register MQ Cluster](https://inlong.apache.org/docs/next/modules/manager/quick_start/#register-mq-cluster). 
+## Cluster Initialize +When all containers are successfully started, you can access the Inlong dashboard address `http://localhost`, and use the following default account to log in: +``` +User: admin +Password: inlong +``` -## Check -After all component run successfully, you can access `http://localhost` with default account: -```shell -user: admin -password: inlong -``` \ No newline at end of file +### Create Cluster Tag +Click [Clusters]->[ClusterTags]->[Create] on the page to specify the cluster label name and person in charge: + + +:::caution +Since each component reports the ClusterTags as `default_cluster` by default, do not use other names. +::: + +### Register Pulsar Cluster +Click [Clusters]->[ClusterTags]->[Create] on the page to register Pulsar Cluster: + + +:::note +The ClusterTags selects the newly created `default_cluster`, and then configuring the Pulsar cluster info. +::: + +## Use +You can refer [Pulsar Example](quick_start/pulsar_example.md) to create Data Stream. \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/deployment/docker.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/deployment/docker.md index 309de929c2..5d7b1753aa 100644
[inlong] branch master updated (c9a640d4c -> 90b678876)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from c9a640d4c [INLONG-5437][SDK] Support initializing SDK(cpp) by ClientConfig object (#5641) add 90b678876 [INLONG-5637][Sort] Fix kafka load node npe error (#5638) No new revisions were added by this update. Summary of changes: .../java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java | 8 ++-- 1 file changed, 2 insertions(+), 6 deletions(-)
[inlong] branch master updated: [INLONG-5913][Sort] Add metric state for oracle and fix speed computing error (#5935)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 4b4f0dc2f [INLONG-5913][Sort] Add metric state for oracle and fix speed computing error (#5935) 4b4f0dc2f is described below commit 4b4f0dc2fb7fdfd9011e75db2738ab3cd35eaa5d Author: Xin Gong AuthorDate: Tue Sep 20 11:15:15 2022 +0800 [INLONG-5913][Sort] Add metric state for oracle and fix speed computing error (#5935) --- .../inlong/sort/base/metric/SinkMetricData.java| 28 .../inlong/sort/base/metric/SourceMetricData.java | 19 +-- .../sort/cdc/oracle/DebeziumSourceFunction.java| 38 ++ 3 files changed, 54 insertions(+), 31 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index d065496e4..34f759a83 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -66,28 +66,26 @@ public class SinkMetricData implements MetricData { registerMetricsForDirtyRecords(new ThreadSafeCounter()); break; case NORMAL: -registerMetricsForNumBytesOut(new ThreadSafeCounter()); -registerMetricsForNumRecordsOut(new ThreadSafeCounter()); -registerMetricsForNumBytesOutPerSecond(); -registerMetricsForNumRecordsOutPerSecond(); - recordsOutCounter.inc(option.getInitRecords()); bytesOutCounter.inc(option.getInitBytes()); -registerMetricsForNumRecordsOutForMeter(recordsOutCounter); -registerMetricsForNumRecordsOutForMeter(bytesOutCounter); +registerMetricsForNumBytesOut(bytesOutCounter); +registerMetricsForNumRecordsOut(recordsOutCounter); +registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter()); 
+registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); +registerMetricsForNumBytesOutPerSecond(); +registerMetricsForNumRecordsOutPerSecond(); break; default: registerMetricsForDirtyBytes(new ThreadSafeCounter()); registerMetricsForDirtyRecords(new ThreadSafeCounter()); -registerMetricsForNumBytesOut(new ThreadSafeCounter()); -registerMetricsForNumRecordsOut(new ThreadSafeCounter()); -registerMetricsForNumBytesOutPerSecond(); -registerMetricsForNumRecordsOutPerSecond(); - recordsOutCounter.inc(option.getInitRecords()); bytesOutCounter.inc(option.getInitBytes()); -registerMetricsForNumRecordsOutForMeter(recordsOutCounter); -registerMetricsForNumRecordsOutForMeter(bytesOutCounter); +registerMetricsForNumBytesOut(bytesOutCounter); +registerMetricsForNumRecordsOut(recordsOutCounter); +registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter()); +registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); +registerMetricsForNumBytesOutPerSecond(); +registerMetricsForNumRecordsOutPerSecond(); break; } @@ -267,7 +265,7 @@ public class SinkMetricData implements MetricData { } if (numBytesOutForMeter != null) { -numBytesOutForMeter.inc(rowCount); +numBytesOutForMeter.inc(rowSize); } if (auditImp != null) { diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 3cffcfe54..3ac6a96f8 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -54,23 +54,22 @@ public class SourceMetricData implements MetricData { this.metricGroup = metricGroup; this.labels = option.getLabels(); -SimpleCounter recordsInCounter = new SimpleCounter(); -SimpleCounter bytesInCounter = new SimpleCounter(); +ThreadSafeCounter recordsInCounter 
= new ThreadSafeCounter(); +ThreadSafeCounter bytesInCounter = new ThreadSafeCounter(); switch (option.getRegisteredMetric()) { default: -registerMetricsForNumRecordsIn
[inlong] branch master updated: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new a859e7973 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) a859e7973 is described below commit a859e7973b699c4b3a4e6c8d5919d63a8e41efb7 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:02:06 2022 +0800 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) Co-authored-by: thesumery <158971...@qq.com> --- .../hive/filesystem/AbstractStreamingWriter.java | 31 ++ 1 file changed, 31 insertions(+) diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java index ab2e845f2..dd9456203 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java @@ -18,6 +18,10 @@ package org.apache.inlong.sort.hive.filesystem; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; 
+import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; + import javax.annotation.Nullable; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send * file and bucket information to downstream. @@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; public AbstractStreamingWriter( long bucketCheckInterval, @@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe bucketCheckInterval); currentWatermark = Long.MIN_VALUE; + +// init metric state +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); +} } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); helper.snapshotState(context.getCheckpointId()); +if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, +getRuntimeContext().getIndexOfThisSubtask()); +} } @Override
[inlong] branch master updated: [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 0394181db [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) 0394181db is described below commit 0394181db774a946c88cf3ee2b0668e644e81465 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:04:01 2022 +0800 [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) Co-authored-by: thesumery <158971...@qq.com> --- .../sort/iceberg/sink/IcebergStreamWriter.java | 41 ++ 1 file changed, 41 insertions(+) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 8318c7177..6f1b75ea0 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult; import 
org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -53,6 +65,8 @@ class IcebergStreamWriter extends AbstractStreamOperator private transient int attemptId; @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; IcebergStreamWriter( String fullTableName, @@ -81,6 +95,8 @@ class IcebergStreamWriter extends AbstractStreamOperator MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -105,6 +121,31 @@ class IcebergStreamWriter extends AbstractStreamOperator } } +@Override +public void initializeState(StateInitializationContext context) throws Exception { +super.initializeState(context); +// init metric state +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); +} +} + +@Override +public void snapshotState(StateSnapshotContext context) throws Exception { +super.snapshotState(context); +if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, +getRuntimeContext().getIndexOfThisSubtask()); +} +} + @Override public void dispose() throws Exception { super.dispose();
[inlong] branch master updated: [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 341aa987b [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) 341aa987b is described below commit 341aa987ba86d0598af321d8e5a164a02224ff26 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:05:23 2022 +0800 [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) Co-authored-by: thesumery <158971...@qq.com> --- .../inlong/sort/base/metric/MetricOption.java | 8 + .../iceberg/flink/sink/IcebergStreamWriter.java| 41 ++ 2 files changed, 49 insertions(+) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index f4c679f9c..8cf0d6f01 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -111,6 +111,14 @@ public class MetricOption { return initBytes; } +public void setInitRecords(long initRecords) { +this.initRecords = initRecords; +} + +public void setInitBytes(long initBytes) { +this.initBytes = initBytes; +} + public static Builder builder() { return new Builder(); } diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java index bb00b7808..ef7612743 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java +++ 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.flink.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -51,6 +63,8 @@ class IcebergStreamWriter extends AbstractStreamOperator private transient int attemptId; @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory, @@ -74,6 +88,8 @@ class IcebergStreamWriter extends AbstractStreamOperator // Initialize metric if (metricOption != null) { 
+metricOption.setInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L); +metricOption.setInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L); metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } @@ -95,6 +111,31 @@ class IcebergStreamWriter extends AbstractStreamOperator } } +@Override +public void initializeState(StateInitializationContext context) throws Exception { +super.initializeState(context); +// init metric state +if (this.metricData != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG
[inlong] branch master updated (4f50594a6 -> c164d788f)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 4f50594a6 [INLONG-6094][Kubernetes] Add liveness reliability for modules (#6095) add c164d788f [INLONG-6089][Manager][Sort] kafka extract node support raw format and scan mode support timestamp (#6100) No new revisions were added by this update. Summary of changes: .../manager/pojo/sort/util/ExtractNodeUtils.java | 10 - .../manager/pojo/source/kafka/KafkaOffset.java | 1 + .../manager/pojo/source/kafka/KafkaSource.java | 3 ++ .../manager/pojo/source/kafka/KafkaSourceDTO.java | 4 ++ .../pojo/source/kafka/KafkaSourceRequest.java | 6 ++- .../sort/protocol/constant/KafkaConstant.java | 2 + .../sort/protocol/enums/KafkaScanStartupMode.java | 3 +- .../protocol/node/extract/KafkaExtractNode.java| 22 -- .../node/extract/KafkaExtractNodeTest.java | 36 ++- .../sort/parser/DataTypeConvertSqlParseTest.java | 2 +- .../sort/parser/DecimalFormatSqlParseTest.java | 2 +- .../sort/parser/DistinctNodeSqlParseTest.java | 6 +-- .../inlong/sort/parser/FlinkSqlParserTest.java | 6 +-- .../sort/parser/FullOuterJoinSqlParseTest.java | 14 +++--- .../sort/parser/InnerJoinRelationSqlParseTest.java | 10 ++--- .../parser/IntervalJoinRelationSqlParseTest.java | 4 +- .../inlong/sort/parser/KafkaSqlParseTest.java | 51 +- .../sort/parser/LeftOuterJoinSqlParseTest.java | 10 ++--- .../inlong/sort/parser/MetaFieldSyncTest.java | 2 +- .../MySqlTemporalJoinRelationSqlParseTest.java | 2 +- .../sort/parser/RightOuterJoinSqlParseTest.java| 10 ++--- 21 files changed, 164 insertions(+), 42 deletions(-)
[inlong] branch master updated (5d676a887 -> 31e9f9013)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 5d676a887 [INLONG-6131][Agent] Support file filtering by condition (#6132) add 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode (#6123) No new revisions were added by this update. Summary of changes: .../sort/protocol/node/format/AvroFormat.java | 9 +- .../sort/protocol/node/format/CanalJsonFormat.java | 7 + .../sort/protocol/node/format/CsvFormat.java | 9 +- .../protocol/node/format/DebeziumJsonFormat.java | 9 +- .../inlong/sort/protocol/node/format/Format.java | 7 + .../sort/protocol/node/format/InLongMsgFormat.java | 9 +- .../sort/protocol/node/format/JsonFormat.java | 10 +- .../sort/protocol/node/format/RawFormat.java | 15 +- .../sort/protocol/node/load/KafkaLoadNode.java | 44 +- .../org/apache/inlong/sort/base/Constants.java | 26 ++-- .../base/format/AbstractDynamicSchemaFormat.java | 116 ++ .../base/format/CanalJsonDynamicSchemaFormat.java | 58 +++ .../format/DebeziumJsonDynamicSchemaFormat.java| 58 +++ .../base/format/DynamicSchemaFormatFactory.java| 54 +++ .../sort/base/format/JsonDynamicSchemaFormat.java | 168 + .../format/CanalJsonDynamicSchemaFormatTest.java | 88 +++ .../DebeziumJsonDynamicSchemaFormatTest.java | 79 ++ .../base/format/DynamicSchemaFormatBaseTest.java | 63 .../kafka/DynamicKafkaSerializationSchema.java | 45 -- .../apache/inlong/sort/kafka/KafkaDynamicSink.java | 30 +++- .../sort/kafka/table/KafkaDynamicTableFactory.java | 118 --- .../table/UpsertKafkaDynamicTableFactory.java | 5 +- .../inlong/sort/parser/KafkaLoadSqlParseTest.java | 54 ++- 23 files changed, 1023 insertions(+), 58 deletions(-) create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java create mode 100644 inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java create mode 100644 inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java create mode 100644 inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatBaseTest.java
[inlong] branch master updated (31e9f9013 -> a225671e5)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode (#6123) add a225671e5 [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function (#6128) No new revisions were added by this update. Summary of changes: .../org/apache/inlong/common/enums/MetaField.java | 7 +- .../org/apache/inlong/sort/protocol/Metadata.java | 5 +- .../protocol/node/extract/MySqlExtractNode.java| 8 ++- .../node/extract/MySqlExtractNodeTest.java | 1 + .../sort/cdc/debezium/DebeziumSourceFunction.java | 10 +++ .../debezium/internal/DebeziumChangeFetcher.java | 12 +++- .../internal/FlinkDatabaseSchemaHistory.java | 2 +- .../cdc/mysql/table/MySqlReadableMetadata.java | 28 +--- .../apache/inlong/sort/parser/AllMigrateTest.java | 79 ++ .../inlong/sort/formats/json/canal/CanalJson.java | 13 10 files changed, 137 insertions(+), 28 deletions(-)
[inlong] branch master updated: [INLONG-6188] join supports multiple fields (#6189)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 153fdaf96 [INLONG-6188] join supports multiple fields (#6189) 153fdaf96 is described below commit 153fdaf967197d1a8b3c3558fb83e7987bc10716 Author: jiachengjiang <108396409+jjiach...@users.noreply.github.com> AuthorDate: Thu Oct 20 11:10:15 2022 +0800 [INLONG-6188] join supports multiple fields (#6189) --- .../org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java index 69ab47153..5896e2887 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java @@ -148,7 +148,7 @@ public class NodeRelationUtils { StreamField leftField = leftJoinFields.get(index); StreamField rightField = rightJoinFields.get(index); LogicOperator operator; -if (index != leftJoinFields.size() - 1) { +if (index != 0) { operator = AndOperator.getInstance(); } else { operator = EmptyOperator.getInstance();
[inlong] branch master updated (153fdaf96 -> d062fb335)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 153fdaf96 [INLONG-6188] join supports multiple fields (#6189) add d062fb335 [INLONG-6224][Sort] Import schema and data parsing ability for DynamicSchemaFormat (#6225) No new revisions were added by this update. Summary of changes: inlong-sort/sort-connectors/base/pom.xml | 7 ++ .../org/apache/inlong/sort/base/Constants.java | 22 --- .../base/format/AbstractDynamicSchemaFormat.java | 35 ++ .../base/format/CanalJsonDynamicSchemaFormat.java | 55 +++- .../format/DebeziumJsonDynamicSchemaFormat.java| 63 -- .../sort/base/format/JsonDynamicSchemaFormat.java | 77 ++ 6 files changed, 243 insertions(+), 16 deletions(-)
[inlong] branch master updated (98a58b011 -> e696c203a)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 98a58b011 [INLONG-7437][Sort] Support metrics for Oracle CDC Connector with incremental snapshot enabled (#7443) add e696c203a [INLONG-7477][Sort] Fix the metadata of table write error for canal-json (#7478) No new revisions were added by this update. Summary of changes: .../CanalJsonEnhancedSerializationSchema.java | 44 -- 1 file changed, 40 insertions(+), 4 deletions(-)
[inlong-website] branch master updated: [INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3 (#703)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong-website.git The following commit(s) were added to refs/heads/master by this push: new 7f16fff6a2 [INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3 (#703) 7f16fff6a2 is described below commit 7f16fff6a28a1ae9a4d91434930d9c8fbb63190b Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Mon Mar 6 11:18:21 2023 +0800 [INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3 (#703) * [INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3 * [INLONG-702][Doc] Modify the default values of some options --- docs/data_node/extract_node/mongodb-cdc.md | 10 +++--- .../current/data_node/extract_node/mongodb-cdc.md | 10 +++--- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/docs/data_node/extract_node/mongodb-cdc.md b/docs/data_node/extract_node/mongodb-cdc.md index 601ef3f9d4..4eca173ad3 100644 --- a/docs/data_node/extract_node/mongodb-cdc.md +++ b/docs/data_node/extract_node/mongodb-cdc.md @@ -125,14 +125,10 @@ TODO: It will be supported in the future. | database | required | (none) | String | Name of the database to watch for changes. | | collection| required | (none) | String | Name of the collection in the database to watch for changes. | | connection.options| optional | (none) | String | The ampersand-separated [connection options](https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options) of MongoDB. eg. `replicaSet=test&connectTimeoutMS=30` | -| errors.tolerance | optional | none | String | Whether to continue processing messages if an error is encountered. Accept `none` or `all`. When set to `none`, the connector reports an error and blocks further processing of the rest of the records when it encounters an error. When set to `all`, the connector silently ignores any bad messages. 
| -| errors.log.enable | optional | true | Boolean | Whether details of failed operations should be written to the log file. | | copy.existing | optional | true | Boolean | Whether copy existing data from source collections. | -| copy.existing.pipeline| optional | (none) | String | An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient. eg. `[{"$match": {"closed": "false"}}]` ensures that only documents in which the closed field is set to false are copied. | -| copy.existing.max.threads | optional | Processors Count | Integer | The number of threads to use when performing the data copy. | -| copy.existing.queue.size | optional | 16000| Integer | The max size of the queue to use when copying data. | -| poll.max.batch.size | optional | 1000 | Integer | Maximum number of change stream documents to include in a single batch when polling for new data. | -| poll.await.time.ms| optional | 1500 | Integer | The amount of time to wait before checking for new results on the change stream. | +| copy.existing.queue.size | optional | 10240| Integer | The max size of the queue to use when copying data. | +| poll.max.batch.size | optional | 1024 | Integer | Maximum number of change stream documents to include in a single batch when polling for new data. | +| poll.await.time.ms| optional | 1000 | Integer | The amount of time to wait before checking for new results on the change stream. | | heartbeat.interval.ms | optional | 0| Integer | The length of time in milliseconds between sending heartbeat messages. Use 0 to disa | | inlong.metric.labels | optional | (none) | String | Inlong metric label, format of value is groupId=`{groupId}`&streamId=`{streamId}`&nodeId=`{nodeId}`. 
| ## Available Metadata diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md index ad7f76b0a9..c3b94c189b 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md @@ -125,14 +125,10 @@ TODO: 未来会支持 | database | 必须 | (none) | String | 要监视更改的数据库的名称。 | | collection| 必须
[inlong] branch master updated: [INLONG-7550][Sort] Optimize the log printing level of dirty data to avoid generating a large number of logs (#7551)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new d1dc70ee7 [INLONG-7550][Sort] Optimize the log printing level of dirty data to avoid generating a large number of logs (#7551) d1dc70ee7 is described below commit d1dc70ee7963a600d0f69f6c27af3a519494f852 Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Wed Mar 8 12:22:07 2023 +0800 [INLONG-7550][Sort] Optimize the log printing level of dirty data to avoid generating a large number of logs (#7551) --- .../org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java | 5 ++--- .../java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java index b7f7af914..2884ac398 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java @@ -46,11 +46,10 @@ public class LogDirtySink implements DirtySink { private static final long serialVersionUID = 1L; private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class); - -private RowData.FieldGetter[] fieldGetters; private final String format; private final String fieldDelimiter; private final DataType physicalRowDataType; +private RowData.FieldGetter[] fieldGetters; private RowDataToJsonConverter converter; public LogDirtySink(String format, String fieldDelimiter, DataType physicalRowDataType) { @@ -79,7 +78,7 @@ public class LogDirtySink implements DirtySink { // Only support csv 
format when the row is not a 'RowData' and 'JsonNode' value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter); } -LOGGER.info("[{}] {}", dirtyData.getLogTag(), value); +LOGGER.debug("[{}] {}", dirtyData.getLogTag(), value); } private String format(RowData data, LogicalType rowType, diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java index 66c1428d3..26f9d66f4 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java @@ -161,7 +161,7 @@ public class S3DirtySink implements DirtySink { value = FormatUtils.csvFormat(data, labelMap, s3Options.getFieldDelimiter()); } if (s3Options.enableDirtyLog()) { -LOGGER.info("[{}] {}", dirtyData.getLogTag(), value); +LOGGER.debug("[{}] {}", dirtyData.getLogTag(), value); } batchBytes += value.getBytes(UTF_8).length; size++;
[inlong] branch master updated (b6014fa5f -> 0175d5c1d)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from b6014fa5f [INLONG-7533][Agent] Fix that log cannot be collected for position reset (#7538) add 0175d5c1d [INLONG-7559][Sort] Fix Oracle CDC reads timestamp type record error (#7561) No new revisions were added by this update. Summary of changes: .../table/RowDataDebeziumDeserializeSchema.java| 60 -- 1 file changed, 34 insertions(+), 26 deletions(-)
[inlong] branch master updated: [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 6d4aca310 [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654) 6d4aca310 is described below commit 6d4aca310330578bc591ca90c1b25366425f8832 Author: Liao Rui AuthorDate: Fri Mar 24 18:01:41 2023 +0800 [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654) --- .../sort/iceberg/FlinkDynamicTableFactory.java | 23 +++ .../inlong/sort/iceberg/IcebergTableSink.java | 9 + .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 34 +++- .../sink/multiple/DynamicSchemaHandleOperator.java | 207 + .../multiple/IcebergMultipleFilesCommiter.java | 16 +- .../sink/multiple/IcebergSingleFileCommiter.java | 16 +- .../sink/multiple/IcebergSingleStreamWriter.java | 1 - .../iceberg/sink/multiple/RecordWithSchema.java| 4 + 8 files changed, 214 insertions(+), 96 deletions(-) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index c6d3ed874..32d3a21bd 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -34,6 +34,7 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.Preconditions; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.catalog.TableIdentifier; import 
org.apache.iceberg.common.DynMethods; @@ -112,6 +113,25 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami .orNoop() .build(); +public static final ConfigOption WRITE_COMPACT_ENABLE = +ConfigOptions.key("write.compact.enable") +.booleanType() +.defaultValue(false) +.withDescription("Whether to enable compact small file."); + +public static final ConfigOption WRITE_COMPACT_INTERVAL = +ConfigOptions.key("write.compact.snapshot.interval") +.intType() +.defaultValue(20) +.withDescription("Compact snapshot interval."); + +public static final ConfigOption WRITE_DISTRIBUTION_MODE = +ConfigOptions.key(TableProperties.WRITE_DISTRIBUTION_MODE) +.stringType() +.defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE) +.withDescription("Distribute the records from input data stream based " ++ "on the write.distribution-mode."); + private final FlinkCatalog catalog; public FlinkDynamicTableFactory() { @@ -274,6 +294,9 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY); options.add(SINK_MULTIPLE_PK_AUTO_GENERATED); options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK); +options.add(WRITE_COMPACT_ENABLE); +options.add(WRITE_COMPACT_INTERVAL); +options.add(WRITE_DISTRIBUTION_MODE); return options; } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java index ef93e1f6f..cd239633e 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java @@ -29,6 +29,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import 
org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.TableLoader; @@ -54,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERA import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sor
[inlong] branch master updated (a39e03cc5 -> b9446908f)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from a39e03cc5 [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization (#8109) add b9446908f [INLONG-7853][Sort] Add common handle for schema-change in sink (#8105) No new revisions were added by this update. Summary of changes: .../sort/protocol/enums/SchemaChangePolicy.java| 73 +++ .../sort/protocol/enums/SchemaChangeType.java | 94 + .../apache/inlong/sort/util/SchemaChangeUtils.java | 229 + .../inlong/sort/util/SchemaChangeUtilsTest.java| 62 ++ .../org/apache/inlong/sort/base/Constants.java | 13 ++ .../apache/inlong/sort/base/dirty/DirtyType.java | 12 ++ .../sort/base/format/JsonDynamicSchemaFormat.java | 2 +- .../base/schema/SchemaChangeHandleException.java | 20 +- 8 files changed, 493 insertions(+), 12 deletions(-) create mode 100644 inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/SchemaChangePolicy.java create mode 100644 inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/SchemaChangeType.java create mode 100644 inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java create mode 100644 inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/util/SchemaChangeUtilsTest.java copy inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java => inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandleException.java (72%)
[inlong] branch master updated: [INLONG-8217][Sort] Sort-core should support running on flink-1.15 (#8230)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 5a789fe7fe [INLONG-8217][Sort] Sort-core should support running on flink-1.15 (#8230) 5a789fe7fe is described below commit 5a789fe7fe43f2f7faab4cd4fa0fadbcd730ba15 Author: Sting AuthorDate: Wed Jun 14 21:31:15 2023 +0800 [INLONG-8217][Sort] Sort-core should support running on flink-1.15 (#8230) --- .../sort-core/src/main/java/org/apache/inlong/sort/Entrance.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java index ae3ebc842d..ea3f3f95be 100644 --- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java +++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java @@ -50,8 +50,7 @@ public class Entrance { config.getInteger(Constants.MIN_PAUSE_BETWEEN_CHECKPOINTS_MS)); env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS)); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); -EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner() -.inStreamingMode().build(); +EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME, config.getString(Constants.JOB_NAME));