[inlong] branch master updated: [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)

2022-10-31 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new ca24d2ede [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
ca24d2ede is described below

commit ca24d2edea046611f8c77d70b1465b550f146aaa
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Mon Oct 31 17:19:57 2022 +0800

[INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)

* [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario

* [INLONG-6322][Sort] Fix flushing update error

* [INLONG-6322][Sort] Optimize flush handle

* [INLONG-6322][Sort] Fix comment error
---
 .../table/DorisDynamicSchemaOutputFormat.java  | 68 +-
 1 file changed, 53 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index c2db85789..9e8142ae7 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
 * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
@@ -76,11 +77,16 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat {
 private final String tablePattern;
 private final String dynamicSchemaFormat;
 private final boolean ignoreSingleTableErrors;
-private final transient Map flushExceptionMap = new HashMap<>();
+private final Map flushExceptionMap = new HashMap<>();
+private final AtomicLong readInNum = new AtomicLong(0);
+private final AtomicLong writeOutNum = new AtomicLong(0);
+private final AtomicLong errorNum = new AtomicLong(0);
+private final AtomicLong ddlNum = new AtomicLong(0);
 private long batchBytes = 0L;
 private int size;
 private DorisStreamLoad dorisStreamLoad;
 private transient volatile boolean closed = false;
+private transient volatile boolean flushing = false;
 private transient ScheduledExecutorService scheduler;
 private transient ScheduledFuture scheduledFuture;
 private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
@@ -131,10 +137,8 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat {
 this.scheduler = new ScheduledThreadPoolExecutor(1,
 new ExecutorThreadFactory("doris-streamload-output-format"));
 this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-synchronized (DorisDynamicSchemaOutputFormat.this) {
-if (!closed) {
-flush();
-}
+if (!closed && !flushing) {
+flush();
 }
 }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
 }
@@ -158,7 +162,7 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat {
 addBatch(row);
 boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize())
 || batchBytes >= executionOptions.getMaxBatchBytes();
-if (valid) {
+if (valid && !flushing) {
 flush();
 }
 }
@@ -168,8 +172,10 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat {
 if (row instanceof RowData) {
 RowData rowData = (RowData) row;
 JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
+readInNum.incrementAndGet();
 boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
 if (isDDL) {
+ddlNum.incrementAndGet();
 // Ignore ddl change for now
 return;
 }
@@ -226,6 +232,7 @@ public class DorisDynamicSchemaOutputFormat extends RichOutputFormat {
 }
 break;
 default:
+errorNum.incrementAndGet();
 throw n
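The patch above replaces a synchronized flush with a volatile `flushing` flag and adds AtomicLong counters for in/out/error records. A minimal, self-contained sketch of that guard pattern (class and member names are illustrative, not the real connector):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the non-blocking flush guard: a volatile flag instead of a lock.
public class FlushGuard {
    private volatile boolean closed = false;
    private volatile boolean flushing = false;
    private final AtomicLong readInNum = new AtomicLong(0);
    private final AtomicLong writeOutNum = new AtomicLong(0);

    public void addRecord(Object row) {
        readInNum.incrementAndGet();
        // Flush only when no other flush is already in progress.
        if (!closed && !flushing) {
            flush();
        }
    }

    private void flush() {
        flushing = true;               // close the gate for concurrent callers
        try {
            writeOutNum.incrementAndGet(); // stand-in for the real stream load
        } finally {
            flushing = false;          // always re-open the gate
        }
    }

    public long reads() { return readInNum.get(); }
    public long writes() { return writeOutNum.get(); }
}
```

The scheduled timer thread and `writeRecord` both test the flag before flushing, so a slow flush is skipped rather than blocked on.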

[inlong] branch master updated (70bf3e0aa -> 93c2dc311)

2022-11-01 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 70bf3e0aa [INLONG-6361][Manager] Fix the deserialization exception with Jackson's version below 2.13 (#6362)
 add 93c2dc311 [INLONG-6353][Sort] Fix delete event handle error for doris connector (#6358)

No new revisions were added by this update.

Summary of changes:
 .../base/format/CanalJsonDynamicSchemaFormat.java  |   3 +
 .../format/DebeziumJsonDynamicSchemaFormat.java|   3 +
 .../table/DorisDynamicSchemaOutputFormat.java  | 145 +++--
 .../inlong/sort/doris/table/DorisStreamLoad.java   |   4 +-
 4 files changed, 113 insertions(+), 42 deletions(-)



[inlong] branch master updated (e97a2c269 -> 4c20d9504)

2022-11-03 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from e97a2c269 [INLONG-6369][DataProxy] Use the defined message attribute Key value in InLong-Common (#6386)
 add 4c20d9504 [INLONG-6356][Sort] Oracle connector supports precision and binary type (#6357)

No new revisions were added by this update.

Summary of changes:
 .../table/RowDataDebeziumDeserializeSchema.java|  3 ++
 .../cdc/oracle/table/OracleReadableMetaData.java   | 41 +++---
 2 files changed, 31 insertions(+), 13 deletions(-)



[inlong] branch master updated: [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404)

2022-11-06 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new f8266fe35 [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404)
f8266fe35 is described below

commit f8266fe35c83456eabe356ff27927deafd6342f0
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Mon Nov 7 11:17:39 2022 +0800

[INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404)

* [INLONG-6402][Sort] Modify the metadata field of oracle connector

* [INLONG-6402][Sort] Remove extra punctuation

* [INLONG-6402][Sort] Modify the metadata field of oracle connector to be consistent with mysql connector

* [INLONG-6402][Sort] Compatible with open source oracle connector

* [INLONG-6402][Sort] Remove incorrect information

* [INLONG-6402][Sort] Remove unused meta field

Co-authored-by: menghuiyu 
---
 .../org/apache/inlong/common/enums/MetaField.java  |   5 +
 .../org/apache/inlong/sort/protocol/Metadata.java  |  11 +-
 .../protocol/node/extract/OracleExtractNode.java   |  55 ++-
 .../node/extract/OracleExtractNodeTest.java|  14 +-
 .../cdc/oracle/table/OracleReadableMetaData.java   | 166 -
 5 files changed, 137 insertions(+), 114 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index f88da9c4d..4a065583d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -116,6 +116,11 @@ public enum MetaField {
  */
 MYSQL_TYPE,
 
+/**
+ * The table structure. It is only used for Oracle database
+ */
+ORACLE_TYPE,
+
 /**
  * Primary key field name. Currently, it is used for MySQL database.
  */
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
index 71c37e4ae..6c2ac4d2d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
@@ -54,14 +54,6 @@ public interface Metadata {
 case OP_TS:
 metadataKey = "op_ts";
 break;
-case DATA:
-case DATA_BYTES:
-metadataKey = "meta.data";
-break;
-case DATA_CANAL:
-case DATA_BYTES_CANAL:
-metadataKey = "meta.data_canal";
-break;
 
 default:
 throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
@@ -106,6 +98,9 @@ public interface Metadata {
 case MYSQL_TYPE:
 metadataType = "MAP";
 break;
+case ORACLE_TYPE:
+metadataType = "MAP";
+break;
 case PK_NAMES:
 metadataType = "ARRAY";
 break;
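The hunk above extends the `getMetadataType` switch so `ORACLE_TYPE` resolves like `MYSQL_TYPE`. A small sketch of that mapping (the enum is abridged, and the generic arguments on `MAP`/`ARRAY` are assumptions, since the mail's HTML escaping stripped them):

```java
// Sketch: each meta field resolves to the SQL type used when declaring the
// METADATA column. Generic parameters below are assumed, not from the mail.
public class MetaTypeMapping {
    enum MetaField { MYSQL_TYPE, ORACLE_TYPE, PK_NAMES }

    static String metadataType(MetaField field) {
        switch (field) {
            case MYSQL_TYPE:
            case ORACLE_TYPE:
                return "MAP<STRING, STRING>"; // assumed value types
            case PK_NAMES:
                return "ARRAY<STRING>";       // assumed element type
            default:
                throw new UnsupportedOperationException("Unsupported meta field: " + field);
        }
    }
}
```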
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
index a1bad2523..594ccdd60 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
@@ -150,7 +150,58 @@ public class OracleExtractNode extends ExtractNode implements InlongMetric, Meta
 @Override
 public Set supportedMetaFields() {
 return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATABASE_NAME,
-MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA, MetaField.DATA_CANAL,
-MetaField.DATA_BYTES, MetaField.DATA_BYTES_CANAL);
+MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.OP_TYPE, MetaField.DATA, MetaField.DATA_BYTES,
+MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL, MetaField.IS_DDL, MetaField.TS,
+MetaField.SQL_TYPE, MetaField.ORACLE_TYPE, MetaField.PK_NAMES);
 }
+
+@Override
+public String getMetadataKey(MetaField metaField) {
+String metadataKey;
+switch (metaField) {
+case TABLE_NAME:
+metadataKey = "meta.table_name";
+break;
+case DATABASE_NAME:
+metadataKey = "meta.database_name";
+  

[inlong] branch master updated: [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6383)

2022-11-06 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new a970fa616 [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6383)
a970fa616 is described below

commit a970fa6167790cfd9d0797e92b5ea26d5f4712e3
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Mon Nov 7 11:22:30 2022 +0800

[INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6383)

* [INLONG-6379][Sort] Bugfix:iceberg miss metric data in multiple sink (#6381)

* [INLONG-6382][Sort] Bugfix:Iceberg miss data when source table do not have primary key in multiple sink scenes

* [INLONG-6382][Sort] Import auto generated primary key config in multiple sink scenes

Co-authored-by: thesumery <158971...@qq.com>
---
 .../org/apache/inlong/sort/base/Constants.java |  9 
 .../inlong/sort/base/sink/MultipleSinkOption.java  | 26 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java |  2 ++
 .../inlong/sort/iceberg/IcebergTableSink.java  |  2 ++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  3 +--
 .../sink/multiple/IcebergMultipleStreamWriter.java |  8 +--
 6 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19c58e9c1..c0c7f8808 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -163,4 +163,13 @@ public final class Constants {
 .booleanType()
 .defaultValue(true)
 .withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
+
+public static final ConfigOption SINK_MULTIPLE_PK_AUTO_GENERATED =
+ConfigOptions.key("sink.multiple.pk-auto-generated")
+.booleanType()
+.defaultValue(false)
+.withDescription("Whether generated pk fields as whole data when source table does not have a "
++ "primary key.");
+
+
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 77c924b95..69ce3fc68 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -35,22 +35,26 @@ public class MultipleSinkOption implements Serializable {
 private static final long serialVersionUID = 1L;
 private static final Logger LOG = LoggerFactory.getLogger(MultipleSinkOption.class);
 
-private String format;
+private final String format;
 
-private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
 
-private String databasePattern;
+private final String databasePattern;
 
-private String tablePattern;
+private final String tablePattern;
+
+private final boolean pkAutoGenerated;
 
 public MultipleSinkOption(String format,
 SchemaUpdateExceptionPolicy schemaUpdatePolicy,
 String databasePattern,
-String tablePattern) {
+String tablePattern,
+boolean pkAutoGenerated) {
 this.format = format;
 this.schemaUpdatePolicy = schemaUpdatePolicy;
 this.databasePattern = databasePattern;
 this.tablePattern = tablePattern;
+this.pkAutoGenerated = pkAutoGenerated;
 }
 
 public String getFormat() {
@@ -69,6 +73,10 @@ public class MultipleSinkOption implements Serializable {
 return tablePattern;
 }
 
+public boolean isPkAutoGenerated() {
+return pkAutoGenerated;
+}
+
 public static Builder builder() {
 return new Builder();
 }
@@ -78,6 +86,7 @@ public class MultipleSinkOption implements Serializable {
 private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
 private String databasePattern;
 private String tablePattern;
+private boolean pkAutoGenerated;
 
 public MultipleSinkOption.Builder withFormat(String format) {
 this.format = format;
@@ -99,8 +108,13 @@ public class MultipleSinkOption implements Serializable {
  

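The patch makes every `MultipleSinkOption` field `final`, set once through the builder, with the new `pkAutoGenerated` flag defaulting to false. A simplified miniature of that immutable-option-plus-builder shape (names mirror the real class, but this is a sketch, not InLong's implementation):

```java
// Miniature of the immutable option + builder pattern the patch moves to.
public class SinkOptionSketch {
    private final String databasePattern;
    private final String tablePattern;
    private final boolean pkAutoGenerated;

    private SinkOptionSketch(String db, String tbl, boolean pk) {
        this.databasePattern = db;
        this.tablePattern = tbl;
        this.pkAutoGenerated = pk;
    }

    public boolean isPkAutoGenerated() { return pkAutoGenerated; }

    public static Builder builder() { return new Builder(); }

    public static class Builder {
        private String databasePattern = ".*";
        private String tablePattern = ".*";
        private boolean pkAutoGenerated = false; // matches the option's default

        public Builder withDatabasePattern(String p) { this.databasePattern = p; return this; }
        public Builder withTablePattern(String p) { this.tablePattern = p; return this; }
        public Builder withPkAutoGenerated(boolean b) { this.pkAutoGenerated = b; return this; }

        public SinkOptionSketch build() {
            return new SinkOptionSketch(databasePattern, tablePattern, pkAutoGenerated);
        }
    }
}
```

Final fields keep the option safe to share across the serialized sink operators.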
[inlong-website] branch master updated: [INLONG-579][Doc] Add doc for oracle connector for all migrate (#584)

2022-11-07 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new f59f2a7186 [INLONG-579][Doc] Add doc for oracle connector for all migrate (#584)
f59f2a7186 is described below

commit f59f2a71867a85f6c447b7ea37fea67f12a64e06
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Tue Nov 8 14:29:35 2022 +0800

[INLONG-579][Doc] Add doc for oracle connector for all migrate (#584)

* [INLONG-579][Doc] Add doc for oracle connector for all migrate
---
 docs/data_node/extract_node/oracle-cdc.md  | 109 ++-
 .../current/data_node/extract_node/oracle-cdc.md   | 117 +++--
 2 files changed, 216 insertions(+), 10 deletions(-)

diff --git a/docs/data_node/extract_node/oracle-cdc.md b/docs/data_node/extract_node/oracle-cdc.md
index f861091bcf..e330965912 100644
--- a/docs/data_node/extract_node/oracle-cdc.md
+++ b/docs/data_node/extract_node/oracle-cdc.md
@@ -294,7 +294,7 @@ TODO: It will be supported in the future.
   required
   (none)
   String
-  Table name of the Oracle database to monitor.
+  Table name of the Oracle database to monitor. The value is of the form <schema_name>.<table_name>
 
 
   port
@@ -328,6 +328,13 @@ TODO: It will be supported in the future.
   String
  Inlong metric label, format of value is groupId=xxgroup&streamId=xxstream&nodeId=xxnode. 
 
+
+   source.multiple.enable
+   optional
+   false
+   Boolean
+   Whether to enable multiple schema and table migration. If it is 'true', Oracle Extract Node will compress the physical fields of the table into a special meta field 'data_canal' in the format of 'canal-json'. 
+ 
 
 
 
@@ -378,6 +385,61 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
   TIMESTAMP_LTZ(3) NOT NULL
  It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0.
 
+
+  meta.table_name
+  STRING NOT NULL
+  Name of the table that contains the row.
+
+
+  meta.schema_name
+  STRING NOT NULL
+  Name of the schema that contains the row.
+
+
+  meta.database_name
+  STRING NOT NULL
+  Name of the database that contains the row.
+
+
+  meta.op_ts
+  TIMESTAMP_LTZ(3) NOT NULL
+  It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0.
+
+
+  meta.op_type
+  STRING
+  Type of database operation, such as INSERT/DELETE, etc.
+
+
+  meta.data_canal
+  STRING/BYTES
+  Data for rows in `canal-json` format only exists when the `source.multiple.enable` option is 'true'.
+
+
+  meta.is_ddl
+  BOOLEAN
+  Whether the record is a DDL statement.
+
+
+  meta.ts
+  TIMESTAMP_LTZ(3) NOT NULL
+  The current time when the row was received and processed.
+
+
+  meta.sql_type
+  MAP
+  Mapping of sql_type table fields to java data type IDs.
+
+
+  meta.oracle_type
+  MAP
+  Structure of the table.
+
+
+  meta.pk_names
+  ARRAY
+  Primary key name of the table.
+
   
 
 
@@ -387,7 +449,18 @@ CREATE TABLE products (
 db_name STRING METADATA FROM 'database_name' VIRTUAL,
 schema_name STRING METADATA FROM 'schema_name' VIRTUAL, 
 table_name STRING METADATA  FROM 'table_name' VIRTUAL,
-operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+meta_db_name STRING METADATA FROM 'meta.database_name' VIRTUAL,
+meta_schema_name STRING METADATA FROM 'meta.schema_name' VIRTUAL, 
+meta_table_name STRING METADATA  FROM 'meta.table_name' VIRTUAL,
+meat_op_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL,
+meta_op_type STRING METADATA  FROM 'meta.op_type' VIRTUAL,
+meta_data_canal STRING METADATA  FROM 'meta.data_canal' VIRTUAL,
+meta_is_ddl BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL,
+meta_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL,
+meta_sql_type MAP METADATA FROM 'meta.sql_type' VIRTUAL,
+meat_oracle_type MAP METADATA FROM 'meta.oracle_type' VIRTUAL,
+meta_pk_names ARRAY METADATA FROM 'meta.pk_names' VIRTUAL,
 ID INT NOT NULL,
 NAME STRING,
 DESCRIPTION STRING,
@@ -401,7 +474,7 @@ CREATE TABLE products (
 'password' = 

[inlong] branch master updated: [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474)

2022-11-08 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 924df34b9 [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474)
924df34b9 is described below

commit 924df34b9362c276a5183c7ef18c69da125e38ef
Author: Xin Gong 
AuthorDate: Wed Nov 9 10:35:53 2022 +0800

[INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474)
---
 .../org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index 80d6812f2..5ade85306 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -80,6 +80,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -438,6 +440,8 @@ public class DebeziumSourceFunction extends RichSourceFunction
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(inlongAudit)
+.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+.withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
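The fix seeds the metric builder from the last checkpointed `MetricState`, null-safely, so record and byte counts survive a job restart. A dependency-free sketch of that restore step (`MetricState` here is a stub, not InLong's class):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of restoring metric counters from checkpointed state.
public class MetricRestore {
    static final String NUM_RECORDS_IN = "numRecordsIn";
    static final String NUM_BYTES_IN = "numBytesIn";

    // Stub standing in for the checkpointed metric state.
    static class MetricState {
        private final Map<String, Long> values = new HashMap<>();
        void put(String key, long value) { values.put(key, value); }
        long getMetricValue(String key) { return values.getOrDefault(key, 0L); }
    }

    // Mirrors the null-safe initialisation in the patch: no prior state means
    // the counters start from zero instead of throwing.
    static long initValue(MetricState state, String key) {
        return state != null ? state.getMetricValue(key) : 0L;
    }
}
```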



[inlong] branch master updated: [INLONG-6379][Sort] Complement iceberg multiple sink metric data compute (#6472)

2022-11-08 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new c8864f8f4 [INLONG-6379][Sort] Complement iceberg multiple sink metric data compute (#6472)
c8864f8f4 is described below

commit c8864f8f43b1b1a42a2b647723eed1a2b460c803
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Nov 9 10:39:06 2022 +0800

[INLONG-6379][Sort] Complement iceberg multiple sink metric data compute (#6472)
---
 .../inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 2c67cbd51..56aa77303 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -210,6 +210,9 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction

[inlong] branch master updated: [INLONG-6529] Fix microtime time zone error in mysql connector (#6530)

2022-11-14 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new e223c34c5 [INLONG-6529] Fix microtime time zone error in mysql connector (#6530)
e223c34c5 is described below

commit e223c34c54ca3c6cae251c9e340b138993d36568
Author: Schnapps 
AuthorDate: Tue Nov 15 15:40:34 2022 +0800

[INLONG-6529] Fix microtime time zone error in mysql connector (#6530)
---
 .../sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index a7231a83e..96777c5ee 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -630,7 +630,7 @@ public final class RowDataDebeziumDeserializeSchema
 switch (schemaName) {
 case MicroTime.SCHEMA_NAME:
 Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
-fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, serverTimeZone));
+fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, ZONE_UTC));
 break;
 case Date.SCHEMA_NAME:
 fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
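The one-line change matters because Debezium's `MicroTime` encodes microseconds since midnight, a time-of-day value with no zone attached, so it must be rendered in UTC; formatting it in the server time zone (the old code) shifts the clock time. A sketch demonstrating the difference:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// MicroTime is micros-since-midnight; only UTC reproduces the original time.
public class MicroTimeDemo {
    static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");

    // Same conversion chain as the connector: micros -> millis -> Instant -> formatted time.
    static String format(long micros, ZoneId zone) {
        Instant instant = Instant.ofEpochMilli(micros / 1000);
        return TIME.format(LocalDateTime.ofInstant(instant, zone));
    }
}
```

For a stored time of 02:00:00, formatting in a +08:00 server zone would yield 10:00:00; formatting in UTC returns the correct 02:00:00.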



[inlong] branch master updated: [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549)

2022-11-15 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 489529262 [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549)
489529262 is described below

commit 4895292623c22a1d268bc8ce25366143bcd18005
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Wed Nov 16 11:08:21 2022 +0800

[INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549)
---
 .../protocol/node/extract/KafkaExtractNode.java|   2 +-
 .../sort/protocol/node/load/KafkaLoadNode.java |   8 +-
 .../node/extract/KafkaExtractNodeTest.java |   3 +-
 .../sort/protocol/node/load/KafkaLoadNodeTest.java |   2 +-
 .../canal/CanalJsonEnhancedDecodingFormat.java |  19 ++
 .../CanalJsonEnhancedDeserializationSchema.java| 260 -
 .../canal/CanalJsonEnhancedEncodingFormat.java |  19 ++
 .../CanalJsonEnhancedSerializationSchema.java  |  52 ++---
 8 files changed, 217 insertions(+), 148 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 02da2bef3..fb6693548 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -215,7 +215,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
 metadataKey = "value.event-timestamp";
 break;
 case OP_TYPE:
-metadataKey = "value.op-type";
+metadataKey = "value.type";
 break;
 case IS_DDL:
 metadataKey = "value.is-ddl";
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index be326b478..26fa424ca 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -224,7 +224,7 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
 metadataKey = "value.event-timestamp";
 break;
 case OP_TYPE:
-metadataKey = "value.op-type";
+metadataKey = "value.type";
 break;
 case DATA:
 case DATA_CANAL:
@@ -257,8 +257,8 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
 @Override
 public Set supportedMetaFields() {
 return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE,
-MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS,
-MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID,
-MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
+MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS,
+MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID,
+MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
 }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index 661d6f547..1d41c96fd 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -34,7 +34,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -86,7 +85,7 @@ public class KafkaExtractNodeTest extends SerializeBaseTest {
 formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
 formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
 formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'value.table'");
 formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'value.database'");
-formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.op-type'");
+formatMap.put(MetaFi

[inlong] branch master updated (16ddf5f5b -> 17f01a468)

2022-11-27 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 16ddf5f5b [INLONG-6636][Sort] Keep metric computing consistent for source MySQL and sink HBase (#6637)
 add 17f01a468 [INLONG-6615][Sort] Fix upsert kafka will throw NPE when get delete record (#6616)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java | 4 +++-
 .../inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java | 7 ---
 2 files changed, 7 insertions(+), 4 deletions(-)
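The diffstat alone does not show the root cause, but the symptom is a classic one: an upsert-kafka delete event arrives as a tombstone record whose value is null, and deserializing it without a guard throws an NPE. A minimal sketch of the guard (the method and return convention here are illustrative, not the actual InLong code):

```java
// Kafka delete events are tombstones: key present, value == null.
public class TombstoneGuard {
    static String handle(byte[] value) {
        if (value == null) {
            // Treat the tombstone as a retraction instead of deserializing it.
            return "DELETE";
        }
        return new String(value); // normal deserialization path (simplified)
    }
}
```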



[inlong] branch master updated: [INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655)

2022-11-28 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 21bfeb155 [INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655)
21bfeb155 is described below

commit 21bfeb1555f17a49c40a771f3fc52e44c048a20d
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Tue Nov 29 15:46:47 2022 +0800

[INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655)
---
 inlong-sort/sort-connectors/base/pom.xml   |   5 +
 .../sort/base/dirty/sink/DirtySinkFactory.java |   7 +-
 .../sort/base/dirty/sink/log/LogDirtySink.java |   4 +-
 .../base/dirty/sink/log/LogDirtySinkFactory.java   |  10 +-
 .../sort/base/dirty/sink/s3/S3DirtySink.java   | 279 +
 .../base/dirty/sink/s3/S3DirtySinkFactory.java | 148 +++
 .../inlong/sort/base/dirty/sink/s3/S3Helper.java   | 100 
 .../inlong/sort/base/dirty/sink/s3/S3Options.java  | 241 ++
 .../org.apache.flink.table.factories.Factory   |   3 +-
 licenses/inlong-sort-connectors/LICENSE|   2 +-
 pom.xml|   7 +
 11 files changed, 795 insertions(+), 11 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/pom.xml b/inlong-sort/sort-connectors/base/pom.xml
index 8e4d2701e..675190c51 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -42,6 +42,11 @@
 ${project.version}
 provided
 
+
+
+com.amazonaws
+aws-java-sdk-s3
+
 
 
 
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
index b6725ddd8..07784b443 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.sort.base.dirty.sink;
 
-import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.Factory;
 
 /**
  * Dirty sink factory class, it is used to create dirty sink
  */
-public interface DirtySinkFactory extends DynamicTableFactory {
+public interface DirtySinkFactory extends Factory {
 
 /**
  * Create dirty sink
@@ -31,6 +32,6 @@ public interface DirtySinkFactory extends DynamicTableFactory 
{
  * @param  The data mode that is handled by the dirty sink
  * @return A dirty sink
  */
- DirtySink createDirtySink(DynamicTableFactory.Context context);
+ DirtySink createDirtySink(Context context);
 
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index bf5a4f135..a57c981fe 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -46,7 +46,7 @@ public class LogDirtySink implements DirtySink {
 
 private static final long serialVersionUID = 1L;
 
-private static final Logger LOG = 
LoggerFactory.getLogger(LogDirtySink.class);
+private static final Logger LOGGER = 
LoggerFactory.getLogger(LogDirtySink.class);
 
 private final RowData.FieldGetter[] fieldGetters;
 private final String format;
@@ -85,7 +85,7 @@ public class LogDirtySink implements DirtySink {
 // Only support csv format when the row is not a 'RowData' and 
'JsonNode'
 value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter);
 }
-LOG.info("[{}] {}", dirtyData.getLogTag(), value);
+LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
 }
 
 private String format(RowData data, Map labels) throws 
JsonProcessingException {
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index 93a12f584..c3720a93d 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirt

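The refactor above changes DirtySinkFactory to extend Flink's plain Factory interface, so a dirty sink is created directly from a factory context instead of going through DynamicTableFactory. A minimal plain-Java sketch of that SPI shape follows; the map-based discovery is a hypothetical stand-in for Flink's FactoryUtil machinery, and the option key is illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the dirty-sink SPI shape from this patch set. Type names follow
// the diff (DirtySink, DirtySinkFactory, LogDirtySink); the registry-based
// discovery below is a simplified, hypothetical stand-in for Flink's
// Factory/FactoryUtil machinery.
public class DirtySinkSketch {

    /** A dirty sink consumes records that failed normal processing. */
    interface DirtySink<T> {
        void invoke(T dirtyData);
    }

    /** Factory keyed by an identifier such as "log" or "s3". */
    interface DirtySinkFactory {
        String identifier();
        <T> DirtySink<T> createDirtySink(Map<String, String> options);
    }

    /** Log-style sink that records what it saw (stands in for LogDirtySink). */
    static class LogDirtySink<T> implements DirtySink<T> {
        final StringBuilder out = new StringBuilder();
        final String logTag;
        LogDirtySink(String logTag) { this.logTag = logTag; }
        @Override public void invoke(T dirtyData) {
            out.append('[').append(logTag).append("] ").append(dirtyData).append('\n');
        }
    }

    static final Map<String, DirtySinkFactory> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put("log", new DirtySinkFactory() {
            public String identifier() { return "log"; }
            public <T> DirtySink<T> createDirtySink(Map<String, String> options) {
                // option key is illustrative, not the connector's real key
                return new LogDirtySink<>(options.getOrDefault("log-tag", "DirtyData"));
            }
        });
    }

    /** Look up a factory by identifier and build the sink from options. */
    public static <T> DirtySink<T> discover(String id, Map<String, String> options) {
        DirtySinkFactory f = FACTORIES.get(id);
        if (f == null) throw new IllegalArgumentException("no dirty sink factory: " + id);
        return f.createDirtySink(options);
    }

    public static void main(String[] args) {
        Map<String, String> opts = new HashMap<>();
        DirtySink<String> sink = discover("log", opts);
        sink.invoke("bad-row-1");
        System.out.println(((LogDirtySink<String>) sink).out.toString().trim());
        // prints "[DirtyData] bad-row-1"
    }
}
```

The real factories additionally validate their options before construction, as the S3DirtySinkFactory diff later in this thread shows.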
[inlong] branch master updated (0671c3a45 -> 40e888598)

2022-11-30 Thread yunqing

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 0671c3a45 [INLONG-6678][Dashboard] Add sinkType parameter in sinkFieldList (#6679)
 add 40e888598 [INLONG-6584][Sort] Add read phase metric and table level metric for MySQL (#6670)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/sort/base/Constants.java |  32 +++
 .../apache/inlong/sort/base/enums/ReadPhase.java   |  80 +++
 .../inlong/sort/base/metric/MetricOption.java  |  21 +-
 .../inlong/sort/base/metric/MetricState.java   |  11 +
 .../inlong/sort/base/metric/SourceMetricData.java  |   4 +-
 .../base/metric/phase/ReadPhaseMetricData.java | 106 +
 .../sort/base/metric/sub/SourceSubMetricData.java} |  51 ++---
 .../base/metric/sub/SourceTableMetricData.java | 240 +
 .../inlong/sort/base/util/MetricStateUtils.java|  73 +++
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |  31 ++-
 .../apache/inlong/sort/cdc/mysql/MySqlSource.java  |   9 +-
 .../sort/cdc/mysql/table/MySqlTableSource.java |   1 +
 12 files changed, 621 insertions(+), 38 deletions(-)
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/enums/ReadPhase.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/phase/ReadPhaseMetricData.java
 copy 
inlong-sort/{sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarBinaryTypeInfo.java
 => 
sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceSubMetricData.java}
 (56%)
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java



[inlong] branch master updated: [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)

2022-11-30 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 09bae48fe [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
09bae48fe is described below

commit 09bae48fe06662bc8cd795a9272573a7ccd9135a
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Thu Dec 1 14:47:54 2022 +0800

[INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
---
 .../base/dirty/sink/log/LogDirtySinkFactory.java   |   9 +-
 .../base/dirty/sink/s3/S3DirtySinkFactory.java |   8 +-
 .../base/dirty/utils/DirtySinkFactoryUtils.java|  48 ++
 .../kafka/DynamicKafkaSerializationSchema.java | 161 -
 .../apache/inlong/sort/kafka/KafkaDynamicSink.java |  18 ++-
 .../table/DynamicKafkaDeserializationSchema.java   |  59 +++-
 .../sort/kafka/table/KafkaDynamicSource.java   |  22 ++-
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  49 +--
 .../table/UpsertKafkaDynamicTableFactory.java  |  25 +++-
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  | 150 +++
 10 files changed, 481 insertions(+), 68 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index c3720a93d..b07b581f1 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.base.dirty.sink.log;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
@@ -38,9 +40,10 @@ public class LogDirtySinkFactory implements DirtySinkFactory 
{
 
 @Override
 public  DirtySink createDirtySink(Context context) {
-FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
-String format = 
context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
-String fieldDelimiter = 
context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ReadableConfig config = 
Configuration.fromMap(context.getCatalogTable().getOptions());
+FactoryUtil.validateFactoryOptions(this, config);
+String format = config.get(DIRTY_SIDE_OUTPUT_FORMAT);
+String fieldDelimiter = config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
 return new LogDirtySink<>(format, fieldDelimiter,
 
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
index d9ec26434..16310926c 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.dirty.sink.s3;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
@@ -79,9 +80,10 @@ public class S3DirtySinkFactory implements DirtySinkFactory {
 
 @Override
 public  DirtySink createDirtySink(Context context) {
-FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
-validate(context.getConfiguration());
-return new S3DirtySink<>(getS3Options(context.getConfiguration()),
+ReadableConfig config = 
Configuration.fromMap(context.getCatalogTable().getOptions());
+FactoryUtil.validateFactoryOptions(this, config);
+validate(config);
+return new S3DirtySink<>(getS3Options(config),
 
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
 }
 
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org

[inlong] branch master updated (bffb911d8 -> c1a07c38c)

2022-12-04 Thread yunqing

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from bffb911d8 [INLONG-6732][Manager] Fix wrong SortSDK topic properties (#6733)
 add c1a07c38c [INLONG-6724][Sort] Supports dirty data side-output for doris sink (#6725)

No new revisions were added by this update.

Summary of changes:
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  17 +-
 .../table/DorisDynamicSchemaOutputFormat.java  | 339 +
 .../sort/doris/table/DorisDynamicTableFactory.java |  12 +-
 .../sort/doris/table/DorisDynamicTableSink.java|  21 +-
 .../DorisExtractNodeToDorisLoadNodeTest.java   |  13 +-
 .../inlong/sort/parser/DorisMultipleSinkTest.java  |  18 +-
 6 files changed, 278 insertions(+), 142 deletions(-)



[inlong] branch master updated: [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726)

2022-12-05 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new b7c6f49d7 [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726)
b7c6f49d7 is described below

commit b7c6f49d7fd1cd6c86419f991bb74426ea04252f
Author: Xin Gong 
AuthorDate: Tue Dec 6 11:51:31 2022 +0800

[INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726)
---
 .../filesystem/stream/AbstractStreamingWriter.java | 46 +-
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 568b639f1..e5e74e62a 100644
--- 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -42,6 +42,10 @@ import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -78,7 +82,8 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 
 private transient long currentWatermark;
 
-private SinkMetricData metricData;
+private Long dataSize = 0L;
+private Long rowSize = 0L;
 
 public AbstractStreamingWriter(
 long bucketCheckInterval,
@@ -118,6 +123,8 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 .withInlongAudit(inlongAudit)
 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+.withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+.withInitDirtyBytes(metricState != null ? 
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -129,7 +136,20 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
  * Commit up to this checkpoint id.
  */
 protected void commitUpToCheckpoint(long checkpointId) throws Exception {
-helper.commitUpToCheckpoint(checkpointId);
+try {
+helper.commitUpToCheckpoint(checkpointId);
+if (sinkMetricData != null) {
+sinkMetricData.invoke(rowSize, dataSize);
+}
+} catch (Exception e) {
+if (sinkMetricData != null) {
+sinkMetricData.invokeDirty(rowSize, dataSize);
+}
+LOG.error("fileSystem sink commitUpToCheckpoint.", e);
+} finally {
+rowSize = 0L;
+dataSize = 0L;
+}
 }
 
 @Override
@@ -192,14 +212,20 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 }
 
 @Override
-public void processElement(StreamRecord element) throws Exception {
-helper.onElement(
-element.getValue(),
-getProcessingTimeService().getCurrentProcessingTime(),
-element.hasTimestamp() ? element.getTimestamp() : null,
-currentWatermark);
-if (metricData != null) {
-metricData.invokeWithEstimate(element.getValue());
+public void processElement(StreamRecord element) {
+try {
+helper.onElement(
+element.getValue(),
+getProcessingTimeService().getCurrentProcessingTime(),
+element.hasTimestamp() ? element.getTimestamp() : null,
+currentWatermark);
+rowSize = rowSize + 1;
+dataSize = dataSize + 
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+} catch (Exception e) {
+if (sinkMetricData != null) {
+sinkMetricData.invokeDirty(1L, 
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+}
+LOG.error("fileSystem sink processElement.", e);
 }
 }
 


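The change above makes the filesystem writer count rows and bytes per checkpoint, report them as normal output when the commit succeeds and as dirty output when it fails, and reset the counters either way. That accounting can be sketched self-contained as follows, with a hypothetical MetricData type standing in for SinkMetricData:

```java
import java.nio.charset.StandardCharsets;

// Sketch of the per-checkpoint metric accounting added to
// AbstractStreamingWriter. MetricData is a hypothetical stand-in for
// SinkMetricData; the success/failure branch mirrors commitUpToCheckpoint.
public class StreamingWriterMetrics {

    static class MetricData {
        long records, bytes, dirtyRecords, dirtyBytes;
        void invoke(long rows, long size) { records += rows; bytes += size; }
        void invokeDirty(long rows, long size) { dirtyRecords += rows; dirtyBytes += size; }
    }

    private final MetricData metricData = new MetricData();
    private long rowSize = 0L;
    private long dataSize = 0L;

    /** Called per element, mirroring processElement in the diff. */
    public void onElement(String value) {
        rowSize += 1;
        dataSize += value.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Called at checkpoint commit, mirroring commitUpToCheckpoint. */
    public void commit(boolean commitSucceeded) {
        try {
            if (!commitSucceeded) {
                throw new IllegalStateException("commit failed");
            }
            metricData.invoke(rowSize, dataSize);        // batch landed: normal metrics
        } catch (Exception e) {
            metricData.invokeDirty(rowSize, dataSize);   // batch lost: count it as dirty
        } finally {
            rowSize = 0L;                                // reset for the next checkpoint
            dataSize = 0L;
        }
    }

    MetricData metrics() { return metricData; }

    public static void main(String[] args) {
        StreamingWriterMetrics w = new StreamingWriterMetrics();
        w.onElement("ab");   // 2 bytes
        w.onElement("cde");  // 3 bytes
        w.commit(true);
        w.onElement("f");    // 1 byte
        w.commit(false);
        System.out.println(w.metrics().records + "," + w.metrics().bytes + ","
                + w.metrics().dirtyRecords + "," + w.metrics().dirtyBytes);
        // prints "2,5,1,1"
    }
}
```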

[inlong] branch master updated: [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)

2022-12-13 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 3126faaf5 [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)
3126faaf5 is described below

commit 3126faaf5e8851aa62a8772eeb4096bf47034613
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Tue Dec 13 16:49:37 2022 +0800

[INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)
---
 .../base/metric/sub/SourceTableMetricData.java | 56 ---
 .../sort/cdc/base/util/CallbackCollector.java  | 47 
 .../inlong/sort/cdc/oracle/OracleSource.java   |  9 +++-
 .../oracle/debezium/DebeziumSourceFunction.java| 63 --
 .../sort/cdc/oracle/table/OracleTableSource.java   |  3 +-
 5 files changed, 152 insertions(+), 26 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
index 112000903..a11d73711 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
@@ -122,9 +122,14 @@ public class SourceTableMetricData extends 
SourceMetricData implements SourceSub
 String metricGroupLabels = labels.entrySet().stream().map(entry -> 
entry.getKey() + "=" + entry.getValue())
 .collect(Collectors.joining(DELIMITER));
 StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
-
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
-
+if (schemaInfoArray.length == 2) {
+
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
+} else if (schemaInfoArray.length == 3) {
+
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+
.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
+
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+}
 MetricOption metricOption = MetricOption.builder()
 .withInitRecords(subMetricState != null ? 
subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L)
 .withInitBytes(subMetricState != null ? 
subMetricState.getMetricValue(NUM_BYTES_IN) : 0L)
@@ -135,14 +140,18 @@ public class SourceTableMetricData extends 
SourceMetricData implements SourceSub
 }
 
 /**
- * build record schema identify,in the form of database.table
+ * build record schema identify,in the form of database.schema.table or 
database.table
  *
  * @param database the database name of record
+ * @param schema the schema name of record
  * @param table the table name of record
  * @return the record schema identify
  */
-public String buildSchemaIdentify(String database, String table) {
-return database + Constants.SEMICOLON + table;
+public String buildSchemaIdentify(String database, String schema, String 
table) {
+if (schema == null) {
+return database + Constants.SEMICOLON + table;
+}
+return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + 
table;
 }
 
 /**
@@ -168,7 +177,7 @@ public class SourceTableMetricData extends SourceMetricData 
implements SourceSub
 outputMetricsWithEstimate(data);
 return;
 }
-String identify = buildSchemaIdentify(database, table);
+String identify = buildSchemaIdentify(database, null, table);
 SourceMetricData subSourceMetricData;
 if (subSourceMetricMap.containsKey(identify)) {
 subSourceMetricData = subSourceMetricMap.get(identify);
@@ -186,6 +195,39 @@ public class SourceTableMetricData extends 
SourceMetricData implements SourceSub
 outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : 
ReadPhase.INCREASE_PHASE);
 }
 
+/**
+ * output metrics with estimate
+ *
+ * @param database the database name of record
+ * @param schema the schema name of record
+ * @param table the table name of record
+  

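The diff above extends the table-level metric key from two parts (database.table, as used for MySQL) to an optional three parts (database.schema.table, as needed for Oracle). A minimal sketch of the new buildSchemaIdentify logic; the real code joins parts with Constants.SEMICOLON, whose value is not shown in the diff, so the "." delimiter below is an assumption:

```java
// Sketch of the schema-identify change from this commit: a null schema
// yields a two-part key (MySQL), a non-null schema a three-part key
// (Oracle). The "." delimiter is assumed; the source uses Constants.SEMICOLON.
public class SchemaIdentify {

    private static final String DELIMITER = ".";

    public static String buildSchemaIdentify(String database, String schema, String table) {
        if (schema == null) {
            return database + DELIMITER + table;
        }
        return database + DELIMITER + schema + DELIMITER + table;
    }

    public static void main(String[] args) {
        System.out.println(buildSchemaIdentify("db", null, "t"));     // prints "db.t"
        System.out.println(buildSchemaIdentify("db", "scott", "t"));  // prints "db.scott.t"
    }
}
```

The same commit uses the two-or-three-part split to emit DATABASE_NAME, SCHEMA_NAME, and TABLE_NAME labels on the per-table metric group.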
[inlong] branch master updated: [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870)

2022-12-13 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 99947c98d [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870)
99947c98d is described below

commit 99947c98d6437a694ccd3ffd1177f88add12dd31
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Wed Dec 14 15:23:38 2022 +0800

[INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870)
---
 .../inlong/sort/base/dirty/DirtySinkHelper.java| 108 ++
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  16 +++
 .../sort/elasticsearch6/ElasticsearchSink.java |  20 +++-
 .../table/Elasticsearch6DynamicSink.java   |  16 ++-
 .../table/Elasticsearch6DynamicSinkFactory.java|  14 ++-
 .../sort/elasticsearch7/ElasticsearchSink.java |  19 +++-
 .../table/Elasticsearch7DynamicSink.java   |  16 ++-
 .../table/Elasticsearch7DynamicSinkFactory.java|  14 ++-
 .../sort/elasticsearch/ElasticsearchSinkBase.java  | 122 +++-
 .../table/RowElasticsearchSinkFunction.java| 124 ++---
 .../sort/parser/ElasticsearchSqlParseTest.java |  10 +-
 11 files changed, 369 insertions(+), 110 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
new file mode 100644
index 0..a962b974e
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * Dirty sink helper, it helps dirty data sink for {@link DirtySink}
+ * @param 
+ */
+public class DirtySinkHelper implements Serializable {
+
+private static final long serialVersionUID = 1L;
+private static final Logger LOGGER = 
LoggerFactory.getLogger(DirtySinkHelper.class);
+
+private DirtyOptions dirtyOptions;
+private final @Nullable DirtySink dirtySink;
+
+public DirtySinkHelper(DirtyOptions dirtyOptions, @Nullable DirtySink 
dirtySink) {
+this.dirtyOptions = Preconditions.checkNotNull(dirtyOptions, 
"dirtyOptions is null");
+this.dirtySink = dirtySink;
+}
+
+/**
+ * Open for dirty sink
+ *
+ * @param configuration The configuration that is used for dirty sink
+ */
+public void open(Configuration configuration) {
+if (dirtySink != null) {
+try {
+dirtySink.open(configuration);
+} catch (Exception e) {
+throw new RuntimeException(e);
+}
+}
+}
+
+/**
+ * Dirty data sink
+ * @param dirtyData The dirty data
+ * @param dirtyType The dirty type {@link DirtyType}
+ * @param e The cause of dirty data
+ */
+public void invoke(T dirtyData, DirtyType dirtyType, Throwable e) {
+if (!dirtyOptions.ignoreDirty()) {
+RuntimeException ex;
+if (e instanceof RuntimeException) {
+ex = (RuntimeException) e;
+} else {
+ex = new RuntimeException(e);
+}
+throw ex;
+}
+if (dirtySink != null) {
+DirtyData.Builder builder = DirtyData.builder();
+try {
+builder.setData(dirtyData)
+.setDirtyType(dirtyType)
+.setLabels(dirtyOptions.getLabels())
+.setLogTag(dirtyOptions.getLogTag())
+.setDirtyMessage(e.getMessage())
+  

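The new DirtySinkHelper.invoke shown above makes one decision: when dirty data is not ignored, rethrow the original error (wrapping checked exceptions); otherwise route the record to the configured dirty sink, if any. A simplified, self-contained sketch of that control flow; the three-argument DirtySink interface here is an illustrative reduction of the real one, which takes a DirtyData builder:

```java
// Sketch of DirtySinkHelper.invoke's control flow from the diff: fail fast
// when dirty data must not be ignored, otherwise side-output the record.
// The DirtySink interface below is simplified from the source.
public class DirtySinkHelperSketch<T> {

    interface DirtySink<T> {
        void invoke(T dirtyData, String dirtyType, String message);
    }

    private final boolean ignoreDirty;
    private final DirtySink<T> dirtySink;   // may be null

    public DirtySinkHelperSketch(boolean ignoreDirty, DirtySink<T> dirtySink) {
        this.ignoreDirty = ignoreDirty;
        this.dirtySink = dirtySink;
    }

    public void invoke(T dirtyData, String dirtyType, Throwable e) {
        if (!ignoreDirty) {
            // fail the job: propagate the original cause, wrapped if checked
            throw (e instanceof RuntimeException)
                    ? (RuntimeException) e : new RuntimeException(e);
        }
        if (dirtySink != null) {
            dirtySink.invoke(dirtyData, dirtyType, e.getMessage());
        }
        // ignoreDirty with no sink configured: the record is silently dropped
    }

    public static void main(String[] args) {
        StringBuilder captured = new StringBuilder();
        DirtySinkHelperSketch<String> helper = new DirtySinkHelperSketch<>(
                true, (data, type, msg) -> captured.append(type).append(':').append(data));
        helper.invoke("bad-json", "DESERIALIZE_ERROR", new Exception("parse failed"));
        System.out.println(captured); // prints "DESERIALIZE_ERROR:bad-json"
    }
}
```

This is the shape each connector sink (elasticsearch here, kafka and doris earlier in the thread) plugs into its write path.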
[inlong] branch master updated (59c83c226 -> 8d0397700)

2022-12-14 Thread yunqing

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 59c83c226 [INLONG-6890][DataProxy] Fix tubemq MQ_TYPE enum and optimize sink code and config file (#6891)
 add 8d0397700 [INLONG-6884][Sort] Add dirty message for kafka connector (#6885)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java  | 3 +++
 .../inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)



[inlong] branch master updated: [INLONG-7186][Sort] Fix time zone incorrect (#7187)

2023-01-08 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new d7a639b5b [INLONG-7186][Sort] Fix time zone incorrect (#7187)
d7a639b5b is described below

commit d7a639b5b215e4df95ff6b4c717654557a5be247
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Mon Jan 9 10:57:06 2023 +0800

[INLONG-7186][Sort] Fix time zone incorrect (#7187)

* [INLONG-7186][Sort] Fix time zone incorrect

* [INLONG-7186][Sort] Remove unused imports
---
 .../apache/inlong/sort/base/format/JsonToRowDataConverters.java| 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
index b30e707ec..5d8b8206b 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.base.format;
 
+import java.sql.Timestamp;
+import java.time.ZoneId;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.formats.common.TimestampFormat;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -49,7 +51,6 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.time.ZoneOffset;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.TemporalAccessor;
 import java.time.temporal.TemporalQueries;
@@ -253,7 +254,7 @@ public class JsonToRowDataConverters implements 
Serializable {
 LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime());
 LocalDate localDate = 
parsedTimestamp.query(TemporalQueries.localDate());
 
-return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, 
localTime));
+return 
TimestampData.fromEpochMillis(Timestamp.valueOf(LocalDateTime.of(localDate, 
localTime)).getTime());
 }
 
 private TimestampData convertToTimestampWithLocalZone(JsonNode jsonNode) {
@@ -277,7 +278,7 @@ public class JsonToRowDataConverters implements 
Serializable {
 LocalDate localDate = 
parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
 
 return TimestampData.fromInstant(
-LocalDateTime.of(localDate, 
localTime).toInstant(ZoneOffset.UTC));
+LocalDateTime.of(localDate, 
localTime).atZone(ZoneId.systemDefault()).toInstant());
 }
 
 private StringData convertToString(JsonNode jsonNode) {


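The fix above replaces ZoneOffset.UTC with ZoneId.systemDefault() when turning a parsed LocalDateTime into an instant: interpreting the same wall-clock value in different zones shifts the resulting epoch by the zone offset, which is exactly the skew the bug produced. A small java.time demonstration (Asia/Shanghai is chosen only as an example of a UTC+8 session zone, not taken from the patch):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Demonstrates the difference the commit fixes: reading a wall-clock
// LocalDateTime as UTC (old behavior) versus in the local zone (new
// behavior) yields instants that differ by the zone offset.
public class TimestampZoneDemo {

    /** Epoch millis when the wall-clock time is read as UTC (old behavior). */
    public static long asUtc(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Epoch millis when the wall-clock time is read in a given zone (fixed behavior). */
    public static long inZone(LocalDateTime ldt, ZoneId zone) {
        return ldt.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.of(2023, 1, 9, 10, 57, 6);
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // UTC+8, no DST
        long diffHours = (asUtc(ldt) - inZone(ldt, shanghai)) / 3_600_000L;
        System.out.println(diffHours); // prints "8": the skew the bug produced
    }
}
```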

[inlong] branch master updated: [INLONG-7335][Sort] Fix Hbase connector lost spi file when shade (#7336)

2023-02-08 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 1385e370c [INLONG-7335][Sort] Fix Hbase connector lost spi file when shade (#7336)
1385e370c is described below

commit 1385e370ce521108ed87fe206c32a91eacaf73bd
Author: Xin Gong 
AuthorDate: Wed Feb 8 16:09:45 2023 +0800

[INLONG-7335][Sort] Fix Hbase connector lost spi file when shade (#7336)
---
 inlong-sort/sort-connectors/hbase/pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/inlong-sort/sort-connectors/hbase/pom.xml 
b/inlong-sort/sort-connectors/hbase/pom.xml
index fd26a1485..019ae7819 100644
--- a/inlong-sort/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-connectors/hbase/pom.xml
@@ -91,6 +91,7 @@
 hbase-default.xml
 hbase-default.xml
 
+
 
 false
 



[inlong] branch master updated: [INLONG-7377][Sort] Fix sort-dist protobuf conflicts with protobuf in sort-connectors (#7378)

2023-02-15 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new a8a65810e [INLONG-7377][Sort] Fix sort-dist protobuf conflicts with protobuf in sort-connectors (#7378)
a8a65810e is described below

commit a8a65810e7639292e021b0a039a7b95b7629a4d1
Author: Schnapps 
AuthorDate: Thu Feb 16 15:27:48 2023 +0800

[INLONG-7377][Sort] Fix sort-dist protobuf conflicts with protobuf in sort-connectors (#7378)
---
 inlong-sort/sort-connectors/filesystem/pom.xml| 1 +
 inlong-sort/sort-connectors/hbase/pom.xml | 1 +
 inlong-sort/sort-connectors/hudi/pom.xml  | 1 +
 inlong-sort/sort-connectors/iceberg/pom.xml   | 1 +
 inlong-sort/sort-connectors/kafka/pom.xml | 1 +
 inlong-sort/sort-connectors/mongodb-cdc/pom.xml   | 1 +
 inlong-sort/sort-connectors/mysql-cdc/pom.xml | 1 +
 inlong-sort/sort-connectors/oracle-cdc/pom.xml| 1 +
 inlong-sort/sort-connectors/postgres-cdc/pom.xml  | 1 +
 inlong-sort/sort-connectors/pulsar/pom.xml| 1 +
 inlong-sort/sort-connectors/sqlserver-cdc/pom.xml | 1 +
 inlong-sort/sort-dist/pom.xml | 4 ++--
 12 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/filesystem/pom.xml 
b/inlong-sort/sort-connectors/filesystem/pom.xml
index 54545dab1..57735743a 100644
--- a/inlong-sort/sort-connectors/filesystem/pom.xml
+++ b/inlong-sort/sort-connectors/filesystem/pom.xml
@@ -53,6 +53,7 @@
 
 
 org.apache.inlong:*
+com.google.protobuf:*
 
 
 
diff --git a/inlong-sort/sort-connectors/hbase/pom.xml 
b/inlong-sort/sort-connectors/hbase/pom.xml
index 019ae7819..823992ac0 100644
--- a/inlong-sort/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-connectors/hbase/pom.xml
@@ -108,6 +108,7 @@
 
org.apache.commons:commons-crypto
 
org.apache.commons:commons-lang3
 io.netty:netty-all
+com.google.protobuf:*
 
io.dropwizard.metrics:metrics-core
 
 
diff --git a/inlong-sort/sort-connectors/hudi/pom.xml 
b/inlong-sort/sort-connectors/hudi/pom.xml
index 660e17113..e62f42f8d 100644
--- a/inlong-sort/sort-connectors/hudi/pom.xml
+++ b/inlong-sort/sort-connectors/hudi/pom.xml
@@ -111,6 +111,7 @@
 com.fasterxml.woodstox:*
 org.codehaus.woodstox:*
 com.google.guava:*
+com.google.protobuf:*
 
 
 
diff --git a/inlong-sort/sort-connectors/iceberg/pom.xml 
b/inlong-sort/sort-connectors/iceberg/pom.xml
index c3cb35176..7d33b4a6e 100644
--- a/inlong-sort/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-connectors/iceberg/pom.xml
@@ -87,6 +87,7 @@
 org.apache.iceberg:*
 
org.apache.hive:hive-exec
 
org.apache.thrift:libfb303
+com.google.protobuf:*
 
 
 
diff --git a/inlong-sort/sort-connectors/kafka/pom.xml 
b/inlong-sort/sort-connectors/kafka/pom.xml
index 1f645cf91..cb77f730e 100644
--- a/inlong-sort/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-connectors/kafka/pom.xml
@@ -65,6 +65,7 @@
 
 org.apache.inlong:*
 org.apache.kafka:*
+com.google.protobuf:*
 
org.apache.flink:flink-connector-kafka_${scala.binary.version}
 
 
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml 
b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index c3019e20c..38bb08f93 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -99,6 +99,7 @@
 com.google.guava:*
 
 
org.apache.flink:flink-shaded-guava
+com.google.protobuf:*
 
 
 
diff --git a/inlong-sort/sort-connectors/mysql-cdc/pom.xml 
b/inlong-sort/sort

[inlong-website] branch master updated: [INLONG-687][Doc] Optimize the standalone deployment guide (#688)

2023-02-23 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new 1ac61284f3 [INLONG-687][Doc] Optimize the standalone deployment guide 
(#688)
1ac61284f3 is described below

commit 1ac61284f383daa77e5277f256f0a290811647ab
Author: Charles Zhang 
AuthorDate: Thu Feb 23 17:23:41 2023 +0800

[INLONG-687][Doc] Optimize the standalone deployment guide (#688)
---
 docs/deployment/docker.md  |  9 -
 docs/deployment/standalone.md  | 40 --
 .../current/deployment/docker.md   |  4 +--
 .../current/deployment/standalone.md   | 40 --
 4 files changed, 70 insertions(+), 23 deletions(-)

diff --git a/docs/deployment/docker.md b/docs/deployment/docker.md
index 25ffaf2141..186b880bae 100644
--- a/docs/deployment/docker.md
+++ b/docs/deployment/docker.md
@@ -23,6 +23,13 @@ cd docker/docker-compose
 docker-compose up -d
 ```
 
+## Cluster Initialize
+When all containers are successfully started, you can access the InLong dashboard at `http://localhost` and use the following default account to log in:
+```
+User: admin
+Password: inlong
+```
+
 ### Create Cluster Tag
 Click [Clusters]->[ClusterTags]->[Create] on the page to specify the cluster 
label name and person in charge:
 ![](img/create_cluster_tag.png)
@@ -41,7 +48,7 @@ The ClusterTags selects the newly created `default_cluster`, 
the Pulsar cluster
 Service URL is `pulsar://pulsar:6650`, Admin URL is `http://pulsar:8080`.
 :::
 
-### Create Data Stream
+## Use
 You can refer to [Pulsar Example](quick_start/pulsar_example.md) to create a Data Stream.
 
 ## Destroy
diff --git a/docs/deployment/standalone.md b/docs/deployment/standalone.md
index 7a08b04ffd..170c7a4f99 100644
--- a/docs/deployment/standalone.md
+++ b/docs/deployment/standalone.md
@@ -16,7 +16,9 @@ InLong Support the following Message Queue services now, you 
can choose one of t
 ## Download the Binary Package
 You can get the binary package from the [Download Page](https://inlong.apache.org/download), or build InLong by referring to [How to Build](quick_start/how_to_build.md).
 
+:::note
 Extract `apache-inlong-[version]-bin.tar.gz` and 
`apache-inlong-[version]-sort-connectors.tar.gz`, and make sure the 
`inlong-sort/connectors/` directory contains 
`sort-connector-[type]-[version].jar`.
+:::
 
 ## DB Dependencies
 - If the backend database is MySQL, please download 
[mysql-connector-java-8.0.27.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
 and put it into the following directories:
@@ -33,12 +35,14 @@ In `conf/inlong.conf`, configure the parameters according 
to the actual situatio
 ```shell
 # local IP
 local_ip=
-# Configure MySQL
+# Configure Database, MySQL or PostgreSQL
 spring_datasource_hostname=
 spring_datasource_port=3306
 spring_datasource_username=root
 spring_datasource_password=inlong
-# Configure Pulsar or TubeMQ Address
+# Configure Pulsar Address if using Pulsar for Audit
+pulsar_service_url=pulsar://172.17.0.2:6650
+pulsar_admin_url=http://172.17.0.2:8080
 # the REST server address for Flink
 flink_rest_address=
 # the REST server Port for Flink
@@ -50,12 +54,28 @@ flink_rest_port=8081
 bin/inlong-daemon start standalone
 ```
 
-## Register Message Queue
-You can register message queue for Manger according to [Register MQ 
Cluster](https://inlong.apache.org/docs/next/modules/manager/quick_start/#register-mq-cluster).
+## Cluster Initialize
+When all components have started successfully, you can access the InLong dashboard at `http://localhost` and use the following default account to log in:
+```
+User: admin
+Password: inlong
+```
 
-## Check
-After all component run successfully, you can access `http://localhost` with 
default account:
-```shell
-user: admin
-password: inlong
-```
\ No newline at end of file
+### Create Cluster Tag
+Click [Clusters]->[ClusterTags]->[Create] on the page to specify the cluster 
label name and person in charge:
+![](img/create_cluster_tag.png)
+
+:::caution
+Since each component reports the ClusterTags as `default_cluster` by default, 
do not use other names.
+:::
+
+### Register Pulsar Cluster
+Click [Clusters]->[ClusterTags]->[Create] on the page to register Pulsar 
Cluster:
+![](img/create_pulsar_cluster.png)
+
+:::note
+For ClusterTags, select the newly created `default_cluster`, then configure the Pulsar cluster info.
+:::
+
+## Use
+You can refer to [Pulsar Example](quick_start/pulsar_example.md) to create a Data Stream.
\ No newline at end of file
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/deployment/docker.md 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/deployment/docker.md
index 309de929c2..5d7b1753aa 10064

[inlong] branch master updated (c9a640d4c -> 90b678876)

2022-08-23 Thread yunqing

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from c9a640d4c [INLONG-5437][SDK] Support initializing SDK(cpp) by 
ClientConfig object (#5641)
 add 90b678876 [INLONG-5637][Sort] Fix kafka load node npe error (#5638)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java | 8 ++--
 1 file changed, 2 insertions(+), 6 deletions(-)



[inlong] branch master updated: [INLONG-5913][Sort] Add metric state for oracle and fix speed computing error (#5935)

2022-09-19 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 4b4f0dc2f [INLONG-5913][Sort] Add metric state for oracle and fix 
speed computing error (#5935)
4b4f0dc2f is described below

commit 4b4f0dc2fb7fdfd9011e75db2738ab3cd35eaa5d
Author: Xin Gong 
AuthorDate: Tue Sep 20 11:15:15 2022 +0800

[INLONG-5913][Sort] Add metric state for oracle and fix speed computing 
error (#5935)
---
 .../inlong/sort/base/metric/SinkMetricData.java| 28 
 .../inlong/sort/base/metric/SourceMetricData.java  | 19 +--
 .../sort/cdc/oracle/DebeziumSourceFunction.java| 38 ++
 3 files changed, 54 insertions(+), 31 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index d065496e4..34f759a83 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -66,28 +66,26 @@ public class SinkMetricData implements MetricData {
 registerMetricsForDirtyRecords(new ThreadSafeCounter());
 break;
 case NORMAL:
-registerMetricsForNumBytesOut(new ThreadSafeCounter());
-registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-registerMetricsForNumBytesOutPerSecond();
-registerMetricsForNumRecordsOutPerSecond();
-
 recordsOutCounter.inc(option.getInitRecords());
 bytesOutCounter.inc(option.getInitBytes());
-registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
-registerMetricsForNumRecordsOutForMeter(bytesOutCounter);
+registerMetricsForNumBytesOut(bytesOutCounter);
+registerMetricsForNumRecordsOut(recordsOutCounter);
+registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
+registerMetricsForNumRecordsOutForMeter(new 
ThreadSafeCounter());
+registerMetricsForNumBytesOutPerSecond();
+registerMetricsForNumRecordsOutPerSecond();
 break;
 default:
 registerMetricsForDirtyBytes(new ThreadSafeCounter());
 registerMetricsForDirtyRecords(new ThreadSafeCounter());
-registerMetricsForNumBytesOut(new ThreadSafeCounter());
-registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-registerMetricsForNumBytesOutPerSecond();
-registerMetricsForNumRecordsOutPerSecond();
-
 recordsOutCounter.inc(option.getInitRecords());
 bytesOutCounter.inc(option.getInitBytes());
-registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
-registerMetricsForNumRecordsOutForMeter(bytesOutCounter);
+registerMetricsForNumBytesOut(bytesOutCounter);
+registerMetricsForNumRecordsOut(recordsOutCounter);
+registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
+registerMetricsForNumRecordsOutForMeter(new 
ThreadSafeCounter());
+registerMetricsForNumBytesOutPerSecond();
+registerMetricsForNumRecordsOutPerSecond();
 break;
 
 }
@@ -267,7 +265,7 @@ public class SinkMetricData implements MetricData {
 }
 
 if (numBytesOutForMeter != null) {
-numBytesOutForMeter.inc(rowCount);
+numBytesOutForMeter.inc(rowSize);
 }
 
 if (auditImp != null) {
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 3cffcfe54..3ac6a96f8 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -54,23 +54,22 @@ public class SourceMetricData implements MetricData {
 this.metricGroup = metricGroup;
 this.labels = option.getLabels();
 
-SimpleCounter recordsInCounter = new SimpleCounter();
-SimpleCounter bytesInCounter = new SimpleCounter();
+ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
+ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
 switch (option.getRegisteredMetric()) {
 default:
-registerMetricsForNumRecordsIn

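The SourceMetricData change above swaps `SimpleCounter` for `ThreadSafeCounter` so that increments from concurrent threads are not lost. A minimal sketch of such a counter backed by `AtomicLong` — the class and method names here are illustrative assumptions, not InLong's actual implementation:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical thread-safe counter; a plain long field would drop updates
// under contention because ++ is a non-atomic read-modify-write.
public class ThreadSafeCounterSketch {
    private final AtomicLong count = new AtomicLong();

    public void inc(long n) {
        count.addAndGet(n); // single atomic read-modify-write
    }

    public long getCount() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadSafeCounterSketch c = new ThreadSafeCounterSketch();
        Thread[] ts = new Thread[4];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    c.inc(1);
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            t.join();
        }
        System.out.println(c.getCount()); // 40000 after all threads join
    }
}
```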
[inlong] branch master updated: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)

2022-09-21 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new a859e7973 [INLONG-5969][Sort] Support metrics state restore for hive 
connector (#5972)
a859e7973 is described below

commit a859e7973b699c4b3a4e6c8d5919d63a8e41efb7
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:02:06 2022 +0800

[INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../hive/filesystem/AbstractStreamingWriter.java   | 31 ++
 1 file changed, 31 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index ab2e845f2..dd9456203 100644
--- 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.hive.filesystem;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
 import javax.annotation.Nullable;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Operator for file system sink. It is a operator version of {@link 
StreamingFileSink}. It can send
  * file and bucket information to downstream.
@@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 public AbstractStreamingWriter(
 long bucketCheckInterval,
@@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(auditHostAndPorts)
+.withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 bucketCheckInterval);
 
 currentWatermark = Long.MIN_VALUE;
+
+// init metric state
+if (this.inlongMetric != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+}
 }
 
 @Override
 public void snapshotState(StateSnapshotContext context) throws Exception {
 super.snapshotState(context);
 helper.snapshotState(context.getCheckpointId());
+if (metricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
metricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
 }
 
 @Override



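The hive writer above snapshots its counters into a Flink union list state and rebuilds them in `initializeState`. Stripped of the Flink runtime, the restore step can be sketched in plain Java: after a restart every subtask sees the union of all saved entries and keeps only those mapped to it. The `index % parallelism` mapping and the class names below are assumptions for illustration, not InLong's actual `MetricStateUtils` logic:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetricRestoreSketch {

    // Hypothetical saved entry: which subtask wrote it, plus its counter values.
    static final class Entry {
        final int subtaskIndex;
        final Map<String, Long> metrics;

        Entry(int subtaskIndex, Map<String, Long> metrics) {
            this.subtaskIndex = subtaskIndex;
            this.metrics = metrics;
        }
    }

    /**
     * With a union list state, every subtask receives ALL saved entries on
     * restore. Each subtask keeps the entries mapped to it and sums their
     * counters, so values like numRecordsOut survive a restart even when
     * the job is rescaled.
     */
    static Map<String, Long> restore(List<Entry> unionState, int myIndex, int parallelism) {
        Map<String, Long> merged = new HashMap<>();
        for (Entry e : unionState) {
            if (e.subtaskIndex % parallelism != myIndex) {
                continue; // owned by another subtask after rescaling
            }
            e.metrics.forEach((k, v) -> merged.merge(k, v, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Entry> union = List.of(
                new Entry(0, Map.of("numRecordsOut", 10L)),
                new Entry(1, Map.of("numRecordsOut", 20L)),
                new Entry(2, Map.of("numRecordsOut", 30L)));
        // Rescaled from 3 subtasks to 2: subtask 0 merges old subtasks 0 and 2.
        System.out.println(restore(union, 0, 2).get("numRecordsOut")); // 40
    }
}
```

The restored totals are then fed back through `MetricOption.withInitRecords`/`withInitBytes`, which is why the diff increments the counters with the option's initial values before registering them.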
[inlong] branch master updated: [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)

2022-09-21 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 0394181db [INLONG-5970][Sort] Support metrics state restore for 
iceberg connector (#5973)
0394181db is described below

commit 0394181db774a946c88cf3ee2b0668e644e81465
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:04:01 2022 +0800

[INLONG-5970][Sort] Support metrics state restore for iceberg connector 
(#5973)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../sort/iceberg/sink/IcebergStreamWriter.java | 41 ++
 1 file changed, 41 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 8318c7177..6f1b75ea0 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -53,6 +65,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 private transient int attemptId;
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 IcebergStreamWriter(
 String fullTableName,
@@ -81,6 +95,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(auditHostAndPorts)
+.withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -105,6 +121,31 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 }
 }
 
+@Override
+public void initializeState(StateInitializationContext context) throws 
Exception {
+super.initializeState(context);
+// init metric state
+if (this.inlongMetric != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+}
+}
+
+@Override
+public void snapshotState(StateSnapshotContext context) throws Exception {
+super.snapshotState(context);
+if (metricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
metricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
+}
+
 @Override
 public void dispose() throws Exception {
 super.dispose();



[inlong] branch master updated: [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)

2022-09-21 Thread yunqing

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 341aa987b [INLONG-5971][Sort] Support metrics state restore for dlc 
connector (#5974)
341aa987b is described below

commit 341aa987ba86d0598af321d8e5a164a02224ff26
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:05:23 2022 +0800

[INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../inlong/sort/base/metric/MetricOption.java  |  8 +
 .../iceberg/flink/sink/IcebergStreamWriter.java| 41 ++
 2 files changed, 49 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index f4c679f9c..8cf0d6f01 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -111,6 +111,14 @@ public class MetricOption {
 return initBytes;
 }
 
+public void setInitRecords(long initRecords) {
+this.initRecords = initRecords;
+}
+
+public void setInitBytes(long initBytes) {
+this.initBytes = initBytes;
+}
+
 public static Builder builder() {
 return new Builder();
 }
diff --git 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index bb00b7808..ef7612743 100644
--- 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.flink.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -51,6 +63,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 private transient int attemptId;
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 IcebergStreamWriter(String fullTableName,
 TaskWriterFactory taskWriterFactory,
@@ -74,6 +88,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 
 // Initialize metric
 if (metricOption != null) {
+metricOption.setInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L);
+metricOption.setInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L);
 metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
 }
 }
@@ -95,6 +111,31 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 }
 }
 
+@Override
+public void initializeState(StateInitializationContext context) throws 
Exception {
+super.initializeState(context);
+// init metric state
+if (this.metricData != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG

[inlong] branch master updated (4f50594a6 -> c164d788f)

2022-10-09 Thread yunqing

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 4f50594a6 [INLONG-6094][Kubernetes] Add liveness reliability for 
modules (#6095)
 add c164d788f [INLONG-6089][Manager][Sort] kafka extract node support raw 
format and scan mode support timestamp (#6100)

No new revisions were added by this update.

Summary of changes:
 .../manager/pojo/sort/util/ExtractNodeUtils.java   | 10 -
 .../manager/pojo/source/kafka/KafkaOffset.java |  1 +
 .../manager/pojo/source/kafka/KafkaSource.java |  3 ++
 .../manager/pojo/source/kafka/KafkaSourceDTO.java  |  4 ++
 .../pojo/source/kafka/KafkaSourceRequest.java  |  6 ++-
 .../sort/protocol/constant/KafkaConstant.java  |  2 +
 .../sort/protocol/enums/KafkaScanStartupMode.java  |  3 +-
 .../protocol/node/extract/KafkaExtractNode.java| 22 --
 .../node/extract/KafkaExtractNodeTest.java | 36 ++-
 .../sort/parser/DataTypeConvertSqlParseTest.java   |  2 +-
 .../sort/parser/DecimalFormatSqlParseTest.java |  2 +-
 .../sort/parser/DistinctNodeSqlParseTest.java  |  6 +--
 .../inlong/sort/parser/FlinkSqlParserTest.java |  6 +--
 .../sort/parser/FullOuterJoinSqlParseTest.java | 14 +++---
 .../sort/parser/InnerJoinRelationSqlParseTest.java | 10 ++---
 .../parser/IntervalJoinRelationSqlParseTest.java   |  4 +-
 .../inlong/sort/parser/KafkaSqlParseTest.java  | 51 +-
 .../sort/parser/LeftOuterJoinSqlParseTest.java | 10 ++---
 .../inlong/sort/parser/MetaFieldSyncTest.java  |  2 +-
 .../MySqlTemporalJoinRelationSqlParseTest.java |  2 +-
 .../sort/parser/RightOuterJoinSqlParseTest.java| 10 ++---
 21 files changed, 164 insertions(+), 42 deletions(-)



[inlong] branch master updated (5d676a887 -> 31e9f9013)

2022-10-11 Thread yunqing

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 5d676a887 [INLONG-6131][Agent] Support file filtering by condition 
(#6132)
 add 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode 
(#6123)

No new revisions were added by this update.

Summary of changes:
 .../sort/protocol/node/format/AvroFormat.java  |   9 +-
 .../sort/protocol/node/format/CanalJsonFormat.java |   7 +
 .../sort/protocol/node/format/CsvFormat.java   |   9 +-
 .../protocol/node/format/DebeziumJsonFormat.java   |   9 +-
 .../inlong/sort/protocol/node/format/Format.java   |   7 +
 .../sort/protocol/node/format/InLongMsgFormat.java |   9 +-
 .../sort/protocol/node/format/JsonFormat.java  |  10 +-
 .../sort/protocol/node/format/RawFormat.java   |  15 +-
 .../sort/protocol/node/load/KafkaLoadNode.java |  44 +-
 .../org/apache/inlong/sort/base/Constants.java |  26 ++--
 .../base/format/AbstractDynamicSchemaFormat.java   | 116 ++
 .../base/format/CanalJsonDynamicSchemaFormat.java  |  58 +++
 .../format/DebeziumJsonDynamicSchemaFormat.java|  58 +++
 .../base/format/DynamicSchemaFormatFactory.java|  54 +++
 .../sort/base/format/JsonDynamicSchemaFormat.java  | 168 +
 .../format/CanalJsonDynamicSchemaFormatTest.java   |  88 +++
 .../DebeziumJsonDynamicSchemaFormatTest.java   |  79 ++
 .../base/format/DynamicSchemaFormatBaseTest.java   |  63 
 .../kafka/DynamicKafkaSerializationSchema.java |  45 --
 .../apache/inlong/sort/kafka/KafkaDynamicSink.java |  30 +++-
 .../sort/kafka/table/KafkaDynamicTableFactory.java | 118 ---
 .../table/UpsertKafkaDynamicTableFactory.java  |   5 +-
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  |  54 ++-
 23 files changed, 1023 insertions(+), 58 deletions(-)
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatBaseTest.java



[inlong] branch master updated (31e9f9013 -> a225671e5)

2022-10-11 Thread yunqing

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode 
(#6123)
 add a225671e5 [INLONG-6113][Sort] Mysql cdc connector support read table 
schema when using debezium function (#6128)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/common/enums/MetaField.java  |  7 +-
 .../org/apache/inlong/sort/protocol/Metadata.java  |  5 +-
 .../protocol/node/extract/MySqlExtractNode.java|  8 ++-
 .../node/extract/MySqlExtractNodeTest.java |  1 +
 .../sort/cdc/debezium/DebeziumSourceFunction.java  | 10 +++
 .../debezium/internal/DebeziumChangeFetcher.java   | 12 +++-
 .../internal/FlinkDatabaseSchemaHistory.java   |  2 +-
 .../cdc/mysql/table/MySqlReadableMetadata.java | 28 +---
 .../apache/inlong/sort/parser/AllMigrateTest.java  | 79 ++
 .../inlong/sort/formats/json/canal/CanalJson.java  | 13 
 10 files changed, 137 insertions(+), 28 deletions(-)



[inlong] branch master updated: [INLONG-6188] join supports multiple fields (#6189)

2022-10-19 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 153fdaf96 [INLONG-6188] join supports multiple fields (#6189)
153fdaf96 is described below

commit 153fdaf967197d1a8b3c3558fb83e7987bc10716
Author: jiachengjiang <108396409+jjiach...@users.noreply.github.com>
AuthorDate: Thu Oct 20 11:10:15 2022 +0800

[INLONG-6188] join supports multiple fields (#6189)
---
 .../org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
index 69ab47153..5896e2887 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
@@ -148,7 +148,7 @@ public class NodeRelationUtils {
 StreamField leftField = leftJoinFields.get(index);
 StreamField rightField = rightJoinFields.get(index);
 LogicOperator operator;
-if (index != leftJoinFields.size() - 1) {
+if (index != 0) {
 operator = AndOperator.getInstance();
 } else {
 operator = EmptyOperator.getInstance();
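
The one-line fix above changes which join-field pair goes without a leading logical operator: only the first pair should be unprefixed, and every later pair should be chained with AND. A self-contained sketch of the corrected loop (hypothetical stand-in names; the real code uses StreamField/LogicOperator objects, not strings):

```java
import java.util.List;

public class JoinConditionDemo {

    // Builds "l0 = r0 AND l1 = r1 AND ..." from parallel lists of join fields.
    static String buildCondition(List<String> leftFields, List<String> rightFields) {
        StringBuilder sb = new StringBuilder();
        for (int index = 0; index < leftFields.size(); index++) {
            // Fixed check: the FIRST pair (index == 0) carries the empty operator,
            // all later pairs are prefixed with AND. The old check
            // (index != leftJoinFields.size() - 1) wrongly left the LAST pair
            // unprefixed, so multi-field joins were assembled in the wrong shape.
            String operator = (index != 0) ? " AND " : "";
            sb.append(operator)
                    .append(leftFields.get(index))
                    .append(" = ")
                    .append(rightFields.get(index));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildCondition(
                List.of("l.a", "l.b"), List.of("r.a", "r.b")));
        // → l.a = r.a AND l.b = r.b
    }
}
```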



[inlong] branch master updated (153fdaf96 -> d062fb335)

2022-10-19 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 153fdaf96 [INLONG-6188] join supports multiple fields (#6189)
 add d062fb335 [INLONG-6224][Sort] Import schema and data parsing ability for DynamicSchemaFormat (#6225)

No new revisions were added by this update.

Summary of changes:
 inlong-sort/sort-connectors/base/pom.xml   |  7 ++
 .../org/apache/inlong/sort/base/Constants.java | 22 ---
 .../base/format/AbstractDynamicSchemaFormat.java   | 35 ++
 .../base/format/CanalJsonDynamicSchemaFormat.java  | 55 +++-
 .../format/DebeziumJsonDynamicSchemaFormat.java| 63 --
 .../sort/base/format/JsonDynamicSchemaFormat.java  | 77 ++
 6 files changed, 243 insertions(+), 16 deletions(-)



[inlong] branch master updated (98a58b011 -> e696c203a)

2023-03-01 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 98a58b011 [INLONG-7437][Sort] Support metrics for Oracle CDC Connector with incremental snapshot enabled (#7443)
 add e696c203a [INLONG-7477][Sort] Fix the metadata of table write error for canal-json (#7478)

No new revisions were added by this update.

Summary of changes:
 .../CanalJsonEnhancedSerializationSchema.java  | 44 --
 1 file changed, 40 insertions(+), 4 deletions(-)



[inlong-website] branch master updated: [INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3 (#703)

2023-03-05 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new 7f16fff6a2 [INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3 (#703)
7f16fff6a2 is described below

commit 7f16fff6a28a1ae9a4d91434930d9c8fbb63190b
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Mon Mar 6 11:18:21 2023 +0800

[INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3 (#703)

* [INLONG-702][Doc] Remove unsupported options in MongoDB CDC 2.3

* [INLONG-702][Doc] Modify the default values of some options
---
 docs/data_node/extract_node/mongodb-cdc.md | 10 +++---
 .../current/data_node/extract_node/mongodb-cdc.md  | 10 +++---
 2 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/docs/data_node/extract_node/mongodb-cdc.md b/docs/data_node/extract_node/mongodb-cdc.md
index 601ef3f9d4..4eca173ad3 100644
--- a/docs/data_node/extract_node/mongodb-cdc.md
+++ b/docs/data_node/extract_node/mongodb-cdc.md
@@ -125,14 +125,10 @@ TODO: It will be supported in the future.
 | database              | required | (none)           | String  | Name of the database to watch for changes. |
 | collection            | required | (none)           | String  | Name of the collection in the database to watch for changes. |
 | connection.options    | optional | (none)           | String  | The ampersand-separated [connection options](https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options) of MongoDB. eg. `replicaSet=test&connectTimeoutMS=30` |
-| errors.tolerance      | optional | none             | String  | Whether to continue processing messages if an error is encountered. Accept `none` or `all`. When set to `none`, the connector reports an error and blocks further processing of the rest of the records when it encounters an error. When set to `all`, the connector silently ignores any bad messages. |
-| errors.log.enable     | optional | true             | Boolean | Whether details of failed operations should be written to the log file. |
 | copy.existing         | optional | true             | Boolean | Whether copy existing data from source collections. |
-| copy.existing.pipeline | optional | (none)          | String  | An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient. eg. `[{"$match": {"closed": "false"}}]` ensures that only documents in which the closed field is set to false are copied. |
-| copy.existing.max.threads | optional | Processors Count | Integer | The number of threads to use when performing the data copy. |
-| copy.existing.queue.size | optional | 16000          | Integer | The max size of the queue to use when copying data. |
-| poll.max.batch.size   | optional | 1000             | Integer | Maximum number of change stream documents to include in a single batch when polling for new data. |
-| poll.await.time.ms    | optional | 1500             | Integer | The amount of time to wait before checking for new results on the change stream. |
+| copy.existing.queue.size | optional | 10240          | Integer | The max size of the queue to use when copying data. |
+| poll.max.batch.size   | optional | 1024             | Integer | Maximum number of change stream documents to include in a single batch when polling for new data. |
+| poll.await.time.ms    | optional | 1000             | Integer | The amount of time to wait before checking for new results on the change stream. |
 | heartbeat.interval.ms | optional | 0                | Integer | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
 | inlong.metric.labels  | optional | (none)           | String  | Inlong metric label, format of value is groupId=`{groupId}`&streamId=`{streamId}`&nodeId=`{nodeId}`. |
 ## Available Metadata
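
The `connection.options` row in the table above takes ampersand-separated key/value pairs that end up in the query component of the MongoDB connection string. A minimal sketch of that assembly (a hypothetical helper, not the connector's actual code):

```java
public class ConnStringDemo {

    // Appends the ampersand-separated options, if any, as the URI query string.
    static String build(String hosts, String database, String connectionOptions) {
        String uri = "mongodb://" + hosts + "/" + database;
        if (connectionOptions != null && !connectionOptions.isEmpty()) {
            uri += "?" + connectionOptions; // e.g. replicaSet=test&connectTimeoutMS=30
        }
        return uri;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:27017", "inventory",
                "replicaSet=test&connectTimeoutMS=30"));
        // → mongodb://localhost:27017/inventory?replicaSet=test&connectTimeoutMS=30
    }
}
```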
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md
index ad7f76b0a9..c3b94c189b 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md
@@ -125,14 +125,10 @@ TODO: It will be supported in the future.
 | database   | required | (none) | String | Name of the database to watch for changes. |
 | collection | required

[inlong] branch master updated: [INLONG-7550][Sort] Optimize the log printing level of dirty data to avoid generating a large number of logs (#7551)

2023-03-07 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new d1dc70ee7 [INLONG-7550][Sort] Optimize the log printing level of dirty data to avoid generating a large number of logs (#7551)
d1dc70ee7 is described below

commit d1dc70ee7963a600d0f69f6c27af3a519494f852
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Wed Mar 8 12:22:07 2023 +0800

[INLONG-7550][Sort] Optimize the log printing level of dirty data to avoid generating a large number of logs (#7551)
---
 .../org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java | 5 ++---
 .../java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java  | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index b7f7af914..2884ac398 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -46,11 +46,10 @@ public class LogDirtySink implements DirtySink {
 private static final long serialVersionUID = 1L;
 
 private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class);
-
-private RowData.FieldGetter[] fieldGetters;
 private final String format;
 private final String fieldDelimiter;
 private final DataType physicalRowDataType;
+private RowData.FieldGetter[] fieldGetters;
 private RowDataToJsonConverter converter;
 
 public LogDirtySink(String format, String fieldDelimiter, DataType physicalRowDataType) {
@@ -79,7 +78,7 @@ public class LogDirtySink implements DirtySink {
 // Only support csv format when the row is not a 'RowData' and 'JsonNode'
 value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter);
 }
-LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
+LOGGER.debug("[{}] {}", dirtyData.getLogTag(), value);
 }
 
 private String format(RowData data, LogicalType rowType,
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index 66c1428d3..26f9d66f4 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -161,7 +161,7 @@ public class S3DirtySink implements DirtySink {
 value = FormatUtils.csvFormat(data, labelMap, s3Options.getFieldDelimiter());
 }
 if (s3Options.enableDirtyLog()) {
-LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
+LOGGER.debug("[{}] {}", dirtyData.getLogTag(), value);
 }
 batchBytes += value.getBytes(UTF_8).length;
 size++;
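
The diff above demotes the per-record dirty-data log lines from INFO to DEBUG, so a stream producing millions of dirty rows no longer floods the log files while a summary can still be logged at INFO. The effect can be sketched with plain `java.util.logging` (a hypothetical demo class; the connector itself uses SLF4J):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class DirtyLogDemo {
    static final List<String> CAPTURED = new ArrayList<>();

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("dirty");
        logger.setUseParentHandlers(false);
        logger.setLevel(Level.INFO); // typical production default
        logger.addHandler(new Handler() {
            @Override public void publish(LogRecord r) { CAPTURED.add(r.getMessage()); }
            @Override public void flush() {}
            @Override public void close() {}
        });

        // Per-record dirty-data logging at debug level: suppressed by default.
        for (int i = 0; i < 1_000_000; i++) {
            if (logger.isLoggable(Level.FINE)) { // cheap guard avoids string building
                logger.fine("[dirtyTag] bad record " + i);
            }
        }
        // One summary line at INFO instead of a million per-record lines.
        logger.info("dirty records processed: 1000000");

        System.out.println(CAPTURED.size()); // → 1
    }
}
```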



[inlong] branch master updated (b6014fa5f -> 0175d5c1d)

2023-03-12 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from b6014fa5f [INLONG-7533][Agent] Fix that log cannot be collected for position reset (#7538)
 add 0175d5c1d [INLONG-7559][Sort] Fix Oracle CDC reads timestamp type record error (#7561)

No new revisions were added by this update.

Summary of changes:
 .../table/RowDataDebeziumDeserializeSchema.java| 60 --
 1 file changed, 34 insertions(+), 26 deletions(-)



[inlong] branch master updated: [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)

2023-03-24 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 6d4aca310 [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)
6d4aca310 is described below

commit 6d4aca310330578bc591ca90c1b25366425f8832
Author: Liao Rui 
AuthorDate: Fri Mar 24 18:01:41 2023 +0800

[INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)
---
 .../sort/iceberg/FlinkDynamicTableFactory.java |  23 +++
 .../inlong/sort/iceberg/IcebergTableSink.java  |   9 +
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  34 +++-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 207 +
 .../multiple/IcebergMultipleFilesCommiter.java |  16 +-
 .../sink/multiple/IcebergSingleFileCommiter.java   |  16 +-
 .../sink/multiple/IcebergSingleStreamWriter.java   |   1 -
 .../iceberg/sink/multiple/RecordWithSchema.java|   4 +
 8 files changed, 214 insertions(+), 96 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index c6d3ed874..32d3a21bd 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynMethods;
@@ -112,6 +113,25 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
 .orNoop()
 .build();
 
+public static final ConfigOption<Boolean> WRITE_COMPACT_ENABLE =
+ConfigOptions.key("write.compact.enable")
+.booleanType()
+.defaultValue(false)
+.withDescription("Whether to enable compact small file.");
+
+public static final ConfigOption<Integer> WRITE_COMPACT_INTERVAL =
+ConfigOptions.key("write.compact.snapshot.interval")
+.intType()
+.defaultValue(20)
+.withDescription("Compact snapshot interval.");
+
+public static final ConfigOption<String> WRITE_DISTRIBUTION_MODE =
+ConfigOptions.key(TableProperties.WRITE_DISTRIBUTION_MODE)
+.stringType()
+.defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
+.withDescription("Distribute the records from input data stream based "
++ "on the write.distribution-mode.");
+
 private final FlinkCatalog catalog;
 
 public FlinkDynamicTableFactory() {
@@ -274,6 +294,9 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
 options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
 options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
 options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
+options.add(WRITE_COMPACT_ENABLE);
+options.add(WRITE_COMPACT_INTERVAL);
+options.add(WRITE_DISTRIBUTION_MODE);
 return options;
 }
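
The hunk above declares typed, defaulted config keys and registers them as supported options. The pattern can be sketched without Flink as a tiny generic option holder plus a map-backed lookup (hypothetical stand-ins for Flink's `ConfigOption`/`ConfigOptions`, not the real API):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a typed config option: a key plus a typed default.
final class Option<T> {
    final String key;
    final T defaultValue;
    Option(String key, T defaultValue) { this.key = key; this.defaultValue = defaultValue; }
}

public class OptionDemo {
    static final Option<Boolean> WRITE_COMPACT_ENABLE =
            new Option<>("write.compact.enable", false);
    static final Option<Integer> WRITE_COMPACT_INTERVAL =
            new Option<>("write.compact.snapshot.interval", 20);

    @SuppressWarnings("unchecked")
    static <T> T get(Map<String, Object> conf, Option<T> opt) {
        // Fall back to the declared default when the user set nothing.
        return (T) conf.getOrDefault(opt.key, opt.defaultValue);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("write.compact.enable", true); // user override
        System.out.println(get(conf, WRITE_COMPACT_ENABLE));   // true (overridden)
        System.out.println(get(conf, WRITE_COMPACT_INTERVAL)); // 20 (default)
    }
}
```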
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ef93e1f6f..cd239633e 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.TableLoader;
@@ -54,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sor

[inlong] branch master updated (a39e03cc5 -> b9446908f)

2023-05-29 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from a39e03cc5 [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization (#8109)
 add b9446908f [INLONG-7853][Sort] Add common handle for schema-change in sink (#8105)

No new revisions were added by this update.

Summary of changes:
 .../sort/protocol/enums/SchemaChangePolicy.java|  73 +++
 .../sort/protocol/enums/SchemaChangeType.java  |  94 +
 .../apache/inlong/sort/util/SchemaChangeUtils.java | 229 +
 .../inlong/sort/util/SchemaChangeUtilsTest.java|  62 ++
 .../org/apache/inlong/sort/base/Constants.java |  13 ++
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  12 ++
 .../sort/base/format/JsonDynamicSchemaFormat.java  |   2 +-
 .../base/schema/SchemaChangeHandleException.java   |  20 +-
 8 files changed, 493 insertions(+), 12 deletions(-)
 create mode 100644 inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/SchemaChangePolicy.java
 create mode 100644 inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/SchemaChangeType.java
 create mode 100644 inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
 create mode 100644 inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/util/SchemaChangeUtilsTest.java
 copy inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java => inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandleException.java (72%)



[inlong] branch master updated: [INLONG-8217][Sort] Sort-core should support running on flink-1.15 (#8230)

2023-06-14 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 5a789fe7fe [INLONG-8217][Sort] Sort-core should support running on flink-1.15 (#8230)
5a789fe7fe is described below

commit 5a789fe7fe43f2f7faab4cd4fa0fadbcd730ba15
Author: Sting 
AuthorDate: Wed Jun 14 21:31:15 2023 +0800

[INLONG-8217][Sort] Sort-core should support running on flink-1.15 (#8230)
---
 .../sort-core/src/main/java/org/apache/inlong/sort/Entrance.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index ae3ebc842d..ea3f3f95be 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -50,8 +50,7 @@ public class Entrance {
 config.getInteger(Constants.MIN_PAUSE_BETWEEN_CHECKPOINTS_MS));
 env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
-.inStreamingMode().build();
+EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
 tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME, config.getString(Constants.JOB_NAME));