[GitHub] [inlong] gong commented on a diff in pull request #7062: [INLONG-7061][Sort] Support table level metrics for Apache Doris connector and add dirty metrics
gong commented on code in PR #7062: URL: https://github.com/apache/inlong/pull/7062#discussion_r1057554402 ## inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java: ## @@ -507,8 +512,35 @@ private void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) LOG.warn("Dirty sink failed", ex); } } +metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length); +} + +private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) { +JsonNode rootNode = null; +try { +rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0)); +} catch (Exception ex) { +LOG.warn("handle dirty data: rootnode is null", ex); +} + +if (dirtySink != null) { +DirtyData.Builder builder = DirtyData.builder(); +try { +builder.setData(dirtyData) +.setDirtyType(dirtyType) +.setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels())) +.setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag())) +.setDirtyMessage(e.getMessage()) +.setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier())); +dirtySink.invoke(builder.build()); +} catch (Exception ex) { +if (!dirtyOptions.ignoreSideOutputErrors()) { +throw new RuntimeException(ex); +} +LOG.warn("Dirty sink failed", ex); +} +} try { -JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0)); metricData.outputDirtyMetricsWithEstimate( jsonDynamicSchemaFormat.parse(rootNode, databasePattern), null, jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1, Review Comment: The schema is null here, so the table-level metric will always be lost. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] featzhang opened a new pull request, #7077: [INLONG-7072][Manager][Sort] Resource adaptive adjustment for Hudi
featzhang opened a new pull request, #7077: URL: https://github.com/apache/inlong/pull/7077 ### Prepare a Pull Request *[INLONG-7072][Manager][Sort] Resource adaptive adjustment for Hudi* - Fixes #7072 ### Motivation Hudi Flink jobs often have poorly sized resources: over-allocation wastes resources, while under-allocation leads to back pressure or OOM. When allocating resources, first determine the source parallelism, so that no data backlog builds up upstream while reading. As a general reference, a table partitioned by day with about 15 billion records per day uses a source parallelism of about 50; other data volumes can be scaled proportionally. Once the source parallelism is set, configure the write parallelism at a source-to-write ratio of 1:1.5 or 1:2. If the write operator runs out of memory (OOM) at runtime, increase the write parallelism and the TaskManager (TM) memory as appropriate. If back pressure occurs, adjust the parallelism according to the consumption gap between the source and the write operator. For example, with a gap of about 50W (500,000) records, i.e. 500,000 records that the writer cannot keep up with, calculate the write parallelism needed to close that gap from the number of successfully written records and the running (elapsed) time. ### Modifications 1. Estimate the source parallelism from the user-supplied estimate of daily data volume, assuming a throughput of 1,000 records per second per core. 2. Configure the write parallelism at a source-to-write ratio of 1:1.5 or 1:2. ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
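The two sizing rules under Modifications can be sketched as plain arithmetic. This is a minimal Java sketch, not the code in PR #7077: the class and method names are hypothetical, and it assumes the stated rate of 1,000 records per second per core and a day of 86,400 seconds.

```java
/**
 * Hypothetical sketch of the sizing rules in PR #7077 (not the PR's code):
 * source parallelism from daily volume at an assumed per-core rate, and write
 * parallelism as a multiple (for example 1.5 or 2.0) of the source parallelism.
 */
public class HudiParallelismEstimator {

    private static final long SECONDS_PER_DAY = 24L * 60 * 60;

    /** Source tasks needed so that the average input rate never exceeds capacity. */
    public static int estimateSourceParallelism(long dailyRecords, long recordsPerSecondPerCore) {
        if (dailyRecords <= 0 || recordsPerSecondPerCore <= 0) {
            throw new IllegalArgumentException("inputs must be positive");
        }
        double recordsPerSecond = (double) dailyRecords / SECONDS_PER_DAY;
        // Round up: a fractional task still needs a whole slot.
        return (int) Math.max(1, Math.ceil(recordsPerSecond / recordsPerSecondPerCore));
    }

    /** Write tasks for a source-to-write ratio of 1:writeRatio. */
    public static int estimateWriteParallelism(int sourceParallelism, double writeRatio) {
        if (sourceParallelism <= 0 || writeRatio <= 0) {
            throw new IllegalArgumentException("inputs must be positive");
        }
        return (int) Math.ceil(sourceParallelism * writeRatio);
    }

    public static void main(String[] args) {
        int source = estimateSourceParallelism(15_000_000_000L, 1_000);
        System.out.println("source = " + source);                                       // 174
        System.out.println("write (1:1.5) = " + estimateWriteParallelism(source, 1.5)); // 261
        System.out.println("write (1:2) = " + estimateWriteParallelism(source, 2.0));   // 348
    }
}
```

Under these assumptions, 15 billion records per day is about 173,611 records/s, which gives 174 source tasks and 261 or 348 write tasks. That is well above the ~50 source tasks quoted in the Motivation, so the assumed per-core rate is probably worth calibrating against real jobs.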
[GitHub] [inlong] kuansix opened a new pull request, #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL
kuansix opened a new pull request, #7078: URL: https://github.com/apache/inlong/pull/7078 ### Prepare a Pull Request - [INLONG-7075][Sort] Add table level metric for PostgreSQL - Fixes #7075 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications *Describe the modifications you've done.* ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[GitHub] [inlong] healchow commented on a diff in pull request #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL
healchow commented on code in PR #7078: URL: https://github.com/apache/inlong/pull/7078#discussion_r1058012354 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java: ## @@ -532,6 +515,20 @@ protected void attemptFlush() throws IOException { } } +/** + * Output metrics with estimate for pg or other type jdbc connectors. + */ +private void outputMetrics(String tableIdentifier, Long rowSize, Long dataSize) { +String[] fieldArray = tableIdentifier.split("\\."); +if (fieldArray.length == 3) { +sinkMetricData.outputMetricsWithEstimate(fieldArray[0], fieldArray[1], fieldArray[2], +false, rowSize, dataSize); +} else if (fieldArray.length == 2) { +sinkMetricData.outputMetricsWithEstimate(fieldArray[0], null, fieldArray[1], +false, rowSize, dataSize); +} Review Comment: What does the string look like before it is split? Can you give some examples? Also, if the array length is neither 2 nor 3, should a log be printed to record the data?
[GitHub] [inlong] chyueyi opened a new pull request, #7079: [INLONG-7076] [sort-connector-jdbc] Add multi table sink for MySQL
chyueyi opened a new pull request, #7079: URL: https://github.com/apache/inlong/pull/7079 ## Prepare a Pull Request - [[INLONG-7076] [sort-connector-jdbc] Add multi table sink for MySQL](https://github.com/apache/inlong/issues/7076) - Fixes #XYZ ### Motivation - In many cases, users want to sync data from multiple tables in MongoDB, Kafka, etc. to MySQL or other JDBC-based databases in real time, so a multi-table sink connector for MySQL is needed. - Based on flink-connector-jdbc, sort-connector-jdbc can sink data to MySQL in two formats: canal-json. ### Modifications - Add multi-table sink for MySQL - Add MySQLRowConverter and MySQLDialect - Modify JdbcMultiBatchingOutputFormat and AbstractJdbcRowConverter to support MySQL data types ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)*
```sql
CREATE TABLE cdc_mysql_source (
  `data` BYTES METADATA FROM 'meta.data_canal' VIRTUAL
) WITH (
  'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
  'migrate-all' = 'true',
  'connector' = 'mysql-cdc-inlong',
  'scan.incremental.snapshot.enabled' = 'false',
  'hostname' = 'localhost',
  'database-name' = 'test',
  'server-time-zone' = 'Asia/Shanghai',
  'username' = '',
  'password' = '',
  'table-name' = 'test\.[\s\S]*'
);

CREATE TABLE cdc_mysql_sink (
  `data` BYTES
) WITH (
  'connector' = 'jdbc-inlong',
  'url' = 'jdbc:mysql://localhost:3306',
  'username' = '',
  'password' = '',
  'table-name' = 'test',
  'sink.multiple.enable' = 'true',
  'sink.multiple.schema-update.policy' = 'TRY_IT_BEST',
  'sink.multiple.format' = 'canal-json',
  'sink.multiple.database-pattern' = 'test2',
  'sink.multiple.schema-pattern' = '',
  'sink.multiple.table-pattern' = '${table}'
);

insert into cdc_mysql_sink select * from cdc_mysql_source;
```
### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
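The sink options above resolve `'sink.multiple.table-pattern' = '${table}'` per record. The following Java sketch illustrates one way such placeholders could be substituted. It is not InLong's `JsonDynamicSchemaFormat.parse`; it assumes the values come from the canal-json `database` and `table` fields, supplied here as a plain map, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch: replace each ${name} placeholder in a sink pattern with the
 * value of field "name" from the current record. Literal patterns pass through as-is.
 */
public class PatternResolver {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    public static String resolve(String pattern, Map<String, String> fields) {
        Matcher matcher = PLACEHOLDER.matcher(pattern);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = fields.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder " + matcher.group(0));
            }
            // quoteReplacement stops '$' or '\' inside the value being read as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> canalFields = new HashMap<>();
        canalFields.put("database", "test");
        canalFields.put("table", "orders");
        System.out.println(resolve("${table}", canalFields));             // orders
        System.out.println(resolve("test2", canalFields));                // test2
        System.out.println(resolve("${database}_${table}", canalFields)); // test_orders
    }
}
```

Failing on a missing field, rather than substituting an empty string, keeps a malformed record from silently being routed to a wrongly named table.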
[inlong] branch master updated (699af0d53 -> 6a35533c1)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 699af0d53 [INLONG-7070][Manager][DataProxy] Fix mq configuration dispatching problem (#7071) add 6a35533c1 [INLONG-7066][Sort] Kafka connector lost topic level dirty data metric and state restore for multi sink (#7074) No new revisions were added by this update. Summary of changes: .../sort/base/metric/sub/SinkTableMetricData.java | 25 ++- .../sort/base/metric/sub/SinkTopicMetricData.java | 48 +++--- .../inlong/sort/base/util/MetricStateUtils.java| 1 - .../kafka/DynamicKafkaSerializationSchema.java | 20 + .../inlong/sort/kafka/FlinkKafkaProducer.java | 9 ++-- 5 files changed, 72 insertions(+), 31 deletions(-)
[GitHub] [inlong] EMsnap merged pull request #7074: [INLONG-7066][Sort] Kafka connector lost topic level dirty data metric and state restore for multi sink
EMsnap merged PR #7074: URL: https://github.com/apache/inlong/pull/7074
[GitHub] [inlong] kuansix commented on a diff in pull request #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL
kuansix commented on code in PR #7078: URL: https://github.com/apache/inlong/pull/7078#discussion_r1058024563 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java: ## @@ -532,6 +515,20 @@ protected void attemptFlush() throws IOException { } } +/** + * Output metrics with estimate for pg or other type jdbc connectors. + */ +private void outputMetrics(String tableIdentifier, Long rowSize, Long dataSize) { +String[] fieldArray = tableIdentifier.split("\\."); +if (fieldArray.length == 3) { +sinkMetricData.outputMetricsWithEstimate(fieldArray[0], fieldArray[1], fieldArray[2], +false, rowSize, dataSize); +} else if (fieldArray.length == 2) { +sinkMetricData.outputMetricsWithEstimate(fieldArray[0], null, fieldArray[1], +false, rowSize, dataSize); +} Review Comment: This code runs too many times to log here, and the case is already logged elsewhere.
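The thread asks what identifiers look like and whether the unexpected case should be logged, while the reply notes that this path runs too often to log on every call. One possible compromise is to log each unexpected identifier only once. The sketch below is hypothetical: `TableMetricSink` stands in for `SinkMetricData.outputMetricsWithEstimate`, the example identifiers (`database.schema.table` for PostgreSQL, `database.table` for MySQL-style sources) are assumptions, and `System.err` replaces the connector's SLF4J logger to keep it self-contained.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: map a table identifier to (database, schema, table) and
 * warn only once per unexpected identifier, so a hot path does not flood the log.
 */
public class TableIdentifierMetrics {

    /** Illustrative stand-in for SinkMetricData.outputMetricsWithEstimate(...). */
    public interface TableMetricSink {
        void output(String database, String schema, String table, long rows, long bytes);
    }

    private final TableMetricSink sink;
    // Identifiers already reported; grows with each distinct malformed identifier.
    private final Set<String> warnedIdentifiers = ConcurrentHashMap.newKeySet();

    public TableIdentifierMetrics(TableMetricSink sink) {
        this.sink = sink;
    }

    /**
     * Accepts "database.schema.table" (for example PostgreSQL) or "database.table"
     * (for example MySQL). Returns false, without emitting metrics, for any other form.
     */
    public boolean outputMetrics(String tableIdentifier, long rowSize, long dataSize) {
        String[] parts = tableIdentifier.split("\\.");
        if (parts.length == 3) {
            sink.output(parts[0], parts[1], parts[2], rowSize, dataSize);
            return true;
        }
        if (parts.length == 2) {
            sink.output(parts[0], null, parts[1], rowSize, dataSize);
            return true;
        }
        // Set.add returns true only on first insertion, so each identifier is logged once.
        if (warnedIdentifiers.add(tableIdentifier)) {
            System.err.println("Unexpected table identifier, metrics skipped: " + tableIdentifier);
        }
        return false;
    }

    public static void main(String[] args) {
        TableIdentifierMetrics metrics = new TableIdentifierMetrics(
                (db, schema, table, rows, bytes) ->
                        System.out.println(db + "/" + schema + "/" + table + " rows=" + rows));
        metrics.outputMetrics("inlong.public.orders", 10, 1024); // inlong/public/orders rows=10
        metrics.outputMetrics("inlong.orders", 5, 512);          // inlong/null/orders rows=5
        metrics.outputMetrics("orders", 1, 64);                  // warns once on stderr
        metrics.outputMetrics("orders", 1, 64);                  // not logged again
    }
}
```

The warned-identifier set grows with every distinct malformed identifier; a real connector might cap it or use a time-based rate limiter instead.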
[GitHub] [inlong] fuweng11 opened a new pull request, #7081: [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly
fuweng11 opened a new pull request, #7081: URL: https://github.com/apache/inlong/pull/7081 ### Prepare a Pull Request - Fixes #7080 ### Motivation Use the `StringUtils.isNotBlank` uniformly instead of `StringUtils.isNoneBlank`. ### Modifications Use the `StringUtils.isNotBlank` uniformly instead of `StringUtils.isNoneBlank`.
[GitHub] [inlong] gong commented on a diff in pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL
gong commented on code in PR #7079: URL: https://github.com/apache/inlong/pull/7079#discussion_r1058074563 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java: ## @@ -18,7 +18,8 @@ package org.apache.inlong.sort.jdbc.table; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.MySQLDialect; +//import org.apache.flink.connector.jdbc.dialect.MySQLDialect; +import org.apache.inlong.sort.jdbc.dialect.MySQLDialect; Review Comment: Please remove the commented-out import `//import org.apache.flink.connector.jdbc.dialect.MySQLDialect;`.
[GitHub] [inlong] gong commented on a diff in pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL
gong commented on code in PR #7079: URL: https://github.com/apache/inlong/pull/7079#discussion_r1058077632 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java: ## @@ -25,11 +25,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.*; Review Comment: Please follow the InLong code style and do not use wildcard (`.*`) imports.
[GitHub] [inlong] gong commented on pull request #7078: [INLONG-7075][Sort] Add table level metric for PostgreSQL
gong commented on PR #7078: URL: https://github.com/apache/inlong/pull/7078#issuecomment-1366395406 1. The dirty data metric is not restored from state:
```java
public void open(int taskNumber, int numTasks) throws IOException {
    this.runtimeContext = getRuntimeContext();
    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels(inlongMetric)
            .withInlongAudit(auditHostAndPorts)
            .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
            .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
            .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
```
2. The dirty data metric is not computed.
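Point 1 asks for the dirty counters to be restored alongside the normal ones. The sketch below applies the null-safe restore pattern from the quoted `open()` to all four counters. It is hypothetical: the key names are assumed, a `Map` stands in for `MetricState`, and in the connector the restored values would be passed to the `MetricOption` builder rather than returned.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch: restore normal and dirty counters from checkpointed state,
 * defaulting to 0 when the state or a key is missing. Key names are assumptions.
 */
public class MetricRestoreSketch {

    public static final String NUM_RECORDS_OUT = "numRecordsOut";
    public static final String NUM_BYTES_OUT = "numBytesOut";
    public static final String DIRTY_RECORDS_OUT = "dirtyRecordsOut";
    public static final String DIRTY_BYTES_OUT = "dirtyBytesOut";

    /** Same null-safe idea as `metricState != null ? metricState.getMetricValue(key) : 0L`. */
    public static long restore(Map<String, Long> state, String key) {
        if (state == null) {
            return 0L; // fresh start: no checkpoint to restore from
        }
        Long value = state.get(key);
        return value != null ? value : 0L;
    }

    /** Restores all four counters so dirty metrics do not restart from 0 after recovery. */
    public static Map<String, Long> restoreAll(Map<String, Long> state) {
        Map<String, Long> restored = new LinkedHashMap<>();
        for (String key : new String[] {NUM_RECORDS_OUT, NUM_BYTES_OUT, DIRTY_RECORDS_OUT, DIRTY_BYTES_OUT}) {
            restored.put(key, restore(state, key));
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpointed = new HashMap<>();
        checkpointed.put(NUM_RECORDS_OUT, 100L);
        checkpointed.put(DIRTY_RECORDS_OUT, 3L);
        // {numRecordsOut=100, numBytesOut=0, dirtyRecordsOut=3, dirtyBytesOut=0}
        System.out.println(restoreAll(checkpointed));
    }
}
```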
[GitHub] [inlong] gong merged pull request #7081: [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly
gong merged PR #7081: URL: https://github.com/apache/inlong/pull/7081
[inlong] branch master updated: [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly (#7081)
This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 7c9044a54 [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly (#7081) 7c9044a54 is described below commit 7c9044a5434dcbb54481055f3a18ca2037e3b68b Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Dec 28 14:25:24 2022 +0800 [INLONG-7080][Manager] Use the StringUtils.isNotBlank uniformly (#7081) --- .../inlong/manager/pojo/stream/InlongStreamExtParam.java | 12 ++-- .../service/resource/sink/greenplum/GreenplumSqlBuilder.java | 2 +- .../service/resource/sink/oracle/OracleSqlBuilder.java | 2 +- .../resource/sink/postgresql/PostgreSQLSqlBuilder.java | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java index 6186b7dc4..2b2e61bfc 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java @@ -17,11 +17,6 @@ package org.apache.inlong.manager.pojo.stream; -import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.common.util.JsonUtils; - -import java.io.Serializable; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -29,6 +24,11 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import 
org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import java.io.Serializable; /** * Extended params, will be saved as JSON string @@ -68,7 +68,7 @@ public class InlongStreamExtParam implements Serializable { public static void unpackExtParams( String extParams, Object targetObject) { -if (StringUtils.isNoneBlank(extParams)) { +if (StringUtils.isNotBlank(extParams)) { InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject(extParams, InlongStreamExtParam.class); if (inlongStreamExtParam != null) { CommonBeanUtils.copyProperties(inlongStreamExtParam, targetObject, true); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java index 735351d9f..f9aa76276 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java @@ -211,7 +211,7 @@ public class GreenplumSqlBuilder { List columns) { final List commentList = new ArrayList<>(); for (GreenplumColumnInfo columnInfo : columns) { -if (StringUtils.isNoneBlank(columnInfo.getComment())) { +if (StringUtils.isNotBlank(columnInfo.getComment())) { StringBuilder commSql = new StringBuilder(); commSql.append("COMMENT ON COLUMN \"") .append(schemaName) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java index 9d026a62a..2dd4e28f4 100644 --- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oracle/OracleSqlBuilder.java @@ -162,7 +162,7 @@ public class OracleSqlBuilder { final List commentList = new ArrayList<>(); final StringBuilder sqlBuilder = new StringBuilder(); columns.stream() -.filter(columnInfo -> StringUtils.isNoneBlank(columnInfo.getComment())) +.filter(columnInfo -> StringUtils.isNotBlank(columnInfo.getComment())) .forEach(columnInfo -> { sqlBuilder.append("COMMENT ON COLUMN \"") .append(tableName) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/
[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
gong commented on code in PR #7059: URL: https://github.com/apache/inlong/pull/7059#discussion_r1058092502 ## inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduUtils.java: ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kudu.common; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.util.StringUtils; + +import org.apache.inlong.sort.formats.common.BooleanTypeInfo; +import org.apache.inlong.sort.formats.common.ByteTypeInfo; +import org.apache.inlong.sort.formats.common.DateTypeInfo; +import org.apache.inlong.sort.formats.common.DecimalTypeInfo; +import org.apache.inlong.sort.formats.common.DoubleTypeInfo; +import org.apache.inlong.sort.formats.common.FloatTypeInfo; +import org.apache.inlong.sort.formats.common.IntTypeInfo; +import org.apache.inlong.sort.formats.common.LongTypeInfo; +import org.apache.inlong.sort.formats.common.RowTypeInfo; +import org.apache.inlong.sort.formats.common.StringTypeInfo; +import org.apache.inlong.sort.formats.common.TimestampTypeInfo; +import org.apache.inlong.sort.formats.common.TypeInfo; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.kudu.Type.*; + +/** + * The utility class for Kudu. + */ +public class KuduUtils { + +private static final Map KUDU_TYPE_2_TYPE_INFO_MAP = new HashMap<>(13); +private static final Map KUDU_TYPE_2_DATA_TYPE_MAP = new HashMap<>(13); Review Comment: Maybe the initial capacity 13 should be changed to 13 / 0.75 (the expected size divided by the default load factor).
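The suggestion follows from how `java.util.HashMap` sizes its table: the requested capacity is rounded up to a power of two, and the map resizes once its size exceeds capacity × 0.75. `new HashMap<>(13)` therefore gets a 16-slot table with a threshold of 12 and resizes when the 13th entry is added. The helpers below are illustrative; they mirror, rather than call, HashMap internals. Guava's `Maps.newHashMapWithExpectedSize` and, on JDK 19+, `HashMap.newHashMap` size a map for an expected entry count in a similar way.

```java
/**
 * Illustrative helpers (names are hypothetical) that reproduce how java.util.HashMap
 * turns a requested initial capacity into a table size and resize threshold.
 */
public class MapCapacity {

    private static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /** Smallest power of two >= cap, as HashMap does; assumes 1 <= cap <= 2^30. */
    public static int tableSizeFor(int cap) {
        int n = 1;
        while (n < cap) {
            n <<= 1;
        }
        return n;
    }

    /** True if inserting expectedEntries into new HashMap<>(initialCapacity) triggers a resize. */
    public static boolean resizesWhileFilling(int initialCapacity, int expectedEntries) {
        int threshold = (int) (tableSizeFor(initialCapacity) * DEFAULT_LOAD_FACTOR);
        // HashMap resizes when, after an insert, size > threshold.
        return expectedEntries > threshold;
    }

    /** Initial capacity that holds expectedEntries without resizing: ceil(n / 0.75). */
    public static int capacityFor(int expectedEntries) {
        return (int) Math.ceil(expectedEntries / (double) DEFAULT_LOAD_FACTOR);
    }

    public static void main(String[] args) {
        System.out.println("capacityFor(13) = " + capacityFor(13));                  // 18
        System.out.println("new HashMap<>(13) resizes: " + resizesWhileFilling(13, 13)); // true
        System.out.println("new HashMap<>(18) resizes: " + resizesWhileFilling(18, 13)); // false
    }
}
```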
[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
gong commented on code in PR #7059: URL: https://github.com/apache/inlong/pull/7059#discussion_r1058093963 ## inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java: ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kudu.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.apache.kudu.client.SessionConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.inlong.sort.base.Constants.*; +import static org.apache.inlong.sort.kudu.common.KuduOptions.*; + +/** + * The base for all kudu sinks. + */ +@PublicEvolving +public abstract class AbstractKuduSinkFunction +extends +RichSinkFunction +implements +CheckpointedFunction { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduSinkFunction.class); + +/** + * The masters of kudu server. + */ +protected final String masters; +/** + * The name of kudu table. + */ +protected final String tableName; +/** + * The flink TableSchema. + */ +protected final TableSchema flinkTableSchema; + +protected String connectorMetricIdentify; + +/** + * The configuration of kudu sinkFunction. 
+ */ +protected final Configuration configuration; + +/** + * The maximum number of buffered records. + */ +protected final int maxBufferSize; + +/** + * The maximum number of retries. + */ +protected final int maxRetries; + +/** + * True if the sink is running. + */ +protected volatile boolean running; + +/** + * The exception thrown in asynchronous tasks. + */ +private transient Throwable flushThrowable; + +protected final SessionConfiguration.FlushMode flushMode; +/** + * whether load metadata in `open` function + */ +protected boolean lazyLoadSchema; + +private SinkMetricData sinkMetricData; + +private transient ListState metricStateListState; +private transient MetricState metricState; + +private final String auditHostAndPorts; + +private final String inlongMetric; + +public AbstractKuduSinkFunction( +TableSchema flinkTableSchema, +String masters, +String tableName, +SessionConfiguration.FlushMode flushMode, +Configuration configuration, +String inlongMetric, +String auditHostAndPorts) { +this.masters = masters; +this.flushMode = flushMode; +this.tableName = tableName; +this.flinkTableSchema = flinkTableSchema; +this.configuration = configuration; +this.maxRetries = configuration.getInteger(MAX_RETRIES); +this.maxBufferSize = configuration.getInteger(MAX_BUFFER_SIZE); +this.lazyLoadSchema = configuration.getBoolean(LAZY_LOAD_SCHEMA); +this.inlongMetric = inlongMetric; +this.auditHostAndPorts = auditHostAndPorts; +MetricOption metricOption = MetricOption.builder() +.withInlongLabels(inlongMetric) +.withInlongAudit(auditHostAndPorts) +.withInitReco
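The `flushThrowable` field ("The exception thrown in asynchronous tasks.") suggests the usual pattern for background flushes: the flush thread records the first failure, and the task thread rethrows it on the next write or checkpoint, so errors are not silently dropped. This is a hypothetical, self-contained sketch of that pattern using a single-thread executor; it is not the Kudu sink's implementation, and the class and method names are illustrative.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: a background flush stores its first failure in a volatile
 * field, and the caller thread rethrows it at the next check.
 */
public class AsyncFailurePropagation {

    private final ExecutorService flusher = Executors.newSingleThreadExecutor();
    private volatile Throwable flushThrowable;

    /** Runs a flush in the background; a failure is remembered instead of being lost. */
    public void flushAsync(Runnable flushAction) {
        flusher.execute(() -> {
            try {
                flushAction.run();
            } catch (Throwable t) {
                // Single flush thread, so this check-then-set does not race.
                if (flushThrowable == null) {
                    flushThrowable = t;
                }
            }
        });
    }

    /** In a real sink this would be called at the start of invoke() and snapshotState(). */
    public void checkFlushException() throws IOException {
        Throwable t = flushThrowable;
        if (t != null) {
            throw new IOException("Asynchronous flush failed", t);
        }
    }

    /** Stops accepting flushes and waits for pending ones; returns false on timeout or interrupt. */
    public boolean shutdownAndWait() {
        flusher.shutdown();
        try {
            return flusher.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AsyncFailurePropagation sink = new AsyncFailurePropagation();
        sink.flushAsync(() -> {
            throw new IllegalStateException("simulated Kudu write failure");
        });
        sink.shutdownAndWait();
        try {
            sink.checkFlushException();
            System.out.println("no failure recorded");
        } catch (IOException e) {
            System.out.println("rethrown on caller thread: " + e.getCause().getMessage());
        }
    }
}
```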
[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
gong commented on code in PR #7059: URL: https://github.com/apache/inlong/pull/7059#discussion_r1058095936 ## inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.kudu.common; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * The configuration options for kudu sink. 
+ */ +public class KuduOptions { + +public static final ConfigOption FLUSH_MODE = +ConfigOptions.key("flush-mode") +.stringType() +.defaultValue("AUTO_FLUSH_SYNC") +.withDescription( +"The flush mode of Kudu client session, AUTO_FLUSH_SYNC/AUTO_FLUSH_BACKGROUND/MANUAL_FLUSH."); + +public static final ConfigOption MAX_CACHE_SIZE = +ConfigOptions.key("lookup.max-cache-size") +.intType() +.defaultValue(-1) +.withDescription("The maximum number of results cached in the " + +"lookup source."); + +public static final ConfigOption MAX_CACHE_TIME = +ConfigOptions.key("lookup.max-cache-time") +.stringType() +.defaultValue("60s") +.withDescription("The maximum live time for cached results in " + +"the lookup source."); +public static final ConfigOption SINK_START_NEW_CHAIN = +ConfigOptions.key("sink.start-new-chain") +.booleanType() +.defaultValue(true) +.withDescription("The sink operator will start a new chain if true."); + +public static final ConfigOption MAX_RETRIES = +ConfigOptions.key("max-retries") +.intType() +.defaultValue(3) +.withDescription("The maximum number of retries when an " + +"exception is caught."); +public static final ConfigOption MAX_BUFFER_SIZE = +ConfigOptions.key("sink.max-buffer-size") +.intType() +.defaultValue(100) +.withDescription("The maximum number of records buffered in the sink."); + +public static final ConfigOption WRITE_THREAD_COUNT = +ConfigOptions.key("sink.write-thread-count") +.intType() +.defaultValue(5) +.withDescription("The maximum number of thread in the sink."); + +public static final ConfigOption MAX_BUFFER_TIME = +ConfigOptions.key("sink.max-buffer-time") +.stringType() +.defaultValue("30s") +.withDescription("The maximum wait time for buffered records in the sink."); + +public static final ConfigOption SINK_KEY_FIELD_NAMES = +ConfigOptions.key("sink.key-field-names") +.stringType() +.defaultValue("") +.withDescription("The key fields for updating DB when using upsert sink function."); +public static final ConfigOption 
ENABLE_KEY_FIELD_CHECK = +ConfigOptions.key("sink.enable-key-field-check") +.booleanType() +.defaultValue(true) +.withDescription("If true, the check to compare key fields assumed by flink " + +"and key fields provided by user will be performed."); + +public static final ConfigOption SINK_WRITE_WITH_ASYNC_MODE = +ConfigOptions.key("sink.write-with-async-mode") +.booleanType() +.defaultValue(false) +.withDescription("Use async write mode for kudu producer if true."); + +public static final ConfigOption SINK_FORCE_WITH_UPSERT_MODE = +ConfigOptions.key("sink.write-with-upsert-mode") +.booleanType() +.defaultValue(true) +.withDescription("Force write kudu with upser
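`sink.key-field-names` and `sink.enable-key-field-check` describe comparing the key fields supplied by the user with the key fields Flink derives. Here is a minimal sketch of such a check. It assumes the option value is a comma-separated list, which the option's description does not state. `KeyFieldCheck` is hypothetical, and the PR's actual implementation may differ.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper illustrating a `sink.enable-key-field-check`-style validation. */
final class KeyFieldCheck {

    private KeyFieldCheck() {
    }

    /** Splits an (assumed) comma-separated option value such as "id, name" into trimmed names. */
    static Set<String> parseKeyFields(String optionValue) {
        if (optionValue == null || optionValue.trim().isEmpty()) {
            return new LinkedHashSet<>();
        }
        return Arrays.stream(optionValue.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Returns the key fields the sink should use: Flink's keys when the user gave none,
     * otherwise the user's keys, which must match Flink's (ignoring order) if the check is on.
     */
    static Set<String> resolve(Set<String> flinkKeys, String userOption, boolean checkEnabled) {
        Set<String> userKeys = parseKeyFields(userOption);
        if (userKeys.isEmpty()) {
            return flinkKeys;
        }
        if (checkEnabled && !userKeys.equals(flinkKeys)) {
            throw new IllegalArgumentException("User key fields " + userKeys
                    + " do not match the key fields " + flinkKeys + " derived by Flink");
        }
        return userKeys;
    }
}
```

The default `""` for `sink.key-field-names` falls through to Flink's keys, so the check only matters when a user overrides them.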
[GitHub] [inlong] chyueyi commented on pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL
chyueyi commented on PR #7079: URL: https://github.com/apache/inlong/pull/7079#issuecomment-1366411665 @gong OK, I've modified it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
gong commented on code in PR #7059: URL: https://github.com/apache/inlong/pull/7059#discussion_r1058096505 ## inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableSink.java: ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kudu.table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.inlong.sort.kudu.common.KuduOptions; +import org.apache.inlong.sort.kudu.sink.KuduAsyncSinkFunction; +import org.apache.inlong.sort.kudu.sink.KuduSinkFunction; +import org.apache.kudu.client.SessionConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.inlong.sort.kudu.common.KuduOptions.FLUSH_MODE; +import static org.apache.inlong.sort.kudu.common.KuduOptions.KUDU_IGNORE_ALL_CHANGELOG; + +/** + * The KuduLookupFunction is a standard user-defined table function, it can be + * used in tableAPI and also useful for temporal table join plan in SQL. + */ +public class KuduDynamicTableSink implements DynamicTableSink { + +private static final Logger LOG = LoggerFactory.getLogger(KuduDynamicTableSink.class); + +/** + * The schema of the table. 
+ */ +private final TableSchema flinkSchema; + +/** + * The masters of kudu server. + */ +private final String masters; + +/** + * The flush mode of kudu client. + * AUTO_FLUSH_BACKGROUND: calls will return immediately, but the writes will be sent in the background, + * potentially batched together with other writes from the same session. + * AUTO_FLUSH_SYNC: call will return only after being flushed to the server automatically. + * MANUAL_FLUSH: calls will return immediately, but the writes will not be sent + * until the user calls KuduSession.flush(). + */ +private final SessionConfiguration.FlushMode flushMode; + +/** + * The name of kudu table. + */ +private final String tableName; + +/** + * The configuration for the kudu sink. + */ +private final Configuration configuration; +private final String inlongMetric; +private final String auditHostAndPorts; + +/** + * True if the data stream consumed by this sink is append-only. + */ +private boolean isAppendOnly; + +/** + * The names of the key fields of the upsert stream consumed by this sink. + */ +@Nullable +private String[] keyFieldNames; +private ResolvedCatalogTable catalogTable; + +public KuduDynamicTableSink( +ResolvedCatalogTable catalogTable, +TableSchema flinkSchema, +String masters, +String tableName, +Configuration configuration, +String inlongMetric, +String auditHostAndPorts) { +this.catalogTable = catalogTable; +this.flinkSchema = checkNotNull(flinkSchema, +"The schema must not be null."); +DataType dataType = flinkSchema.toRowDataType(); +LogicalType logicalType = dataType.getLogicalType(); + +SessionConfiguration.FlushMode flushMode = S
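`flush-mode` is declared as a string option, while the sink stores a `SessionConfiguration.FlushMode`, so the string has to be converted somewhere. The sketch below shows one way to do that. To stay dependency-free, it uses a local stand-in enum with the same three constant names. `FlushModeParser` is hypothetical. With the Kudu client on the classpath, `SessionConfiguration.FlushMode.valueOf(...)` would play the same role.

```java
import java.util.Arrays;
import java.util.Locale;

/** Stand-in for org.apache.kudu.client.SessionConfiguration.FlushMode (same constant names). */
enum FlushMode {
    AUTO_FLUSH_SYNC,
    AUTO_FLUSH_BACKGROUND,
    MANUAL_FLUSH
}

/** Hypothetical helper converting the string-typed `flush-mode` option into the enum. */
final class FlushModeParser {

    private FlushModeParser() {
    }

    static FlushMode parse(String value) {
        // Tolerate surrounding whitespace and lower-case input; null becomes "" and is rejected below.
        String normalized = value == null ? "" : value.trim().toUpperCase(Locale.ROOT);
        try {
            return FlushMode.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported flush-mode '" + value
                    + "', expected one of " + Arrays.toString(FlushMode.values()), e);
        }
    }
}
```

Failing fast with the list of accepted values makes a misspelled `flush-mode` visible when the table is created, rather than later at runtime.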
[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
gong commented on code in PR #7059: URL: https://github.com/apache/inlong/pull/7059#discussion_r1058097244 ## inlong-sort/sort-connectors/kudu/pom.xml: ## @@ -0,0 +1,93 @@ + + +http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +4.0.0 + +org.apache.inlong +sort-connectors +1.5.0-SNAPSHOT + + +sort-connector-kudu +jar +Apache InLong - Sort-connector-kudu + + + + + +org.apache.inlong +sort-connector-base +${project.version} + + + +org.apache.kudu +kudu-client +${kudu.version} + + + +org.apache.inlong +sort-format-common +1.5.0-SNAPSHOT + + + + + + + +org.apache.maven.plugins +maven-shade-plugin + + +shade-flink + +shade + +package + + + +org.apache.hudi:* + org.apache.hive:hive-exec +org.apache.hadoop:* +com.fasterxml.woodstox:* +org.codehaus.woodstox:* +com.google.guava:* Review Comment: Hudi? I can't find Kudu in these includes.
[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
gong commented on code in PR #7059: URL: https://github.com/apache/inlong/pull/7059#discussion_r1058097548 ## inlong-sort/sort-connectors/kudu/pom.xml: ## @@ -0,0 +1,93 @@ + + +http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +4.0.0 + +org.apache.inlong +sort-connectors +1.5.0-SNAPSHOT + + +sort-connector-kudu +jar +Apache InLong - Sort-connector-kudu + + + + + +org.apache.inlong +sort-connector-base +${project.version} + Review Comment: I suggest adding a shade for `sort-connector-base`.
[GitHub] [inlong] gong commented on pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
gong commented on PR #7059: URL: https://github.com/apache/inlong/pull/7059#issuecomment-1366413893 Add a fileSet for the Kudu connector in `inlong-distribution/src/main/assemblies/sort-connectors.xml`.
[GitHub] [inlong] gong commented on pull request #7079: [INLONG-7076][Sort] Add multi table sink for MySQL
gong commented on PR #7079: URL: https://github.com/apache/inlong/pull/7079#issuecomment-1366414528 @kuansix please help review it.
[GitHub] [inlong] bluewang opened a new pull request, #7084: [INLONG-7063][Dashboard] Optimize the topic name of the new consumption page
bluewang opened a new pull request, #7084: URL: https://github.com/apache/inlong/pull/7084 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-XYZ][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes https://github.com/apache/inlong/issues/7063 ### Modifications
[GitHub] [inlong] liaorui opened a new pull request, #7085: [INLONG-7083][Sort]StarRocks connector supports dirty data archives
liaorui opened a new pull request, #7085: URL: https://github.com/apache/inlong/pull/7085 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-XYZ][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes #7083 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* Make the StarRocks connector support dirty data archiving. ### Modifications *Describe the modifications you've done.* 1. `StarRocksDynamicTableSinkFactory.java` adds new `DirtyOptions`, `DirtySink` and `DirtySinkHelper` objects. 2. `StarRocksDynamicTableSink.java`, `StarRocksDynamicSinkFunction.java` and `StarRocksSinkManager.java` each receive the `DirtySinkHelper` created in `StarRocksDynamicTableSinkFactory`, passed down the chain in that order. 3. `StarRocksSinkManager.java` uses `DirtySinkHelper` to write dirty data and report dirty data metrics when an exception is caught. ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
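Item 3 of the Modifications list describes a three-step flow: write the failed record to a dirty sink, then report dirty metrics when an exception is caught. A simplified sketch of that flow follows. `DirtyDataHandler` and its `DirtySink` interface are hypothetical simplifications, not InLong's `DirtySinkHelper` API. The `ignoreSideOutputErrors` switch mirrors the `dirtyOptions.ignoreSideOutputErrors()` check in the Doris connector diff from PR #7062.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: archive a record that failed to load and count it as dirty. */
final class DirtyDataHandler {

    /** Hypothetical side output for dirty records (e.g. a log or object-store archive). */
    interface DirtySink {
        void invoke(String record, String message) throws Exception;
    }

    private final DirtySink dirtySink; // null when dirty-data archiving is disabled
    private final boolean ignoreSideOutputErrors;
    private long dirtyRecords;
    private long dirtyBytes;

    DirtyDataHandler(DirtySink dirtySink, boolean ignoreSideOutputErrors) {
        this.dirtySink = dirtySink;
        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
    }

    /** Intended to be called from the catch block of a failed write. */
    void handle(String record, Exception cause) {
        if (dirtySink != null) {
            try {
                dirtySink.invoke(record, cause.getMessage());
            } catch (Exception ex) {
                if (!ignoreSideOutputErrors) {
                    throw new RuntimeException(ex);
                }
                // A real connector would log a warning here and keep going.
            }
        }
        // Reached when archiving succeeded, was disabled, or its failure was ignored.
        dirtyRecords += 1;
        dirtyBytes += record.getBytes(StandardCharsets.UTF_8).length;
    }

    long dirtyRecords() {
        return dirtyRecords;
    }

    long dirtyBytes() {
        return dirtyBytes;
    }
}
```

The design choice is the same one the Doris code makes. A broken dirty sink fails the job by default and is tolerated only when explicitly configured.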