[GitHub] [inlong] gong commented on a diff in pull request #7062: [INLONG-7061][Sort] Support table level metrics for Apache Doris connector and add dirty metrics

2022-12-27 Thread GitBox


gong commented on code in PR #7062:
URL: https://github.com/apache/inlong/pull/7062#discussion_r1057554402


##
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##
@@ -507,8 +512,35 @@ private void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e)
 LOG.warn("Dirty sink failed", ex);
 }
 }
+metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+}
+
+private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
+JsonNode rootNode = null;
+try {
+rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+} catch (Exception ex) {
+LOG.warn("handle dirty data: rootnode is null", ex);
+}
+
+if (dirtySink != null) {
+DirtyData.Builder builder = DirtyData.builder();
+try {
+builder.setData(dirtyData)
+.setDirtyType(dirtyType)
+.setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+.setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+.setDirtyMessage(e.getMessage())
+.setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+dirtySink.invoke(builder.build());
+} catch (Exception ex) {
+if (!dirtyOptions.ignoreSideOutputErrors()) {
+throw new RuntimeException(ex);
+}
+LOG.warn("Dirty sink failed", ex);
+}
+}
 try {
-JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
 metricData.outputDirtyMetricsWithEstimate(
 jsonDynamicSchemaFormat.parse(rootNode, databasePattern),
 null, jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1,

Review Comment:
   Schema is null here, so the table-level metric will always be lost.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] featzhang opened a new pull request, #7077: [INLONG-7072][Manager][Sort] Resource adaptive adjustment for Hudi

2022-12-27 Thread GitBox


featzhang opened a new pull request, #7077:
URL: https://github.com/apache/inlong/pull/7077

   ### Prepare a Pull Request
   *[INLONG-7072][Manager][Sort] Resource adaptive adjustment for Hudi*
   
   
   - Fixes #7072 
   
   ### Motivation
   
   Hudi Flink jobs often have poorly sized resource allocations. Allocating too much wastes resources, while allocating too little leads to back pressure or OOM.
   
   When allocating resources, first determine the source-side parallelism so that no data backlog builds up upstream during reads. As a general reference, for data partitioned by day with about 15 billion records per day, a source parallelism of about 50 is typical; scale this proportionally for other data volumes.
   
   After determining the source-side parallelism, set the write parallelism at a source-to-write ratio of 1:1.5 or 1:2.
   
   If the write operator runs out of memory (OOM) during operation, increase the write parallelism and the TaskManager (TM) memory accordingly.
   
   If back pressure occurs, adjust the parallelism according to the consumption gap between source and write. For example, a gap of about 500,000 records means that 500,000 records cannot keep up with the write; from the number of successfully written records and the running (used) time, you can calculate how much write parallelism is needed to close that gap.
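The back-pressure calculation described above can be sketched as follows. This is a minimal sketch, assuming the gap accumulated over the same running time as the written count and that each write task keeps a constant throughput, so the running time cancels out and only the ratio of records matters. `WriteParallelismAdjuster` and its method are hypothetical names, not part of InLong.

```java
/**
 * Hypothetical helper: scale write parallelism so the backlog is absorbed.
 * Assumes a constant per-task write rate over the measured running time.
 */
public final class WriteParallelismAdjuster {

    private WriteParallelismAdjuster() {
    }

    /**
     * @param currentParallelism write parallelism used during the measurement
     * @param writtenRecords     records successfully written in the running time
     * @param backlogRecords     records the write side fell behind the source in that time
     * @return write parallelism needed to also write the backlog in that time (rounded up)
     */
    public static int requiredWriteParallelism(int currentParallelism, long writtenRecords, long backlogRecords) {
        if (currentParallelism <= 0 || writtenRecords <= 0 || backlogRecords < 0) {
            throw new IllegalArgumentException("parallelism and written records must be positive, backlog non-negative");
        }
        // Per-task rate = writtenRecords / (runningTime * currentParallelism).
        // Writing (writtenRecords + backlogRecords) in the same running time needs
        // currentParallelism * (writtenRecords + backlogRecords) / writtenRecords tasks.
        long target = (long) currentParallelism * (writtenRecords + backlogRecords);
        // Ceiling division for positive longs.
        return (int) ((target + writtenRecords - 1) / writtenRecords);
    }

    public static void main(String[] args) {
        // Hypothetical measurement: 10 write tasks wrote 1,000,000 records and fell 500,000 behind.
        System.out.println(requiredWriteParallelism(10, 1_000_000L, 500_000L));
    }
}
```

A linear model like this ignores checkpoint and compaction overhead, so the result is a starting point for tuning rather than a guarantee.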
   
   
   
   ### Modifications
   
   1. Estimate the parallelism of the source node from the user-supplied estimated daily data volume, assuming a processing rate of 1,000 records per second per core.
   2. Configure the write parallelism at a source-to-write ratio of 1:1.5 or 1:2.
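The two sizing rules can be sketched as follows, using the 1,000 records per second per core rate and the 1:1.5 / 1:2 ratios stated above. `HudiParallelismEstimator` is a hypothetical helper, not the code in this PR; rounding up and the minimum of 1 are assumptions.

```java
/**
 * Hypothetical sketch of the two sizing rules from the Modifications list.
 */
public final class HudiParallelismEstimator {

    private static final long SECONDS_PER_DAY = 24L * 60 * 60;

    private HudiParallelismEstimator() {
    }

    /** Source parallelism = ceil(dailyRecords / secondsPerDay / recordsPerSecondPerCore), at least 1. */
    public static int estimateSourceParallelism(long dailyRecords, long recordsPerSecondPerCore) {
        if (dailyRecords < 0 || recordsPerSecondPerCore <= 0) {
            throw new IllegalArgumentException("dailyRecords must be >= 0 and rate must be > 0");
        }
        double recordsPerSecond = (double) dailyRecords / SECONDS_PER_DAY;
        int parallelism = (int) Math.ceil(recordsPerSecond / recordsPerSecondPerCore);
        return Math.max(1, parallelism);
    }

    /** Write parallelism = ceil(sourceParallelism * ratio), for a ratio such as 1.5 or 2.0. */
    public static int estimateWriteParallelism(int sourceParallelism, double ratio) {
        return (int) Math.ceil(sourceParallelism * ratio);
    }

    public static void main(String[] args) {
        int source = estimateSourceParallelism(15_000_000_000L, 1_000L);
        System.out.println("source=" + source
                + ", write(1:1.5)=" + estimateWriteParallelism(source, 1.5)
                + ", write(1:2)=" + estimateWriteParallelism(source, 2.0));
    }
}
```

With these inputs, 15 billion records per day (about 173,611 records per second) gives a source parallelism of 174 and a write parallelism of 261 or 348. That is well above the rule of thumb of about 50 in the Motivation, so the per-core rate is an input worth calibrating against observed throughput.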
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads (10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





[GitHub] [inlong] kuansix opened a new pull request, #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL

2022-12-27 Thread GitBox


kuansix opened a new pull request, #7078:
URL: https://github.com/apache/inlong/pull/7078

   ### Prepare a Pull Request
   - [INLONG-7075][Sort] Add table level metric for PostgreSQL
   - Fixes #7075 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads (10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





[GitHub] [inlong] healchow commented on a diff in pull request #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL

2022-12-27 Thread GitBox


healchow commented on code in PR #7078:
URL: https://github.com/apache/inlong/pull/7078#discussion_r1058012354


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##
@@ -532,6 +515,20 @@ protected void attemptFlush() throws IOException {
 }
 }
 
+/**
+ * Output metrics with estimate for pg or other type jdbc connectors.
+ */
+private void outputMetrics(String tableIdentifier, Long rowSize, Long dataSize) {
+String[] fieldArray = tableIdentifier.split("\\.");
+if (fieldArray.length == 3) {
+sinkMetricData.outputMetricsWithEstimate(fieldArray[0], fieldArray[1], fieldArray[2],
+false, rowSize, dataSize);
+} else if (fieldArray.length == 2) {
+sinkMetricData.outputMetricsWithEstimate(fieldArray[0], null, fieldArray[1],
+false, rowSize, dataSize);
+}

Review Comment:
   What does the string look like before it is split? Can you give some examples?
   In addition, if the array length is neither 2 nor 3, should a log be printed to record the data?
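To make the question concrete, here is a minimal sketch of the same 3-part / 2-part split as `outputMetrics`, treating the parts as database, schema and table (an assumption based on the argument order in the diff). Any other shape returns an empty result so the caller can decide whether, and how often, to log. `TableIdentifierParser` is hypothetical, and identifiers such as `mydb.public.orders` are made-up examples; the thread does not state which forms the connector actually receives.

```java
import java.util.Optional;

/**
 * Hypothetical sketch mirroring the split in outputMetrics.
 */
public final class TableIdentifierParser {

    /** Parsed identifier; schema is null when the identifier has only two parts. */
    public static final class TableId {
        public final String database;
        public final String schema;
        public final String table;

        public TableId(String database, String schema, String table) {
            this.database = database;
            this.schema = schema;
            this.table = table;
        }

        @Override
        public String toString() {
            return "database=" + database + ", schema=" + schema + ", table=" + table;
        }
    }

    private TableIdentifierParser() {
    }

    /** Accepts "database.schema.table" or "database.table"; anything else yields Optional.empty(). */
    public static Optional<TableId> parse(String tableIdentifier) {
        if (tableIdentifier == null) {
            return Optional.empty();
        }
        String[] parts = tableIdentifier.split("\\.");
        if (parts.length == 3) {
            return Optional.of(new TableId(parts[0], parts[1], parts[2]));
        }
        if (parts.length == 2) {
            return Optional.of(new TableId(parts[0], null, parts[1]));
        }
        // Unexpected shape: leave logging (for example once, or rate-limited) to the caller.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical identifiers for illustration only.
        for (String id : new String[] {"mydb.public.orders", "mydb.orders", "orders"}) {
            System.out.println(id + " -> " + parse(id));
        }
    }
}
```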






[GitHub] [inlong] chyueyi opened a new pull request, #7079: [INLONG-7076] [sort-connector-jdbc] Add multi table sink for MySQL

2022-12-27 Thread GitBox


chyueyi opened a new pull request, #7079:
URL: https://github.com/apache/inlong/pull/7079

   
   ## Prepare a Pull Request
   
   -  [[INLONG-7076] [sort-connector-jdbc] Add multi table sink for MySQL](https://github.com/apache/inlong/issues/7076)
   
   - Fixes #XYZ
   
   ### Motivation
   
   - In many cases, users want to synchronize data from multiple tables in MongoDB, Kafka, etc. to MySQL or other JDBC-based databases in real time, so a multi-table sink connector for MySQL is needed.
   
   - Based on flink-connector-jdbc, sort-connector-jdbc can sink data to MySQL in the canal-json format.
   
   ### Modifications
   
   - Add a multi-table sink for MySQL
   - Add `MySQLRowConverter` and `MySQLDialect`
   - Modify `JdbcMultiBatchingOutputFormat` and `AbstractJdbcRowConverter` to support MySQL data types
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
   ```sql
   CREATE TABLE cdc_mysql_source (
       `data` BYTES METADATA FROM 'meta.data_canal' VIRTUAL
   ) WITH (
       'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
       'migrate-all' = 'true',
       'connector' = 'mysql-cdc-inlong',
       'scan.incremental.snapshot.enabled' = 'false',
       'hostname' = 'localhost',
       'database-name' = 'test',
       'server-time-zone' = 'Asia/Shanghai',
       'username' = '',
       'password' = '',
       'table-name' = 'test\.[\s\S]*'
   );

   CREATE TABLE cdc_mysql_sink (
       `data` BYTES
   ) WITH (
       'connector' = 'jdbc-inlong',
       'url' = 'jdbc:mysql://localhost:3306',
       'username' = '',
       'password' = '',
       'table-name' = 'test',
       'sink.multiple.enable' = 'true',
       'sink.multiple.schema-update.policy' = 'TRY_IT_BEST',
       'sink.multiple.format' = 'canal-json',
       'sink.multiple.database-pattern' = 'test2',
       'sink.multiple.schema-pattern' = '',
       'sink.multiple.table-pattern' = '${table}'
   );

   INSERT INTO cdc_mysql_sink SELECT * FROM cdc_mysql_source;
   ```
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





[inlong] branch master updated (699af0d53 -> 6a35533c1)

2022-12-27 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 699af0d53 [INLONG-7070][Manager][DataProxy] Fix mq configuration dispatching problem (#7071)
 add 6a35533c1 [INLONG-7066][Sort] Kafka connector lost topic level dirty data metric and state restore for multi sink (#7074)

No new revisions were added by this update.

Summary of changes:
 .../sort/base/metric/sub/SinkTableMetricData.java  | 25 ++-
 .../sort/base/metric/sub/SinkTopicMetricData.java  | 48 +++---
 .../inlong/sort/base/util/MetricStateUtils.java |  1 -
 .../kafka/DynamicKafkaSerializationSchema.java | 20 +
 .../inlong/sort/kafka/FlinkKafkaProducer.java  |  9 ++--
 5 files changed, 72 insertions(+), 31 deletions(-)



[GitHub] [inlong] EMsnap merged pull request #7074: [INLONG-7066][Sort] Kafka connector lost topic level dirty data metric and state restore for multi sink

2022-12-27 Thread GitBox


EMsnap merged PR #7074:
URL: https://github.com/apache/inlong/pull/7074





[GitHub] [inlong] kuansix commented on a diff in pull request #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL

2022-12-27 Thread GitBox


kuansix commented on code in PR #7078:
URL: https://github.com/apache/inlong/pull/7078#discussion_r1058024563


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##
@@ -532,6 +515,20 @@ protected void attemptFlush() throws IOException {
 }
 }
 
+/**
+ * Output metrics with estimate for pg or other type jdbc connectors.
+ */
+private void outputMetrics(String tableIdentifier, Long rowSize, Long dataSize) {
+String[] fieldArray = tableIdentifier.split("\\.");
+if (fieldArray.length == 3) {
+sinkMetricData.outputMetricsWithEstimate(fieldArray[0], fieldArray[1], fieldArray[2],
+false, rowSize, dataSize);
+} else if (fieldArray.length == 2) {
+sinkMetricData.outputMetricsWithEstimate(fieldArray[0], null, fieldArray[1],
+false, rowSize, dataSize);
+}

Review Comment:
   This method is executed too many times to log here, and there is already logging elsewhere.






[GitHub] [inlong] fuweng11 opened a new pull request, #7081: [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly

2022-12-27 Thread GitBox


fuweng11 opened a new pull request, #7081:
URL: https://github.com/apache/inlong/pull/7081

   ### Prepare a Pull Request
   - Fixes #7080 
   
   ### Motivation
   
   Use `StringUtils.isNotBlank` uniformly instead of `StringUtils.isNoneBlank`.
   
   ### Modifications
   
   Use `StringUtils.isNotBlank` uniformly instead of `StringUtils.isNoneBlank`.
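For a single argument, the two methods should return the same result, which is why this swap is a cleanup rather than a behavior change. The sketch below is a stdlib-only stand-in that mirrors the documented commons-lang3 semantics (blank means null, empty, or whitespace-only; `isNoneBlank` is the varargs form that checks every argument). `BlankCheckDemo` and its methods are hypothetical and are not the library classes.

```java
/**
 * Hypothetical stand-in mirroring commons-lang3 StringUtils.isNotBlank / isNoneBlank,
 * so the comparison runs without the library on the classpath.
 */
public final class BlankCheckDemo {

    private BlankCheckDemo() {
    }

    // Blank = null, empty, or only whitespace characters (Character.isWhitespace).
    public static boolean isBlank(CharSequence cs) {
        if (cs == null || cs.length() == 0) {
            return true;
        }
        for (int i = 0; i < cs.length(); i++) {
            if (!Character.isWhitespace(cs.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    // Single-value check: the method the PR standardizes on.
    public static boolean isNotBlank(CharSequence cs) {
        return !isBlank(cs);
    }

    // Varargs check: true only if no argument is blank; an empty array counts as "none blank".
    public static boolean isNoneBlank(CharSequence... css) {
        if (css == null || css.length == 0) {
            return true;
        }
        for (CharSequence cs : css) {
            if (isBlank(cs)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] samples = {null, "", "  ", "abc", " a "};
        for (String s : samples) {
            // With one argument both checks agree for every sample.
            System.out.println("[" + s + "] isNotBlank=" + isNotBlank(s) + " isNoneBlank=" + isNoneBlank(s));
        }
    }
}
```

`isNoneBlank` remains the better fit when several values must all be non-blank at once; for one value, `isNotBlank` states the intent more directly.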
   





[GitHub] [inlong] gong commented on a diff in pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL

2022-12-27 Thread GitBox


gong commented on code in PR #7079:
URL: https://github.com/apache/inlong/pull/7079#discussion_r1058074563


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java:
##
@@ -18,7 +18,8 @@
 package org.apache.inlong.sort.jdbc.table;
 
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+//import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+import org.apache.inlong.sort.jdbc.dialect.MySQLDialect;

Review Comment:
   Remove the commented-out line `//import org.apache.flink.connector.jdbc.dialect.MySQLDialect;`.






[GitHub] [inlong] gong commented on a diff in pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL

2022-12-27 Thread GitBox


gong commented on code in PR #7079:
URL: https://github.com/apache/inlong/pull/7079#discussion_r1058077632


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java:
##
@@ -25,11 +25,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.*;

Review Comment:
   Please follow the InLong code style and use explicit imports instead of the `.*` wildcard import.






[GitHub] [inlong] gong commented on pull request #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL

2022-12-27 Thread GitBox


gong commented on PR #7078:
URL: https://github.com/apache/inlong/pull/7078#issuecomment-1366395406

   1. Restoring the dirty data metrics from state is missing:
   ```java
public void open(int taskNumber, int numTasks) throws IOException {
   this.runtimeContext = getRuntimeContext();
   MetricOption metricOption = MetricOption.builder()
   .withInlongLabels(inlongMetric)
   .withInlongAudit(auditHostAndPorts)
   .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
   .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
   .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
   ```
   2. Computing the dirty data metrics is missing.





[GitHub] [inlong] gong merged pull request #7081: [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly

2022-12-27 Thread GitBox


gong merged PR #7081:
URL: https://github.com/apache/inlong/pull/7081





[inlong] branch master updated: [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly (#7081)

2022-12-27 Thread pacinogong
This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 7c9044a54 [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly (#7081)
7c9044a54 is described below

commit 7c9044a5434dcbb54481055f3a18ca2037e3b68b
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Dec 28 14:25:24 2022 +0800

[INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly (#7081)
---
 .../inlong/manager/pojo/stream/InlongStreamExtParam.java | 12 ++--
 .../service/resource/sink/greenplum/GreenplumSqlBuilder.java |  2 +-
 .../service/resource/sink/oracle/OracleSqlBuilder.java   |  2 +-
 .../resource/sink/postgresql/PostgreSQLSqlBuilder.java   |  2 +-
 4 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 6186b7dc4..2b2e61bfc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -17,11 +17,6 @@
 
 package org.apache.inlong.manager.pojo.stream;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
-
-import java.io.Serializable;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -29,6 +24,11 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import java.io.Serializable;
 
 /**
  * Extended params, will be saved as JSON string
@@ -68,7 +68,7 @@ public class InlongStreamExtParam implements Serializable {
 public static void unpackExtParams(
 String extParams,
 Object targetObject) {
-if (StringUtils.isNoneBlank(extParams)) {
+if (StringUtils.isNotBlank(extParams)) {
 InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject(extParams, InlongStreamExtParam.class);
 if (inlongStreamExtParam != null) {
 CommonBeanUtils.copyProperties(inlongStreamExtParam, targetObject, true);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
index 735351d9f..f9aa76276 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
@@ -211,7 +211,7 @@ public class GreenplumSqlBuilder {
 List columns) {
 final List commentList = new ArrayList<>();
 for (GreenplumColumnInfo columnInfo : columns) {
-if (StringUtils.isNoneBlank(columnInfo.getComment())) {
+if (StringUtils.isNotBlank(columnInfo.getComment())) {
 StringBuilder commSql = new StringBuilder();
 commSql.append("COMMENT ON COLUMN \"")
 .append(schemaName)
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java
index 9d026a62a..2dd4e28f4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java
@@ -162,7 +162,7 @@ public class OracleSqlBuilder {
 final List commentList = new ArrayList<>();
 final StringBuilder sqlBuilder = new StringBuilder();
 columns.stream()
-.filter(columnInfo -> StringUtils.isNoneBlank(columnInfo.getComment()))
+.filter(columnInfo -> StringUtils.isNotBlank(columnInfo.getComment()))
 .forEach(columnInfo -> {
 sqlBuilder.append("COMMENT ON COLUMN \"")
 .append(tableName)
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/

[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

2022-12-27 Thread GitBox


gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058092502


##
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduUtils.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.inlong.sort.formats.common.BooleanTypeInfo;
+import org.apache.inlong.sort.formats.common.ByteTypeInfo;
+import org.apache.inlong.sort.formats.common.DateTypeInfo;
+import org.apache.inlong.sort.formats.common.DecimalTypeInfo;
+import org.apache.inlong.sort.formats.common.DoubleTypeInfo;
+import org.apache.inlong.sort.formats.common.FloatTypeInfo;
+import org.apache.inlong.sort.formats.common.IntTypeInfo;
+import org.apache.inlong.sort.formats.common.LongTypeInfo;
+import org.apache.inlong.sort.formats.common.RowTypeInfo;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
+import org.apache.inlong.sort.formats.common.TimestampTypeInfo;
+import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.kudu.Type.*;
+
+/**
+ * The utility class for Kudu.
+ */
+public class KuduUtils {
+
+private static final Map KUDU_TYPE_2_TYPE_INFO_MAP = new HashMap<>(13);
+private static final Map KUDU_TYPE_2_DATA_TYPE_MAP = new HashMap<>(13);

Review Comment:
   Maybe the initial capacity 13 should be changed to 13/0.75 to account for the default load factor.
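The reasoning behind `13/0.75`: `java.util.HashMap` resizes once its size exceeds capacity times the load factor (0.75 by default), so `new HashMap<>(13)` cannot hold 13 entries without a rehash. A minimal sketch of the arithmetic follows. `MapCapacity` is a hypothetical helper, and `tableSizeFor` mirrors OpenJDK's power-of-two rounding, which is an implementation detail rather than part of the API contract. Note that `13/0.75` is a `double` in Java and needs a cast (and rounding up) before it can be passed to the `HashMap` constructor.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper showing why an expected size should be divided by the load factor.
 */
public final class MapCapacity {

    // Default load factor documented for java.util.HashMap.
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    private MapCapacity() {
    }

    /** Initial capacity such that expectedSize entries fit under the resize threshold. */
    public static int initialCapacityFor(int expectedSize) {
        return (int) Math.ceil(expectedSize / (double) DEFAULT_LOAD_FACTOR);
    }

    /** Smallest power of two >= cap, mirroring how OpenJDK sizes the internal table. */
    public static int tableSizeFor(int cap) {
        if (cap <= 1) {
            return 1;
        }
        return Integer.highestOneBit(cap - 1) << 1;
    }

    /** Largest number of entries the map holds before the next put triggers a resize. */
    public static int resizeThreshold(int initialCapacity) {
        return (int) (tableSizeFor(initialCapacity) * DEFAULT_LOAD_FACTOR);
    }

    public static void main(String[] args) {
        int expectedSize = 13;
        int capacity = initialCapacityFor(expectedSize);
        System.out.println("new HashMap<>(13) threshold: " + resizeThreshold(13));
        System.out.println("new HashMap<>(" + capacity + ") threshold: " + resizeThreshold(capacity));

        // Map sized so that all expected entries fit without rehashing.
        Map<Integer, String> map = new HashMap<>(capacity);
        for (int i = 0; i < expectedSize; i++) {
            map.put(i, "type-" + i);
        }
        System.out.println("entries: " + map.size());
    }
}
```

With a capacity of 13 the table has 16 slots and a threshold of 12, so the 13th put triggers a resize; a capacity of 18 gives 32 slots and a threshold of 24. On JDK 19 and later, `HashMap.newHashMap(int numMappings)` performs this calculation itself.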






[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

2022-12-27 Thread GitBox


gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058093963


##
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.kudu.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.*;
+
+/**
+ * The base for all kudu sinks.
+ */
+@PublicEvolving
+public abstract class AbstractKuduSinkFunction
+extends
+RichSinkFunction
+implements
+CheckpointedFunction {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduSinkFunction.class);
+
+/**
+ * The masters of kudu server.
+ */
+protected final String masters;
+/**
+ * The name of kudu table.
+ */
+protected final String tableName;
+/**
+ * The flink TableSchema.
+ */
+protected final TableSchema flinkTableSchema;
+
+protected String connectorMetricIdentify;
+
+/**
+ * The configuration of kudu sinkFunction.
+ */
+protected final Configuration configuration;
+
+/**
+ * The maximum number of buffered records.
+ */
+protected final int maxBufferSize;
+
+/**
+ * The maximum number of retries.
+ */
+protected final int maxRetries;
+
+/**
+ * True if the sink is running.
+ */
+protected volatile boolean running;
+
+/**
+ * The exception thrown in asynchronous tasks.
+ */
+private transient Throwable flushThrowable;
+
+protected final SessionConfiguration.FlushMode flushMode;
+/**
+ * whether load metadata in `open` function
+ */
+protected boolean lazyLoadSchema;
+
+private SinkMetricData sinkMetricData;
+
+private transient ListState metricStateListState;
+private transient MetricState metricState;
+
+private final String auditHostAndPorts;
+
+private final String inlongMetric;
+
+public AbstractKuduSinkFunction(
+TableSchema flinkTableSchema,
+String masters,
+String tableName,
+SessionConfiguration.FlushMode flushMode,
+Configuration configuration,
+String inlongMetric,
+String auditHostAndPorts) {
+this.masters = masters;
+this.flushMode = flushMode;
+this.tableName = tableName;
+this.flinkTableSchema = flinkTableSchema;
+this.configuration = configuration;
+this.maxRetries = configuration.getInteger(MAX_RETRIES);
+this.maxBufferSize = configuration.getInteger(MAX_BUFFER_SIZE);
+this.lazyLoadSchema = configuration.getBoolean(LAZY_LOAD_SCHEMA);
+this.inlongMetric = inlongMetric;
+this.auditHostAndPorts = auditHostAndPorts;
+MetricOption metricOption = MetricOption.builder()
+.withInlongLabels(inlongMetric)
+.withInlongAudit(auditHostAndPorts)
+.withInitReco

[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

2022-12-27 Thread GitBox


gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058095936


##
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for kudu sink.
+ */
+public class KuduOptions {
+
+public static final ConfigOption<String> FLUSH_MODE =
+ConfigOptions.key("flush-mode")
+.stringType()
+.defaultValue("AUTO_FLUSH_SYNC")
+.withDescription(
+"The flush mode of Kudu client session, AUTO_FLUSH_SYNC/AUTO_FLUSH_BACKGROUND/MANUAL_FLUSH.");
+
+public static final ConfigOption<Integer> MAX_CACHE_SIZE =
+ConfigOptions.key("lookup.max-cache-size")
+.intType()
+.defaultValue(-1)
+.withDescription("The maximum number of results cached in the " +
+"lookup source.");
+
+public static final ConfigOption<String> MAX_CACHE_TIME =
+ConfigOptions.key("lookup.max-cache-time")
+.stringType()
+.defaultValue("60s")
+.withDescription("The maximum live time for cached results in " +
+"the lookup source.");
+public static final ConfigOption<Boolean> SINK_START_NEW_CHAIN =
+ConfigOptions.key("sink.start-new-chain")
+.booleanType()
+.defaultValue(true)
+.withDescription("The sink operator will start a new chain if true.");
+
+public static final ConfigOption<Integer> MAX_RETRIES =
+ConfigOptions.key("max-retries")
+.intType()
+.defaultValue(3)
+.withDescription("The maximum number of retries when an " +
+"exception is caught.");
+public static final ConfigOption<Integer> MAX_BUFFER_SIZE =
+ConfigOptions.key("sink.max-buffer-size")
+.intType()
+.defaultValue(100)
+.withDescription("The maximum number of records buffered in the sink.");
+
+public static final ConfigOption<Integer> WRITE_THREAD_COUNT =
+ConfigOptions.key("sink.write-thread-count")
+.intType()
+.defaultValue(5)
+.withDescription("The maximum number of threads in the sink.");
+
+public static final ConfigOption<String> MAX_BUFFER_TIME =
+ConfigOptions.key("sink.max-buffer-time")
+.stringType()
+.defaultValue("30s")
+.withDescription("The maximum wait time for buffered records in the sink.");
+
+public static final ConfigOption<String> SINK_KEY_FIELD_NAMES =
+ConfigOptions.key("sink.key-field-names")
+.stringType()
+.defaultValue("")
+.withDescription("The key fields for updating DB when using upsert sink function.");
+public static final ConfigOption<Boolean> ENABLE_KEY_FIELD_CHECK =
+ConfigOptions.key("sink.enable-key-field-check")
+.booleanType()
+.defaultValue(true)
+.withDescription("If true, the check to compare key fields assumed by flink " +
+"and key fields provided by user will be performed.");
+
+public static final ConfigOption<Boolean> SINK_WRITE_WITH_ASYNC_MODE =
+ConfigOptions.key("sink.write-with-async-mode")
+.booleanType()
+.defaultValue(false)
+.withDescription("Use async write mode for kudu producer if true.");
+
+public static final ConfigOption<Boolean> SINK_FORCE_WITH_UPSERT_MODE =
+ConfigOptions.key("sink.write-with-upsert-mode")
+.booleanType()
+.defaultValue(true)
+.withDescription("Force write kudu with upser
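Several of the options above are plain strings that the sink has to interpret. Below is a hedged sketch of one way to do that. It assumes `flush-mode` is matched case-insensitively against the constant names of Kudu's `SessionConfiguration.FlushMode`. It also assumes `sink.enable-key-field-check` compares the comma-separated `sink.key-field-names` with the primary key Flink derives. `KuduOptionSketch` and its local enum are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helpers showing how the string-typed Kudu options might be interpreted. */
final class KuduOptionSketch {

    /** Local stand-in mirroring the constants of Kudu's SessionConfiguration.FlushMode. */
    enum FlushMode { AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH }

    /** Parses the "flush-mode" value case-insensitively, e.g. "auto_flush_background". */
    static FlushMode parseFlushMode(String value) {
        try {
            return FlushMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported flush-mode '" + value
                    + "', expected one of " + Arrays.toString(FlushMode.values()), e);
        }
    }

    /** Splits the comma-separated "sink.key-field-names" value; "" yields an empty set. */
    static Set<String> parseKeyFields(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Assumed reading of "sink.enable-key-field-check": when enabled and the user
     * provided key fields, they must equal the primary key Flink derived.
     */
    static void checkKeyFields(List<String> flinkKeys, String userKeys, boolean enabled) {
        Set<String> provided = parseKeyFields(userKeys);
        if (enabled && !provided.isEmpty() && !provided.equals(new LinkedHashSet<>(flinkKeys))) {
            throw new IllegalArgumentException("Key fields " + provided
                    + " do not match Flink primary key " + flinkKeys);
        }
    }
}
```

Comparing the key fields as sets means column order does not matter, only membership.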

[GitHub] [inlong] chyueyi commented on pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL

2022-12-27 Thread GitBox


chyueyi commented on PR #7079:
URL: https://github.com/apache/inlong/pull/7079#issuecomment-1366411665

   @gong OK, I've modified it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

2022-12-27 Thread GitBox


gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058096505


##
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableSink.java:
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.inlong.sort.kudu.common.KuduOptions;
+import org.apache.inlong.sort.kudu.sink.KuduAsyncSinkFunction;
+import org.apache.inlong.sort.kudu.sink.KuduSinkFunction;
+import org.apache.kudu.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.FLUSH_MODE;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.KUDU_IGNORE_ALL_CHANGELOG;
+
+/**
+ * The KuduLookupFunction is a standard user-defined table function, it can be
+ * used in tableAPI and also useful for temporal table join plan in SQL.
+ */
+public class KuduDynamicTableSink implements DynamicTableSink {
+
+private static final Logger LOG = LoggerFactory.getLogger(KuduDynamicTableSink.class);
+
+/**
+ * The schema of the table.
+ */
+private final TableSchema flinkSchema;
+
+/**
+ * The masters of kudu server.
+ */
+private final String masters;
+
+/**
+ * The flush mode of kudu client.
+ * AUTO_FLUSH_BACKGROUND: calls will return immediately, but the writes will be sent in the background,
+ * potentially batched together with other writes from the same session.
+ * AUTO_FLUSH_SYNC: calls will return only after being flushed to the server automatically.
+ * MANUAL_FLUSH: calls will return immediately, but the writes will not be sent
+ * until the user calls KuduSession.flush().
+ */
+private final SessionConfiguration.FlushMode flushMode;
+
+/**
+ * The name of kudu table.
+ */
+private final String tableName;
+
+/**
+ * The configuration for the kudu sink.
+ */
+private final Configuration configuration;
+private final String inlongMetric;
+private final String auditHostAndPorts;
+
+/**
+ * True if the data stream consumed by this sink is append-only.
+ */
+private boolean isAppendOnly;
+
+/**
+ * The names of the key fields of the upsert stream consumed by this sink.
+ */
+@Nullable
+private String[] keyFieldNames;
+private ResolvedCatalogTable catalogTable;
+
+public KuduDynamicTableSink(
+ResolvedCatalogTable catalogTable,
+TableSchema flinkSchema,
+String masters,
+String tableName,
+Configuration configuration,
+String inlongMetric,
+String auditHostAndPorts) {
+this.catalogTable = catalogTable;
+this.flinkSchema = checkNotNull(flinkSchema,
+"The schema must not be null.");
+DataType dataType = flinkSchema.toRowDataType();
+LogicalType logicalType = dataType.getLogicalType();
+
+SessionConfiguration.FlushMode flushMode = S
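The `isAppendOnly` and `keyFieldNames` fields, together with the upsert option, imply a mapping from Flink changelog rows to Kudu writes. Below is a minimal sketch of one such mapping. It assumes UPDATE_BEFORE rows can be skipped because the stream is keyed by the Kudu primary key, so the UPDATE_AFTER row overwrites the same key. The local enums mirror Flink's `RowKind` and the insert/upsert/delete operations of the Kudu client. `ChangelogToKuduOp` is hypothetical.

```java
import java.util.Optional;

/** Hypothetical sketch of mapping Flink changelog rows to Kudu write operations. */
final class ChangelogToKuduOp {

    /** Local mirror of org.apache.flink.types.RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Local stand-in for the operations created by KuduTable.newInsert/newUpsert/newDelete. */
    enum KuduOp { INSERT, UPSERT, DELETE }

    /**
     * Returns the Kudu operation for a row, or empty if the row should be skipped.
     *
     * @param appendOnly  true if the consumed stream may only contain INSERT rows
     * @param forceUpsert true to write plain inserts as upserts (cf. "sink.write-with-upsert-mode")
     */
    static Optional<KuduOp> toOperation(RowKind kind, boolean appendOnly, boolean forceUpsert) {
        if (appendOnly && kind != RowKind.INSERT) {
            throw new IllegalStateException("Append-only sink received " + kind);
        }
        switch (kind) {
            case INSERT:
                return Optional.of(forceUpsert ? KuduOp.UPSERT : KuduOp.INSERT);
            case UPDATE_AFTER:
                // The new image may hit an existing key, so it is always written as an upsert
                return Optional.of(KuduOp.UPSERT);
            case UPDATE_BEFORE:
                // Assumed safe to drop: the matching UPDATE_AFTER overwrites the same key
                return Optional.empty();
            case DELETE:
                return Optional.of(KuduOp.DELETE);
            default:
                throw new IllegalArgumentException("Unknown row kind " + kind);
        }
    }
}
```

If the stream can change primary keys within an update, skipping UPDATE_BEFORE would leave stale rows behind. In that case it would need to map to a delete instead.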

[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

2022-12-27 Thread GitBox


gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058097244


##
inlong-sort/sort-connectors/kudu/pom.xml:
##
@@ -0,0 +1,93 @@
+
+
+http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+4.0.0
+
+org.apache.inlong
+sort-connectors
+1.5.0-SNAPSHOT
+
+
+sort-connector-kudu
+jar
+Apache InLong - Sort-connector-kudu
+
+
+
+
+
+org.apache.inlong
+sort-connector-base
+${project.version}
+
+
+
+org.apache.kudu
+kudu-client
+${kudu.version}
+
+
+
+org.apache.inlong
+sort-format-common
+1.5.0-SNAPSHOT
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-shade-plugin
+
+
+shade-flink
+
+shade
+
+package
+
+
+
+org.apache.hudi:*
+org.apache.hive:hive-exec
+org.apache.hadoop:*
+com.fasterxml.woodstox:*
+org.codehaus.woodstox:*
+com.google.guava:*

Review Comment:
   Why hudi? I can't find kudu in these shade includes.
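The include list names only `org.apache.hudi`, Hive, Hadoop, Woodstox and Guava artifacts, so nothing from `org.apache.kudu` would be bundled. Below is a simplified sketch of that matching. It assumes the `groupId:artifactId` include form with `*` and `?` wildcards, and it treats a bare groupId as `groupId:*`. `ShadeIncludeSketch` is hypothetical and ignores type and classifier.

```java
import java.util.List;
import java.util.regex.Pattern;

/**
 * Simplified, hypothetical model of shade includes such as "org.apache.hudi:*".
 * Only groupId[:artifactId] with '*' and '?' wildcards is handled.
 */
final class ShadeIncludeSketch {

    static boolean matches(String pattern, String groupId, String artifactId) {
        String[] parts = pattern.split(":", -1);
        // Assumption: a pattern without an artifactId part matches every artifact of the group
        String artifactPattern = parts.length > 1 ? parts[1] : "*";
        return glob(parts[0], groupId) && glob(artifactPattern, artifactId);
    }

    static boolean anyMatch(List<String> includes, String groupId, String artifactId) {
        return includes.stream().anyMatch(p -> matches(p, groupId, artifactId));
    }

    /** Converts a glob with '*' and '?' into a regex and tests the whole value. */
    private static boolean glob(String glob, String value) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return value.matches(regex.toString());
    }
}
```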






[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

2022-12-27 Thread GitBox


gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058097548


##
inlong-sort/sort-connectors/kudu/pom.xml:
##
@@ -0,0 +1,93 @@
+
+
+http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+4.0.0
+
+org.apache.inlong
+sort-connectors
+1.5.0-SNAPSHOT
+
+
+sort-connector-kudu
+jar
+Apache InLong - Sort-connector-kudu
+
+
+
+
+
+org.apache.inlong
+sort-connector-base
+${project.version}
+

Review Comment:
   I suggest adding a shade include for `sort-connector-base`.






[GitHub] [inlong] gong commented on pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

2022-12-27 Thread GitBox


gong commented on PR #7059:
URL: https://github.com/apache/inlong/pull/7059#issuecomment-1366413893

   Add a fileSet for the kudu connector in `inlong-distribution/src/main/assemblies/sort-connectors.xml`.





[GitHub] [inlong] gong commented on pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL

2022-12-27 Thread GitBox


gong commented on PR #7079:
URL: https://github.com/apache/inlong/pull/7079#issuecomment-1366414528

   @kuansix please help review it.





[GitHub] [inlong] bluewang opened a new pull request, #7084: [INLONG-7063][Dashboard] Optimize the topic name of the new consumption page

2022-12-27 Thread GitBox


bluewang opened a new pull request, #7084:
URL: https://github.com/apache/inlong/pull/7084

   
   ### Prepare a Pull Request
   *(Change the title by referring to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes https://github.com/apache/inlong/issues/7063
   
   ### Modifications
   
   
![image](https://user-images.githubusercontent.com/88174078/209778096-adf7efa5-baeb-4d27-a677-ed825a9785cc.png)
   
   





[GitHub] [inlong] liaorui opened a new pull request, #7085: [INLONG-7083][Sort]StarRocks connector supports dirty data archives

2022-12-27 Thread GitBox


liaorui opened a new pull request, #7085:
URL: https://github.com/apache/inlong/pull/7085

   ### Prepare a Pull Request
   *(Change the title by referring to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #7083
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve?*
   
   StarRocks connector supports dirty data archives
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   1. `StarRocksDynamicTableSinkFactory.java` adds new DirtyOptions, DirtySink and DirtySinkHelper objects.
   2. `StarRocksDynamicTableSink.java`, `StarRocksDynamicSinkFunction.java` and `StarRocksSinkManager.java` receive the `DirtySinkHelper` created in `StarRocksDynamicTableSinkFactory`, each passing it on to the next in turn.
   3. `StarRocksSinkManager.java` uses `DirtySinkHelper` to write dirty data and upload dirty data metrics when it catches exceptions.
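   Item 3 describes writing dirty data and reporting dirty metrics when a write fails. Below is a minimal sketch of that flow. It assumes the whole failed batch is treated as dirty, and that a failure of the archive itself is tolerated only when side-output errors are configured to be ignored. `DirtyDataArchiver` is hypothetical and is not the `DirtySinkHelper` API.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch (not the DirtySinkHelper API): write a batch and, if the
 * write throws, archive every record of the batch as dirty data and update
 * dirty record/byte counters.
 */
final class DirtyDataArchiver {
    private final Consumer<String> dirtySink; // stands in for the dirty-data archive
    private final boolean ignoreSideOutputErrors;
    long dirtyRecords;
    long dirtyBytes;

    DirtyDataArchiver(Consumer<String> dirtySink, boolean ignoreSideOutputErrors) {
        this.dirtySink = dirtySink;
        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
    }

    void writeOrArchive(List<String> batch, Consumer<List<String>> writer) {
        try {
            writer.accept(batch);
        } catch (RuntimeException writeError) {
            // Assumption: a failed batch write makes every record in it dirty
            for (String record : batch) {
                archive(record);
            }
        }
    }

    private void archive(String record) {
        try {
            dirtySink.accept(record);
        } catch (RuntimeException sinkError) {
            // Only tolerate a broken archive when configured to ignore side-output errors
            if (!ignoreSideOutputErrors) {
                throw sinkError;
            }
        }
        dirtyRecords++;
        dirtyBytes += record.getBytes(StandardCharsets.UTF_8).length;
    }
}
```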
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads (10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   

