[GitHub] [inlong] fuweng11 opened a new pull request, #7937: [INLONG-7936][Manager] Support issued pulsar subscriptions to sort
fuweng11 opened a new pull request, #7937:
URL: https://github.com/apache/inlong/pull/7937

### Prepare a Pull Request
- Fixes #7936

### Motivation
Support issuing Pulsar subscriptions to Sort.

### Modifications
Support issuing Pulsar subscriptions to Sort.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] vernedeng opened a new pull request, #7939: [INLONG-7938][Manager] Fix consume list interface does not filter by request
vernedeng opened a new pull request, #7939:
URL: https://github.com/apache/inlong/pull/7939

### Prepare a Pull Request
- Fixes #7938

### Motivation
The main problems are:
1. The interface uses a GET request, but it should be a POST.
2. The `@RequestBody` annotation was not added.
[GitHub] [inlong] dockerzhang commented on a diff in pull request #7935: [INLONG-7934][Manager] Optimize the serializationType to support debezium json
dockerzhang commented on code in PR #7935:
URL: https://github.com/apache/inlong/pull/7935#discussion_r1178728624

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java: ##
@@ -131,8 +127,13 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo,
 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
 continue;
 }
-if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
-sourceInfo.getSerializationType())) {
+
+if (sourceInfo.getSourceType().equalsIgnoreCase(SourceType.FILE)) {

Review Comment:
Should `AUTO_PUSH` be included here as well?

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java: ##
@@ -110,17 +110,21 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo,
 tubeMQSource.setTopic(streamInfo.getMqResource());
 tubeMQSource.setGroupId(streamId);
 tubeMQSource.setMasterRpc(masterRpc);
-if (StringUtils.isNotBlank(streamInfo.getDataType())) {
-String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
-tubeMQSource.setSerializationType(serializationType);
-}
 tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 for (StreamSource sourceInfo : streamSources) {
 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
 continue;
 }
- tubeMQSource.setSerializationType(sourceInfo.getSerializationType());
+
+if (sourceInfo.getSourceType().equalsIgnoreCase(SourceType.FILE)) {

Review Comment:
ditto

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java: ##
@@ -149,10 +145,16 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo,
 if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
 continue;
 }
-if (StringUtils.isEmpty(pulsarSource.getSerializationType())
-&& StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
+
+if (sourceInfo.getSourceType().equalsIgnoreCase(SourceType.FILE)) {

Review Comment:
ditto
[GitHub] [inlong] haifxu commented on a diff in pull request #7935: [INLONG-7934][Manager] Optimize the serializationType to support debezium json
haifxu commented on code in PR #7935:
URL: https://github.com/apache/inlong/pull/7935#discussion_r1178742790

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java: ##
@@ -131,8 +127,13 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo,
 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
 continue;
 }
-if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
-sourceInfo.getSerializationType())) {
+
+if (sourceInfo.getSourceType().equalsIgnoreCase(SourceType.FILE)) {

Review Comment:
Yes, `AUTO_PUSH` can also customize the data format.
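The check discussed in this thread could be sketched roughly as below. This is a minimal standalone illustration, not the code in the PR: the class `SerializationTypeResolver`, the string constants, and the fallback rule (use the source's own serialization type for `FILE` and `AUTO_PUSH` sources when one is set, otherwise keep the stream-level value) are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper; the names and values here are assumptions, not InLong APIs. */
final class SerializationTypeResolver {

    // Assumed set of source types whose data format can be customized.
    private static final Set<String> CUSTOM_FORMAT_SOURCES =
            new HashSet<>(Arrays.asList("FILE", "AUTO_PUSH"));

    private SerializationTypeResolver() {
    }

    /** Case-insensitive membership test, mirroring equalsIgnoreCase in the quoted diff. */
    static boolean allowsCustomFormat(String sourceType) {
        return sourceType != null
                && CUSTOM_FORMAT_SOURCES.contains(sourceType.toUpperCase(Locale.ROOT));
    }

    /** Returns the source-level type only for customizable sources that actually set one. */
    static String resolve(String sourceType, String sourceSerialization, String streamDefault) {
        boolean hasOwn = sourceSerialization != null && !sourceSerialization.isEmpty();
        return allowsCustomFormat(sourceType) && hasOwn ? sourceSerialization : streamDefault;
    }

    public static void main(String[] args) {
        System.out.println(resolve("auto_push", "debezium_json", "csv"));
        System.out.println(resolve("OTHER", "debezium_json", "csv"));
    }
}
```

Keeping the customizable source types in one set means a later addition only touches a single place instead of each of the Kafka, TubeMQ, and Pulsar operators.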
[GitHub] [inlong] fuweng11 commented on a diff in pull request #7913: [INLONG-7912][Manager] Only issue normal DataProxy nodes
fuweng11 commented on code in PR #7913:
URL: https://github.com/apache/inlong/pull/7913#discussion_r1178751472

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java: ##
@@ -1154,6 +1155,11 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy
 // TODO consider the data proxy load and re-balance
 List nodeList = new ArrayList<>();
 for (InlongClusterNodeEntity nodeEntity : nodeEntities) {
+if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) {
+LOGGER.debug("data proxy is heart time out by parentId={}, ip={}, port={}", nodeEntity.getParentId(),
+nodeEntity.getIp(), nodeEntity.getPort());

Review Comment:
done
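The filtering idea in the hunk above (skip nodes whose status is heartbeat timeout, so that only normal DataProxy nodes are issued) might look like the following standalone sketch. The `Node` class, the numeric status codes, and the method name are hypothetical stand-ins; the real entity and `NodeStatus` enum live in InLong Manager, and the quoted hunk does not show what follows the debug log, so the `continue` is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Standalone sketch; Node and the status codes are assumptions, not InLong classes. */
final class NodeFilterSketch {

    // Assumed status codes for illustration only.
    static final int STATUS_NORMAL = 1;
    static final int STATUS_HEARTBEAT_TIMEOUT = 2;

    static final class Node {
        final String ip;
        final int port;
        final Integer status;

        Node(String ip, int port, Integer status) {
            this.ip = ip;
            this.port = port;
            this.status = status;
        }
    }

    private NodeFilterSketch() {
    }

    /** Keeps every node except those marked as heartbeat timeout. */
    static List<Node> selectNormalNodes(List<Node> nodes) {
        List<Node> result = new ArrayList<>();
        for (Node node : nodes) {
            // Objects.equals is null-safe, so a node without a status is kept.
            if (Objects.equals(node.status, STATUS_HEARTBEAT_TIMEOUT)) {
                continue;
            }
            result.add(node);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("10.0.0.1", 46801, STATUS_NORMAL),
                new Node("10.0.0.2", 46801, STATUS_HEARTBEAT_TIMEOUT));
        for (Node node : selectNormalNodes(nodes)) {
            System.out.println(node.ip + ":" + node.port);
        }
    }
}
```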
[GitHub] [inlong] EMsnap commented on a diff in pull request #7937: [INLONG-7936][Manager] Support issued pulsar subscriptions to sort
EMsnap commented on code in PR #7937:
URL: https://github.com/apache/inlong/pull/7937#discussion_r1178761123

## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java: ##
@@ -67,13 +69,15 @@ public PulsarExtractNode(@JsonProperty("id") String id,
 @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
 @JsonProperty("properties") Map properties,
 @Nonnull @JsonProperty("topic") String topic,
+@JsonProperty("subscription") String subscription,

Review Comment:
Newly added parameters should be appended to the end of the parameter list.
[GitHub] [inlong-website] bluewang opened a new issue, #744: [Bug][Doc] The version number does not match
bluewang opened a new issue, #744:
URL: https://github.com/apache/inlong-website/issues/744

### What happened

### What you expected to happen
open https://inlong.apache.org/zh-CN/docs/sdk/manager-sdk/example

### How to reproduce
version number correspondence

### Environment
_No response_

### Are you willing to submit PR?
- [X] Yes, I am willing to submit a PR!

### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [inlong-website] bluewang opened a new pull request, #745: [INLONG-744][Doc] The version number does not match
bluewang opened a new pull request, #745:
URL: https://github.com/apache/inlong-website/pull/745

### Prepare a Pull Request
- Fixes https://github.com/apache/inlong-website/issues/744

### Verifying this change
[GitHub] [inlong] gosonzhang opened a new pull request, #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
gosonzhang opened a new pull request, #7942:
URL: https://github.com/apache/inlong/pull/7942

- Fixes #7931

Modification details:
1. Merge CommonPropertiesHolder and ConfigManager.getCommonProperties() into CommonConfigHolder, which manages the configuration logic related to common.properties, and replace the related usages;
2. Consolidate the field keys defined in common.properties together with their default values, and unify how field values are resolved;
3. Optimize some common.properties-related implementation code and remove unnecessary classes.
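The idea of keeping each key next to its default value and resolving both in one holder can be sketched as follows. This is only an illustration under stated assumptions: the class `CommonConfigSketch` and its methods are hypothetical, while the key names `proxy.cluster.tag` and `audit.enable` and their defaults are taken from the `CommonConfigHolder` diff quoted later in this thread.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder; not the CommonConfigHolder from the PR. */
final class CommonConfigSketch {

    // Key and default value are declared side by side, so the fallback is defined once.
    static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
    static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
    static final String KEY_ENABLE_AUDIT = "audit.enable";
    static final boolean VAL_DEF_ENABLE_AUDIT = true;

    private final Properties props;

    private CommonConfigSketch(Properties props) {
        this.props = props;
    }

    /** Parses properties-file content; a real holder would read common.properties instead. */
    static CommonConfigSketch fromString(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new CommonConfigSketch(props);
    }

    String getClusterTag() {
        String value = props.getProperty(KEY_PROXY_CLUSTER_TAG);
        return isBlank(value) ? VAL_DEF_CLUSTER_TAG : value.trim();
    }

    boolean isAuditEnabled() {
        String value = props.getProperty(KEY_ENABLE_AUDIT);
        return isBlank(value) ? VAL_DEF_ENABLE_AUDIT : Boolean.parseBoolean(value.trim());
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    public static void main(String[] args) {
        CommonConfigSketch config = fromString("proxy.cluster.tag=blue\n");
        System.out.println(config.getClusterTag() + " audit=" + config.isAuditEnabled());
    }
}
```

With every getter going through the same blank check, callers never need to know whether a value came from the file or from its default.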
[GitHub] [inlong] EMsnap opened a new pull request, #7943: [INLONG-7940][Manager] Use pulsar subscriptions and specify subscription offset in pulsar connector
EMsnap opened a new pull request, #7943:
URL: https://github.com/apache/inlong/pull/7943

### Prepare a Pull Request
- Fixes #7840

### Motivation
[INLONG-7940][Manager] Use pulsar subscriptions and specify subscription offset in pulsar connector

### Modifications
[INLONG-7940][Manager] Use pulsar subscriptions and specify subscription offset in pulsar connector

### Documentation
- Does this pull request introduce a new feature? (no)
[GitHub] [inlong-website] dockerzhang merged pull request #745: [INLONG-744][Doc] The version number does not match
dockerzhang merged PR #745:
URL: https://github.com/apache/inlong-website/pull/745
[GitHub] [inlong-website] dockerzhang closed issue #744: [Bug][Doc] The version number does not match
dockerzhang closed issue #744: [Bug][Doc] The version number does not match
URL: https://github.com/apache/inlong-website/issues/744
[inlong-website] branch master updated: [INLONG-744][Doc] The version number does not match (#745)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git

The following commit(s) were added to refs/heads/master by this push:
     new 416067b476 [INLONG-744][Doc] The version number does not match (#745)
416067b476 is described below

commit 416067b4762b83ca0999059dc455abee9ef4508c
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Thu Apr 27 17:23:55 2023 +0800

    [INLONG-744][Doc] The version number does not match (#745)
---
 docs/version.js                          |  2 +-
 .../current/version.js                   |  2 +-
 .../version-1.3.0/version.js             |  2 +-
 .../version-1.4.0/version.js             |  2 +-
 .../version-1.5.0/version.js             |  2 +-
 .../version-1.6.0.json                   | 66 ++
 .../version-1.6.0/version.js             |  2 +-
 versioned_docs/version-1.3.0/version.js  |  2 +-
 versioned_docs/version-1.4.0/version.js  |  2 +-
 versioned_docs/version-1.5.0/version.js  |  2 +-
 versioned_docs/version-1.6.0/version.js  |  2 +-
 11 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/docs/version.js b/docs/version.js
index 03b34a947b..f6199f5491 100644
--- a/docs/version.js
+++ b/docs/version.js
@@ -1,3 +1,3 @@
 export const siteVariables = {
- inLongVersion: '1.6.0-SNAPSHOT',
+ inLongVersion: '1.7.0-SNAPSHOT',
 };
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/version.js b/i18n/zh-CN/docusaurus-plugin-content-docs/current/version.js
index a881cacb2a..f6199f5491 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/version.js
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/version.js
@@ -1,3 +1,3 @@
 export const siteVariables = {
- inLongVersion: '1.5.0-SNAPSHOT',
+ inLongVersion: '1.7.0-SNAPSHOT',
 };
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.3.0/version.js b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.3.0/version.js
index 7a6815e2f4..107d085ca5 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.3.0/version.js
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.3.0/version.js
@@ -1,3 +1,3 @@
 export const siteVariables = {
- inLongVersion: '1.3.0-SNAPSHOT',
+ inLongVersion: '1.3.0',
 };
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.4.0/version.js b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.4.0/version.js
index 582c29ff36..13064ef445 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.4.0/version.js
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.4.0/version.js
@@ -1,3 +1,3 @@
 export const siteVariables = {
- inLongVersion: '1.4.0-SNAPSHOT',
+ inLongVersion: '1.4.0',
 };
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.5.0/version.js b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.5.0/version.js
index a881cacb2a..4ff2f85a1b 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.5.0/version.js
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.5.0/version.js
@@ -1,3 +1,3 @@
 export const siteVariables = {
- inLongVersion: '1.5.0-SNAPSHOT',
+ inLongVersion: '1.5.0',
 };
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.6.0.json b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.6.0.json
new file mode 100644
index 00..e57b4121fe
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.6.0.json
@@ -0,0 +1,66 @@
+{
+ "version.label": {
+"message": "1.6.0",
+"description": "The label for version current"
+ },
+ "sidebar.tutorialSidebar.category.Design and Concept": {
+"message": "设计和概念",
+"description": "The label for category Design and Concept in sidebar tutorialSidebar"
+ },
+ "sidebar.tutorialSidebar.category.Quick Start": {
+"message": "快速开始",
+"description": "The label for category Quick Start in sidebar tutorialSidebar"
+ },
+ "sidebar.tutorialSidebar.category.Deployment": {
+"message": "安装部署",
+"description": "The label for category Deployment in sidebar tutorialSidebar"
+ },
+ "sidebar.tutorialSidebar.category.Components": {
+"message": "组件介绍",
+"description": "The label for category Components in sidebar tutorialSidebar"
+ },
+ "sidebar.tutorialSidebar.category.Manager": {
+"message": "Manager",
+"description": "The label for category Manager in sidebar tutorialSidebar"
+ },
+ "sidebar.tutorialSidebar.category.Dashboard": {
+"message": "Dashboard",
+"description": "The label for category Dashboard in sidebar tutorialSidebar"
+ },
+ "sidebar.tutorialSidebar.category.Agent": {
+"message": "Agent",
+"description": "The label for ca
[GitHub] [inlong] haifxu opened a new pull request, #7944: [INLONG-7941][Manager][Dashborad] Add param to distinguish lightweight
haifxu opened a new pull request, #7944:
URL: https://github.com/apache/inlong/pull/7944

### Prepare a Pull Request
- Fixes #7941

### Motivation
1. We need to distinguish lightweight tasks when querying groups.

### Modifications
1. Add `lightweight` param.
[GitHub] [inlong] gong commented on a diff in pull request #7923: [INLONG-7879][Manager][Sort][Agent] Multi-version code migration
gong commented on code in PR #7923: URL: https://github.com/apache/inlong/pull/7923#discussion_r1178909366 ## inlong-sort/sort-flink/sort-flink-v1.13/pom.xml: ## @@ -0,0 +1,235 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + +org.apache.inlong +sort-flink +1.7.0-SNAPSHOT +../pom.xml + + +sort-flink-v1.13 +pom + +Apache InLong - Sort-Flink-v1.13 + +sort-connectors-v1.13 +sort-flink-dependencies-v1.13 + + + + 2.3.0 +1.1.0 + 2.2.1 +2.12.1-13.0 + 2.2.1 + 2.2.1 +1.13.6.2 +2.7.6 + 2.3.0 +1.0.3 +2.3.0 + +7.2.2.jre8 + +0.9.3 + + + + + +org.apache.flink + flink-clients_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-streaming-java_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-connector-kafka_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-connector-hive_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-connector-hbase-2.2_${flink.scala.binary.version} +${flink.version} + + +org.apache.iceberg +iceberg-flink-runtime-1.14 +${iceberg.version} + + +com.ververica +flink-connector-postgres-cdc +${flink.connector.postgres.cdc.version} + + +com.ververica +flink-connector-oracle-cdc +${flink.connector.oracle.cdc.version} + + + +com.oracle.ojdbc +ojdbc8 + + + + +com.ververica +flink-connector-sqlserver-cdc +${flink.connector.sqlserver.cdc.version} + + +com.ververica +flink-connector-mysql-cdc +${flink.connector.mysql.cdc.version} + + +org.apache.flink + flink-connector-jdbc_${flink.scala.binary.version} +${flink.version} + + +io.streamnative.connectors +pulsar-flink-connector-origin +${flink.pulsar.version} + + +com.ververica +flink-connector-mongodb-cdc +${flink.connector.mongodb.cdc.version} + + +org.apache.bahir + flink-connector-redis_${flink.scala.binary.version} +${flink.connector.redis} + + 
+com.ververica +flink-cdc-base +${flink.cdc.base.version} + + + +org.apache.hudi +hudi-flink1.13-bundle +${hudi.version} + + +io.streamnative.connectors +flink-protobuf +${flink.protobuf.version} + + +com.microsoft.sqlserver +mssql-jdbc +${sqlserver.jdbc.version} + + + +org.apache.flink + flink-table-planner-blink_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-table-api-java-bridge_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-table-runtime-blink_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-sql-parquet_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-sql-orc_${flink.scala.binary.version} +${flink.version} + + +org.apache.flink + flink-test-utils_${flink.scala.binary.version} +${flink.version} + + +org.ap
[GitHub] [inlong] gong commented on a diff in pull request #7923: [INLONG-7879][Manager][Sort][Agent] Multi-version code migration
gong commented on code in PR #7923:
URL: https://github.com/apache/inlong/pull/7923#discussion_r1178934027

## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors-v1.13/doris/pom.xml: ##
@@ -20,7 +20,7 @@
 4.0.0
 org.apache.inlong
-sort-connectors
+sort-connectors-v1.13
 1.7.0-SNAPSHOT
 sort-connector-doris

Review Comment:
Maybe every connector artifactId needs the Flink version appended.
[GitHub] [inlong] gong commented on a diff in pull request #7907: [INLONG-7906][Sort] Improve logic of calculation object byte size
gong commented on code in PR #7907:
URL: https://github.com/apache/inlong/pull/7907#discussion_r1178955538

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java: ##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * calculate tool for object
+ */
+public class CalculateObjectSizeUtils {
+
+/**
+ *

Review Comment:
done
[GitHub] [inlong] healchow commented on a diff in pull request #7944: [INLONG-7941][Manager][Dashborad] Add param to distinguish lightweight
healchow commented on code in PR #7944:
URL: https://github.com/apache/inlong/pull/7944#discussion_r1179053366

## inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java: ##
@@ -88,9 +90,12 @@ public Response get(@PathVariable String groupId) {
 @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET)
 @ApiOperation(value = "Count inlong group status for current user")
-public Response countGroupByUser() {
+@ApiImplicitParams({
+@ApiImplicitParam(name = "lightweight", dataTypeClass = Integer.class, required = true)

Review Comment:
Please do not add `required = true`; I suggest giving the parameter a default value instead.
[GitHub] [inlong] healchow commented on a diff in pull request #7944: [INLONG-7941][Manager][Dashborad] Add param to distinguish lightweight
healchow commented on code in PR #7944:
URL: https://github.com/apache/inlong/pull/7944#discussion_r1179054093

## inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml: ##
@@ -98,6 +98,7 @@
 select count(*) as total, status from inlong_group where is_deleted = 0
+ and lightweight = #{lightweight,jdbcType=TINYINT}

Review Comment:
I suggest checking that the parameter is set before using it:
```sql
```
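The reviewer's own snippet did not survive in this archive, so the following is only a guess at the intent: apply the `lightweight` condition when the parameter is present and leave the query unfiltered otherwise. In a MyBatis mapper this would normally be written with dynamic SQL; the sketch below shows the same idea in plain Java. The class `GroupCountQuerySketch`, the `Query` holder, and the trailing `group by status` clause are assumptions that do not appear in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical query builder illustrating a null check before filtering. */
final class GroupCountQuerySketch {

    /** Simple holder for the SQL text and its positional parameters. */
    static final class Query {
        final String sql;
        final List<Object> params;

        Query(String sql, List<Object> params) {
            this.sql = sql;
            this.params = params;
        }
    }

    private GroupCountQuerySketch() {
    }

    static Query countByStatus(Integer lightweight) {
        StringBuilder sql = new StringBuilder(
                "select count(*) as total, status from inlong_group where is_deleted = 0");
        List<Object> params = new ArrayList<>();
        // Only add the condition when the caller actually supplied a value.
        if (lightweight != null) {
            sql.append(" and lightweight = ?");
            params.add(lightweight);
        }
        // Assumed grouping clause; it is not shown in the quoted hunk.
        sql.append(" group by status");
        return new Query(sql.toString(), params);
    }

    public static void main(String[] args) {
        System.out.println(countByStatus(null).sql);
        System.out.println(countByStatus(1).sql);
    }
}
```

Treating a missing value as "no filter" also fits the earlier suggestion not to mark the request parameter as required.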
[GitHub] [inlong] healchow commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
healchow commented on code in PR #7942:
URL: https://github.com/apache/inlong/pull/7942#discussion_r1179061423

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java: ##
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler;
+import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * common.properties Configure Holder
+ */
+public class CommonConfigHolder {
+
+public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class);
+// configure file name
+private static final String COMMON_CONFIG_FILE_NAME = "common.properties";
+// allowed keys and default value, begin
+// cluster tag
+public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+public static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
+// cluster name
+public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
+public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy";
+// cluster incharges
+public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
+public static final String VAL_DEF_CLUSTER_INCHARGES = "admin";
+// cluster exttag,
+public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
+// predefined format of ext tag: {key}={value}
+public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true";
+// manager type
+public static final String KEY_MANAGER_TYPE = "manager.type";
+public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName();
+// manager hosts
+public static final String KEY_MANAGER_HOSTS = "manager.hosts";
+public static final String KEY_MANAGER_HOSTS_SEPARATOR = ",";
+// manager auth secret id
+public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId";
+// manager auth secret key
+public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey";
+// configure file check interval
+private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval";
+public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 6L;
+// Whether to accept messages without mapping between groupId/streamId and topic
+public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept";
+public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false;
+// whether enable whitelist, optional field.
+public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
+public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+// Audit fields
+public static final String KEY_ENABLE_AUDIT = "audit.enable";
+public static final boolean VAL_DEF_ENABLE_AUDIT = true;
+public static final String KEY_AUDIT_PROXYS = "audit.proxys";
+public static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
+public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/";
+public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows";
+public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 200;
+public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval";
+public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 6L;
+// Whether response after save msg
+public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
+public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
+// Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
+
[GitHub] [inlong] healchow commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
healchow commented on code in PR #7942:
URL: https://github.com/apache/inlong/pull/7942#discussion_r1179066889

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java: ##
@@ -144,7 +144,8 @@ public static void fillInlongId(Event event, Map dimensions) {
 */
 public static void fillAuditFormatTime(Event event, Map dimensions) {
 long msgTime = (event != null) ? AuditUtils.getLogTime(event) : System.currentTimeMillis();
-long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+long auditFormatTime =

Review Comment:
Sorry, but what is the hard-wrap column set to in your IDE? Our project uses a hard-wrap limit of 120 characters. Unless it is necessary, I recommend not changing unrelated line wraps.
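For readers unfamiliar with the expression in the removed line, `msgTime - msgTime % interval` rounds a timestamp down to the start of its audit interval. A small worked sketch follows; the class name `AuditTimeSketch` and the 60,000 ms interval used in the example are assumptions for illustration, not values confirmed by this thread.

```java
/** Worked example of flooring a timestamp to an interval boundary. */
final class AuditTimeSketch {

    private AuditTimeSketch() {
    }

    /** Rounds msgTimeMs down to the nearest multiple of intervalMs. */
    static long alignToInterval(long msgTimeMs, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        return msgTimeMs - msgTimeMs % intervalMs;
    }

    public static void main(String[] args) {
        // 125000 % 60000 = 5000, so the aligned value is 125000 - 5000 = 120000.
        System.out.println(alignToInterval(125_000L, 60_000L));
    }
}
```

All messages whose timestamps fall inside the same interval map to the same aligned value, which is what lets audit counts be grouped per interval.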
[GitHub] [inlong] healchow commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
healchow commented on code in PR #7942: URL: https://github.com/apache/inlong/pull/7942#discussion_r1179072100 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java: ## @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.flume.Context; +import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler; +import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector; +import org.apache.inlong.sdk.commons.protocol.ProxySdk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * common.properties Configure Holder + */ +public class CommonConfigHolder { + +public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class); +// configure file name +private static final String COMMON_CONFIG_FILE_NAME = "common.properties"; +// allowed keys and default value, begin +// cluster tag +public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag"; +public static final String VAL_DEF_CLUSTER_TAG = "default_cluster"; +// cluster name +public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name"; +public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy"; +// cluster incharges +public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; +public static final String VAL_DEF_CLUSTER_INCHARGES = "admin"; +// cluster exttag, +public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag"; +// predefined format of ext tag: {key}={value} +public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true"; +// manager type +public static final String KEY_MANAGER_TYPE = "manager.type"; +public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName(); +// manager hosts +public static final String KEY_MANAGER_HOSTS = "manager.hosts"; +public static final String KEY_MANAGER_HOSTS_SEPARATOR = ","; 
+// manager auth secret id +public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId"; +// manager auth secret key +public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey"; +// configure file check interval +private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval"; +public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 6L; +// Whether to accept messages without mapping between groupId/streamId and topic +public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept"; +public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false; +// whether enable whitelist, optional field. +public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist"; +public static final boolean VAL_DEF_ENABLE_WHITELIST = false; +// Audit fields +public static final String KEY_ENABLE_AUDIT = "audit.enable"; +public static final boolean VAL_DEF_ENABLE_AUDIT = true; +public static final String KEY_AUDIT_PROXYS = "audit.proxys"; +public static final String KEY_AUDIT_FILE_PATH = "audit.filePath"; +public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/"; +public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows"; +public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 200; +public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval"; +public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 6L; +// Whether response after save msg +public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave"; +public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false; +// Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs"; +
[GitHub] [inlong] healchow commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
healchow commented on code in PR #7942: URL: https://github.com/apache/inlong/pull/7942#discussion_r1179073030 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java: ## @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.flume.Context; +import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler; +import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector; +import org.apache.inlong.sdk.commons.protocol.ProxySdk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * common.properties Configure Holder + */ +public class CommonConfigHolder { + +public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class); +// configure file name +private static final String COMMON_CONFIG_FILE_NAME = "common.properties"; +// allowed keys and default value, begin +// cluster tag +public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag"; +public static final String VAL_DEF_CLUSTER_TAG = "default_cluster"; +// cluster name +public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name"; +public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy"; +// cluster incharges +public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; +public static final String VAL_DEF_CLUSTER_INCHARGES = "admin"; +// cluster exttag, +public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag"; +// predefined format of ext tag: {key}={value} +public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true"; +// manager type +public static final String KEY_MANAGER_TYPE = "manager.type"; +public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName(); +// manager hosts +public static final String KEY_MANAGER_HOSTS = "manager.hosts"; +public static final String KEY_MANAGER_HOSTS_SEPARATOR = ","; 
+// manager auth secret id +public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId"; +// manager auth secret key +public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey"; +// configure file check interval +private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval"; +public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 6L; +// Whether to accept messages without mapping between groupId/streamId and topic +public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept"; +public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false; +// whether enable whitelist, optional field. +public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist"; +public static final boolean VAL_DEF_ENABLE_WHITELIST = false; +// Audit fields +public static final String KEY_ENABLE_AUDIT = "audit.enable"; +public static final boolean VAL_DEF_ENABLE_AUDIT = true; +public static final String KEY_AUDIT_PROXYS = "audit.proxys"; +public static final String KEY_AUDIT_FILE_PATH = "audit.filePath"; +public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/"; +public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows"; +public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 200; +public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval"; +public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 6L; +// Whether response after save msg +public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave"; +public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false; +// Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs"; +
[GitHub] [inlong] healchow commented on a diff in pull request #7935: [INLONG-7934][Manager] Optimize the serializationType to support debezium json
healchow commented on code in PR #7935: URL: https://github.com/apache/inlong/pull/7935#discussion_r1179080013 ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java: ## @@ -110,17 +110,22 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo, tubeMQSource.setTopic(streamInfo.getMqResource()); tubeMQSource.setGroupId(streamId); tubeMQSource.setMasterRpc(masterRpc); -if (StringUtils.isNotBlank(streamInfo.getDataType())) { -String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType(); -tubeMQSource.setSerializationType(serializationType); -} tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError()); for (StreamSource sourceInfo : streamSources) { if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) { continue; } - tubeMQSource.setSerializationType(sourceInfo.getSerializationType()); + +if (sourceInfo.getSourceType().equalsIgnoreCase(SourceType.FILE) || Review Comment: Do not use `equalsIgnoreCase`: in the usage scenarios of other modules (such as querying MySQL, or the Dashboard passing parameters), lowercase types are invalid, and we need to limit the generation of dirty data.
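A minimal sketch of the case-sensitive check the reviewer asks for, using `java.util.Objects.equals` so that a null source type is handled too. The string constants below are stand-ins for `SourceType.FILE` and `SourceType.AUTO_PUSH`; their actual values are not shown in the quoted diff, so treat them as assumptions, and `usesStreamDataType` is a hypothetical name.

```java
import java.util.Objects;

class SourceTypeMatchSketch {

    // Assumed values standing in for SourceType.FILE and SourceType.AUTO_PUSH.
    static final String FILE = "FILE";
    static final String AUTO_PUSH = "AUTO_PUSH";

    // Hypothetical helper: exact, case-sensitive and null-safe match.
    static boolean usesStreamDataType(String sourceType) {
        return Objects.equals(FILE, sourceType) || Objects.equals(AUTO_PUSH, sourceType);
    }

    public static void main(String[] args) {
        System.out.println(usesStreamDataType("FILE")); // exact match
        System.out.println(usesStreamDataType("file")); // lowercase is rejected
        System.out.println(usesStreamDataType(null));   // null is rejected, no NPE
    }
}
```

With a strict comparison, a lowercase value such as `file` is not silently accepted here while being treated as invalid elsewhere.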
[GitHub] [inlong] healchow commented on a diff in pull request #7935: [INLONG-7934][Manager] Optimize the serializationType to support debezium json
healchow commented on code in PR #7935: URL: https://github.com/apache/inlong/pull/7935#discussion_r1179081867 ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java: ## @@ -110,17 +110,22 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo, tubeMQSource.setTopic(streamInfo.getMqResource()); tubeMQSource.setGroupId(streamId); tubeMQSource.setMasterRpc(masterRpc); -if (StringUtils.isNotBlank(streamInfo.getDataType())) { -String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType(); -tubeMQSource.setSerializationType(serializationType); -} tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError()); for (StreamSource sourceInfo : streamSources) { if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) { continue; } - tubeMQSource.setSerializationType(sourceInfo.getSerializationType()); + +if (sourceInfo.getSourceType().equalsIgnoreCase(SourceType.FILE) || + sourceInfo.getSourceType().equalsIgnoreCase(SourceType.AUTO_PUSH)) { +if (StringUtils.isNotBlank(streamInfo.getDataType())) { Review Comment: Also, can we extract this common code into `AbstractSourceOperator`?
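One way the suggested extraction could look, as a rough sketch only. The quoted diff is cut off before the body of the branch, so the fallback behaviour below (use the source's own serialization type when the stream data type does not apply) is an assumption, as are all class and method names, the constant values, and the sample type strings. The real `AbstractSourceOperator` is not shown in this thread.

```java
import java.util.Objects;

class SharedOperatorSketch {

    // Hypothetical stand-in for a shared base class such as AbstractSourceOperator.
    abstract static class BaseOperator {
        static final String FILE = "FILE";           // assumed value
        static final String AUTO_PUSH = "AUTO_PUSH"; // assumed value

        // Shared logic: FILE and AUTO_PUSH sources follow the stream's data type
        // when it is set; every other case keeps the source's own type (assumed).
        protected String chooseSerializationType(String sourceType, String streamDataType,
                String sourceSerializationType) {
            boolean followsStream = Objects.equals(FILE, sourceType)
                    || Objects.equals(AUTO_PUSH, sourceType);
            if (followsStream && streamDataType != null && !streamDataType.trim().isEmpty()) {
                return streamDataType;
            }
            return sourceSerializationType;
        }
    }

    // Concrete operators inherit the helper instead of duplicating the branch.
    static class TubeOperator extends BaseOperator { }

    static class PulsarOperator extends BaseOperator { }

    public static void main(String[] args) {
        System.out.println(new TubeOperator().chooseSerializationType("FILE", "csv", "debezium_json"));
        System.out.println(new PulsarOperator().chooseSerializationType("MYSQL_BINLOG", "csv", "debezium_json"));
    }
}
```

Keeping the decision in one inherited method means a later change to the rule touches a single place rather than each MQ-specific operator.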
[GitHub] [inlong] healchow commented on pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK
healchow commented on PR #7933: URL: https://github.com/apache/inlong/pull/7933#issuecomment-1525619876 @woofyzhao Hi woofy, do you have time to review this PR?
[GitHub] [inlong] TszKitLo40 commented on a diff in pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK
TszKitLo40 commented on code in PR #7933: URL: https://github.com/apache/inlong/pull/7933#discussion_r1179849093 ## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package client + +import ( + "context" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientIDstring + config *config.Config + nextAuth2Master int32 + selectorselector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMusync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. 
+func NewProducer(config *config.Config) (Producer, error) { + if err := config.ValidateProducer(); err != nil { + return nil, err + } + log.Infof("The config of the producer is %s", config) + + selector, err := selector.Get("ip") + if err != nil { + return nil, err + } + + clientID := newProducerClientID() + pool := multiplexing.NewPool() + opts := &transport.Options{} + if config.Net.TLS.Enable { + opts.TLSEnable = true + opts.CACertFile = config.Net.TLS.CACertFile + opts.TLSCertFile = config.Net.TLS.TLSCertFile + opts.TLSKeyFile = config.Net.TLS.TLSKeyFile + opts.TLSServerName = config.Net.TLS.TLSServerName + } + + client := rpc.New(pool, opts, config) + + p := &producer{ + config: config, + clientID:clientID, + selector:selector, + client: client, + unreportedTimes: 0, + brokerMap: make(map[string]*metadata.Node), + publishTopics: config.Producer.Topics, + } + + err = p.register2Master(true) + if err != nil { + return nil, err + } + + log.Infof("[PRODUCER] start producer success, client=%s", clientID) + return p, nil +} + +func (p *producer) register2Master(needChange bool) error { + if needChange { + p.selector.Refresh(p.config.Producer.Masters) + node, err := p.selector.Select(p.config.Producer.Masters) + if err != nil { + return err + } + p.master = node + } + + retryCount := 0 + for { + rsp, err := p.sendRegRequest2Master() + if err != nil || !rsp.GetSuccess() { + if err != nil { + log.Errorf("[PRODUCER]register2Master error %s", err.Error()) + } + if !p.master.HasNext { + if err != nil { + return err + } + if rsp != nil { + log.Errorf("[PRODUCER] register2Master(%s) failure exist register, client=%s, er
[GitHub] [inlong] TszKitLo40 commented on pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK
TszKitLo40 commented on PR #7933: URL: https://github.com/apache/inlong/pull/7933#issuecomment-1526855483 I don't know the specific logic of the producer. Maybe @gosonzhang should have a look.
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
gosonzhang commented on code in PR #7942: URL: https://github.com/apache/inlong/pull/7942#discussion_r1179851303 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java: ## @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.flume.Context; +import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler; +import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector; +import org.apache.inlong.sdk.commons.protocol.ProxySdk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * common.properties Configure Holder + */ +public class CommonConfigHolder { + +public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class); +// configure file name +private static final String COMMON_CONFIG_FILE_NAME = "common.properties"; +// allowed keys and default value, begin +// cluster tag +public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag"; +public static final String VAL_DEF_CLUSTER_TAG = "default_cluster"; +// cluster name +public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name"; +public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy"; +// cluster incharges +public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; +public static final String VAL_DEF_CLUSTER_INCHARGES = "admin"; +// cluster exttag, +public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag"; +// predefined format of ext tag: {key}={value} +public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true"; +// manager type +public static final String KEY_MANAGER_TYPE = "manager.type"; +public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName(); +// manager hosts +public static final String KEY_MANAGER_HOSTS = "manager.hosts"; +public static final String KEY_MANAGER_HOSTS_SEPARATOR = ","; 
+// manager auth secret id +public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId"; +// manager auth secret key +public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey"; +// configure file check interval +private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval"; +public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 6L; +// Whether to accept messages without mapping between groupId/streamId and topic +public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept"; +public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false; +// whether enable whitelist, optional field. +public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist"; +public static final boolean VAL_DEF_ENABLE_WHITELIST = false; +// Audit fields +public static final String KEY_ENABLE_AUDIT = "audit.enable"; +public static final boolean VAL_DEF_ENABLE_AUDIT = true; +public static final String KEY_AUDIT_PROXYS = "audit.proxys"; +public static final String KEY_AUDIT_FILE_PATH = "audit.filePath"; +public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/"; +public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows"; +public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 200; +public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval"; +public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 6L; +// Whether response after save msg +public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave"; +public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false; +// Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
gosonzhang commented on code in PR #7942: URL: https://github.com/apache/inlong/pull/7942#discussion_r1179854246 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java: ## @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.flume.Context; +import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler; +import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector; +import org.apache.inlong.sdk.commons.protocol.ProxySdk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * common.properties Configure Holder + */ +public class CommonConfigHolder { + +public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class); +// configure file name +private static final String COMMON_CONFIG_FILE_NAME = "common.properties"; +// allowed keys and default value, begin +// cluster tag +public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag"; +public static final String VAL_DEF_CLUSTER_TAG = "default_cluster"; +// cluster name +public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name"; +public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy"; +// cluster incharges +public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; +public static final String VAL_DEF_CLUSTER_INCHARGES = "admin"; +// cluster exttag, +public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag"; +// predefined format of ext tag: {key}={value} +public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true"; +// manager type +public static final String KEY_MANAGER_TYPE = "manager.type"; +public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName(); +// manager hosts +public static final String KEY_MANAGER_HOSTS = "manager.hosts"; +public static final String KEY_MANAGER_HOSTS_SEPARATOR = ","; 
+// manager auth secret id +public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId"; +// manager auth secret key +public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey"; +// configure file check interval +private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval"; +public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 6L; +// Whether to accept messages without mapping between groupId/streamId and topic +public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept"; +public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false; +// whether enable whitelist, optional field. +public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist"; +public static final boolean VAL_DEF_ENABLE_WHITELIST = false; +// Audit fields +public static final String KEY_ENABLE_AUDIT = "audit.enable"; +public static final boolean VAL_DEF_ENABLE_AUDIT = true; +public static final String KEY_AUDIT_PROXYS = "audit.proxys"; +public static final String KEY_AUDIT_FILE_PATH = "audit.filePath"; +public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/"; +public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows"; +public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 200; +public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval"; +public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 6L; +// Whether response after save msg +public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave"; +public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false; +// Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
gosonzhang commented on code in PR #7942: URL: https://github.com/apache/inlong/pull/7942#discussion_r1179854788 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java: ## @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.flume.Context; +import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler; +import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector; +import org.apache.inlong.sdk.commons.protocol.ProxySdk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * common.properties Configure Holder + */ +public class CommonConfigHolder { + +public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class); +// configure file name +private static final String COMMON_CONFIG_FILE_NAME = "common.properties"; +// allowed keys and default value, begin +// cluster tag +public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag"; +public static final String VAL_DEF_CLUSTER_TAG = "default_cluster"; +// cluster name +public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name"; +public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy"; +// cluster incharges +public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; +public static final String VAL_DEF_CLUSTER_INCHARGES = "admin"; +// cluster exttag, +public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag"; +// predefined format of ext tag: {key}={value} +public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true"; +// manager type +public static final String KEY_MANAGER_TYPE = "manager.type"; +public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName(); +// manager hosts +public static final String KEY_MANAGER_HOSTS = "manager.hosts"; +public static final String KEY_MANAGER_HOSTS_SEPARATOR = ","; 
+// manager auth secret id +public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId"; +// manager auth secret key +public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey"; +// configure file check interval +private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval"; +public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 6L; +// Whether to accept messages without mapping between groupId/streamId and topic +public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept"; +public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false; +// whether enable whitelist, optional field. +public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist"; +public static final boolean VAL_DEF_ENABLE_WHITELIST = false; +// Audit fields +public static final String KEY_ENABLE_AUDIT = "audit.enable"; +public static final boolean VAL_DEF_ENABLE_AUDIT = true; +public static final String KEY_AUDIT_PROXYS = "audit.proxys"; +public static final String KEY_AUDIT_FILE_PATH = "audit.filePath"; +public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/"; +public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows"; +public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 200; +public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval"; +public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 6L; +// Whether response after save msg +public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave"; +public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false; +// Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism
gosonzhang commented on code in PR #7942: URL: https://github.com/apache/inlong/pull/7942#discussion_r1179855444 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java: ## @@ -144,7 +144,8 @@ public static void fillInlongId(Event event, Map dimensions) { */ public static void fillAuditFormatTime(Event event, Map dimensions) { long msgTime = (event != null) ? AuditUtils.getLogTime(event) : System.currentTimeMillis(); -long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval(); +long auditFormatTime = Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
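For reference, the expression removed in the hunk above, `msgTime - msgTime % interval`, rounds a timestamp down to the start of its audit interval. A minimal sketch of that arithmetic follows; the class name `AuditTimeSketch`, the method name `floorToInterval`, and the 60 000 ms interval are illustrative assumptions and do not come from the PR. It behaves as a floor only for non-negative timestamps.

```java
final class AuditTimeSketch {

    // Hypothetical helper (not InLong code): aligns msgTimeMs to the start of
    // the interval that contains it, using the same arithmetic as the diff.
    static long floorToInterval(long msgTimeMs, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        return msgTimeMs - msgTimeMs % intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 60_000L; // assumed interval, for illustration only
        System.out.println(floorToInterval(125_000L, intervalMs)); // 120000
        System.out.println(floorToInterval(120_000L, intervalMs)); // 120000
    }
}
```

Guarding against a non-positive interval matters here because a zero interval would otherwise raise an `ArithmeticException` from the modulo.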
[GitHub] [inlong] fuweng11 commented on a diff in pull request #7944: [INLONG-7941][Manager][Dashborad] Add param to distinguish lightweight
fuweng11 commented on code in PR #7944: URL: https://github.com/apache/inlong/pull/7944#discussion_r1179884219 ## inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java: ## @@ -88,9 +90,12 @@ public Response get(@PathVariable String groupId) { @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET) @ApiOperation(value = "Count inlong group status for current user") -public Response countGroupByUser() { +@ApiImplicitParams({ Review Comment: No need for `@ApiImplicitParams`; just use `@ApiImplicitParam`.
[GitHub] [inlong] EMsnap merged pull request #7907: [INLONG-7906][Sort] Improve logic of calculation object byte size
EMsnap merged PR #7907: URL: https://github.com/apache/inlong/pull/7907
[inlong] branch master updated: [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 704ef3eb7 [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907) 704ef3eb7 is described below commit 704ef3eb7440b1ad3a41ab851203f56c1fe650e3 Author: Xin Gong AuthorDate: Fri Apr 28 11:20:01 2023 +0800 [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907) --- .../inlong/sort/base/metric/SinkMetricData.java| 8 ++-- .../inlong/sort/base/metric/SourceMetricData.java | 14 +++ .../sort/base/metric/sub/SinkTableMetricData.java | 7 ++-- .../sort/base/util/CalculateObjectSizeUtils.java | 47 ++ .../base/util/CalculateObjectSizeUtilsTest.java| 47 ++ .../table/DorisDynamicSchemaOutputFormat.java | 9 +++-- .../filesystem/stream/AbstractStreamingWriter.java | 7 ++-- .../inlong/sort/hbase/sink/HBaseSinkFunction.java | 5 ++- .../hive/filesystem/AbstractStreamingWriter.java | 4 +- .../sink/multiple/DynamicSchemaHandleOperator.java | 4 +- .../sink/multiple/IcebergMultipleStreamWriter.java | 4 +- .../jdbc/internal/JdbcBatchingOutputFormat.java| 4 +- .../internal/TableMetricStatementExecutor.java | 9 +++-- 13 files changed, 130 insertions(+), 39 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 3e0ae04b3..a130be24a 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -25,7 +25,6 @@ import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; -import 
java.nio.charset.StandardCharsets; import java.util.Map; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; @@ -36,6 +35,7 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND; +import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; /** * A collection class for handling metrics @@ -253,13 +253,11 @@ public class SinkMetricData implements MetricData { } public void invokeWithEstimate(Object o) { -long size = o.toString().getBytes(StandardCharsets.UTF_8).length; -invoke(1, size); +invoke(1, getDataSize(o)); } public void invokeDirtyWithEstimate(Object o) { -long size = o.toString().getBytes(StandardCharsets.UTF_8).length; -invokeDirty(1, size); +invokeDirty(1, getDataSize(o)); } public void invoke(long rowCount, long rowSize) { diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index e5ffdf844..b5ca46713 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -17,24 +17,24 @@ package org.apache.inlong.sort.base.metric; -import java.util.List; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.inlong.audit.AuditOperator; - -import java.nio.charset.StandardCharsets; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Map; + 
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; +import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; /** * A collection class for handling metrics @@ -194,13 +194,11 @@ public class SourceMetricData implements MetricData { } public void outputMetricsWithEstimate(Object data) { -long size = data.toString()
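The hunks above replace the inline `toString().getBytes(StandardCharsets.UTF_8).length` estimate with a shared `getDataSize` helper. The body of `CalculateObjectSizeUtils` is not visible in this excerpt, so what follows is only a sketch of what such a helper might look like, assuming it keeps the UTF-8 estimate from the removed lines and additionally tolerates `null`. The class name `DataSizeSketch` and the null handling are assumptions, not the committed implementation.

```java
import java.nio.charset.StandardCharsets;

final class DataSizeSketch {

    // Assumed behaviour: null counts as 0 bytes; anything else is measured as
    // the UTF-8 length of its toString() form, as in the removed inline code.
    static long getDataSize(Object o) {
        if (o == null) {
            return 0L;
        }
        return o.toString().getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println(getDataSize("abc"));    // 3
        System.out.println(getDataSize("\u00e9")); // 2, since U+00E9 takes two bytes in UTF-8
        System.out.println(getDataSize(null));     // 0
    }
}
```

Centralising the estimate in one method means every call site (`invokeWithEstimate`, `invokeDirtyWithEstimate`, `outputMetricsWithEstimate`) picks up a later change to the sizing logic without further edits.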
[GitHub] [inlong] haifxu commented on a diff in pull request #7935: [INLONG-7934][Manager] Optimize the serializationType to support debezium json
haifxu commented on code in PR #7935: URL: https://github.com/apache/inlong/pull/7935#discussion_r1179893375 ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java: ## @@ -110,17 +110,22 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo, tubeMQSource.setTopic(streamInfo.getMqResource()); tubeMQSource.setGroupId(streamId); tubeMQSource.setMasterRpc(masterRpc); -if (StringUtils.isNotBlank(streamInfo.getDataType())) { -String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType(); -tubeMQSource.setSerializationType(serializationType); -} tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError()); for (StreamSource sourceInfo : streamSources) { if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) { continue; } - tubeMQSource.setSerializationType(sourceInfo.getSerializationType()); + +if (sourceInfo.getSourceType().equalsIgnoreCase(SourceType.FILE) || + sourceInfo.getSourceType().equalsIgnoreCase(SourceType.AUTO_PUSH)) { +if (StringUtils.isNotBlank(streamInfo.getDataType())) { Review Comment: Done
[GitHub] [inlong] lordcheng10 commented on a diff in pull request #7905: [INLONG-7903][Sort] Kafka sink supports fixed partition strategy
lordcheng10 commented on code in PR #7905: URL: https://github.com/apache/inlong/pull/7905#discussion_r1179958099 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -187,7 +190,18 @@ public final class Constants { .noDefaultValue() .withDescription( "The format of multiple sink, it represents the real format of the raw binary data"); - +public static final ConfigOption<String> PATTERN_PARTITION_MAP = +ConfigOptions.key("pattern.partition.map") +.stringType() +.noDefaultValue() +.withDescription( +"Pattern rules and partition maps"); Review Comment: OK, I will fix it!
[GitHub] [inlong] lordcheng10 commented on a diff in pull request #7905: [INLONG-7903][Sort] Kafka sink supports fixed partition strategy
lordcheng10 commented on code in PR #7905: URL: https://github.com/apache/inlong/pull/7905#discussion_r1179958099 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -187,7 +190,18 @@ public final class Constants { .noDefaultValue() .withDescription( "The format of multiple sink, it represents the real format of the raw binary data"); - +public static final ConfigOption<String> PATTERN_PARTITION_MAP = +ConfigOptions.key("pattern.partition.map") +.stringType() +.noDefaultValue() +.withDescription( +"Pattern rules and partition maps"); Review Comment: I will fix it!
[GitHub] [inlong] lordcheng10 commented on a diff in pull request #7905: [INLONG-7903][Sort] Kafka sink supports fixed partition strategy
lordcheng10 commented on code in PR #7905: URL: https://github.com/apache/inlong/pull/7905#discussion_r1179991387 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -187,7 +190,18 @@ public final class Constants { .noDefaultValue() .withDescription( "The format of multiple sink, it represents the real format of the raw binary data"); - +public static final ConfigOption<String> PATTERN_PARTITION_MAP = +ConfigOptions.key("pattern.partition.map") +.stringType() +.noDefaultValue() +.withDescription( +"Pattern rules and partition maps"); Review Comment: Fixed. ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -187,7 +190,18 @@ public final class Constants { .noDefaultValue() .withDescription( "The format of multiple sink, it represents the real format of the raw binary data"); - +public static final ConfigOption<String> PATTERN_PARTITION_MAP = +ConfigOptions.key("pattern.partition.map") +.stringType() +.noDefaultValue() +.withDescription( +"Pattern rules and partition maps"); +public static final ConfigOption<Map<String, String>> DATASOURCE_PARTITION_MAP = +ConfigOptions.key("datasource.partition.map") +.mapType() +.noDefaultValue() +.withDescription( +"Datasource and partition maps"); Review Comment: Fixed.
[GitHub] [inlong] lordcheng10 commented on a diff in pull request #7905: [INLONG-7903][Sort] Kafka sink supports fixed partition strategy
lordcheng10 commented on code in PR #7905: URL: https://github.com/apache/inlong/pull/7905#discussion_r1179991566 ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/InLongFixedPartitionPartitioner.java: ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kafka.partitioner; + +import lombok.Setter; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat; +import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +public class InLongFixedPartitionPartitioner extends FlinkKafkaPartitioner { + +private static final Logger LOG = LoggerFactory.getLogger(InLongFixedPartitionPartitioner.class); + +private final Map patternPartitionMap; +private final Map regexPatternMap; +private AbstractDynamicSchemaFormat dynamicSchemaFormat; + +/** + * The format used to deserialization the raw data(bytes array) + */ +@Setter +private String sinkMultipleFormat; + +private final String DEFAULT_PARTITION = "DEFAULT_PARTITION"; + +private String databasePattern; +private String tablePattern; + +private final static String DELIMITER1 = "&"; +private final static String DELIMITER2 = "_"; +private final static String DELIMITER3 = ":"; +private final static String DELIMITER4 = ","; + +private final int defaultPartitionId; + + +public InLongFixedPartitionPartitioner(String patternPartitionMapConfig, String partitionPattern) { +this.patternPartitionMap = configToMap(patternPartitionMapConfig); +this.regexPatternMap = new HashMap<>(); +this.databasePattern = partitionPattern.split(DELIMITER2)[0]; +this.tablePattern = partitionPattern.split(DELIMITER2)[1]; +this.defaultPartitionId = Integer.parseInt(patternPartitionMap.getOrDefault(DEFAULT_PARTITION, "0")); +} + +@Override +public void open(int parallelInstanceId, int parallelInstances) { +super.open(parallelInstanceId, parallelInstances); +dynamicSchemaFormat = 
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); +} + +@Override +public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { Review Comment: Fixed.
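The body of `partition` is cut off in the quoted hunk, so the lookup logic itself is not visible here. As a rough sketch of how a pattern-to-partition table with a `DEFAULT_PARTITION` fallback could be resolved, consider the following. The class name `FixedPartitionSketch`, the method `resolve`, the first-match-wins rule, and the sample patterns are all hypothetical; only the field names and the `DEFAULT_PARTITION` key with its `"0"` fallback mirror the quoted code.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class FixedPartitionSketch {

    private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";

    // Regex source -> partition id (as text); insertion order is preserved.
    private final Map<String, String> patternPartitionMap;
    // Cache of compiled patterns so each regex is compiled at most once.
    private final Map<String, Pattern> regexPatternMap = new HashMap<>();
    private final int defaultPartitionId;

    FixedPartitionSketch(Map<String, String> patternPartitionMap) {
        this.patternPartitionMap = new LinkedHashMap<>(patternPartitionMap);
        this.defaultPartitionId =
                Integer.parseInt(this.patternPartitionMap.getOrDefault(DEFAULT_PARTITION, "0"));
    }

    // Assumed rule: the first pattern that fully matches the name wins;
    // otherwise the default partition id is returned.
    int resolve(String name) {
        for (Map.Entry<String, String> entry : patternPartitionMap.entrySet()) {
            if (DEFAULT_PARTITION.equals(entry.getKey())) {
                continue;
            }
            Pattern pattern = regexPatternMap.computeIfAbsent(entry.getKey(), Pattern::compile);
            if (pattern.matcher(name).matches()) {
                return Integer.parseInt(entry.getValue());
            }
        }
        return defaultPartitionId;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("db1\\.orders_.*", "3"); // hypothetical pattern
        config.put(DEFAULT_PARTITION, "1");
        FixedPartitionSketch sketch = new FixedPartitionSketch(config);
        System.out.println(sketch.resolve("db1.orders_2023")); // 3
        System.out.println(sketch.resolve("db2.users"));       // 1
    }
}
```

A `LinkedHashMap` is used so that overlapping patterns are tried in a predictable order; with a plain `HashMap` the winning pattern could vary between runs.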
[GitHub] [inlong] lordcheng10 commented on pull request #7905: [INLONG-7903][Sort] Kafka sink supports fixed partition strategy
lordcheng10 commented on PR #7905: URL: https://github.com/apache/inlong/pull/7905#issuecomment-1527067788 > Thanks for your contribution, could you please add some tests for InLongFixedPartitionPartitioner? I will add some tests.