[PR] [INLONG-9167][SDK] Use UUID as the batch ID instead of snowflake ID for DataProxy Golang SDK [inlong]
gunli opened a new pull request, #9169: URL: https://github.com/apache/inlong/pull/9169 ### Prepare a Pull Request - Fixes #9167 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications Currently, the DataProxy Golang SDK uses a snowflake ID as the batch ID, which depends on the host IP: when the SDK fails to get the private IP, SDK initialization fails. We can use a UUID as the batch ID instead. ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
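Why a UUID removes the host-IP dependency: a snowflake ID packs a machine identifier (here derived from the host IP) into every ID, so the generator cannot start without one, whereas a random version-4 UUID needs only a source of randomness. The PR does not say which UUID package it uses; the sketch below is a stdlib-only assumption built on `crypto/rand`, and `newBatchID` is a hypothetical name. It returns an error instead of panicking so a failure reaches the caller.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newBatchID returns a random RFC 4122 version-4 UUID string such as
// "3f0c9a4e-7b1d-4c2a-9e8f-0a1b2c3d4e5f". Hypothetical helper: the SDK
// may use a third-party UUID package instead.
func newBatchID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", fmt.Errorf("read random bytes: %w", err)
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4 in the high nibble of byte 6
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant bits (10xx) in byte 8
	// Format as 8-4-4-4-12 lowercase hex digits.
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := newBatchID()
	if err != nil {
		fmt.Println("cannot create batch ID:", err)
		return
	}
	fmt.Println(id)
}
```

Unlike a snowflake ID, a random UUID is not time-ordered, so any code that sorted batches by ID would need to use the batch timestamp instead.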
Re: [PR] [INLONG-9165][Agent] Delete job related file [inlong]
luchunliang merged PR #9166: URL: https://github.com/apache/inlong/pull/9166
(inlong) branch master updated: [INLONG-9165][Agent] Delete job related file (#9166)
This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 5f2ad4bf90 [INLONG-9165][Agent] Delete job related file (#9166) 5f2ad4bf90 is described below commit 5f2ad4bf905a162b0cf95753674994122b3051cd Author: justinwwhuang AuthorDate: Tue Oct 31 16:11:00 2023 +0800 [INLONG-9165][Agent] Delete job related file (#9166) --- .../inlong/agent/common/AgentThreadFactory.java| 2 +- .../java/org/apache/inlong/agent/pojo/FileJob.java | 141 --- .../apache/inlong/agent/pojo/JobProfileDto.java| 426 - .../org/apache/inlong/agent/utils/AgentUtils.java | 29 +- .../agent/common/TestAgentThreadFactory.java | 4 +- .../apache/inlong/agent/core/HeartbeatManager.java | 18 - .../inlong/agent/core/task/PositionManager.java| 1 - .../inlong/agent/core/task/TestMemoryManager.java | 11 +- .../inlong/agent/plugin/instance/FileInstance.java | 6 +- .../src/test/resources/fileAgent.trigger.json | 27 -- .../src/test/resources/fileAgentJob.json | 24 -- .../agent-plugins/src/test/resources/test/1.txt| 3 - .../agent-plugins/src/test/resources/test/2.txt| 2 - .../agent-plugins/src/test/resources/test/3.txt| 5 - .../agent-plugins/src/test/resources/test/a.txt| 3 - 15 files changed, 20 insertions(+), 682 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java index 5d756fc7a4..c9b9f3f2a6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java @@ -33,7 +33,7 @@ public class AgentThreadFactory implements ThreadFactory { private static final Logger LOGGER = LoggerFactory.getLogger(AgentThreadFactory.class); 
-public static final String NAMED_THREAD_PLACEHOLDER = "running-thread"; +public static final String NAMED_THREAD_PLACEHOLDER = "agent-thread-factory"; private final AtomicInteger mThreadNum = new AtomicInteger(1); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java deleted file mode 100644 index 33cdacf764..00 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.inlong.agent.pojo; - -import lombok.Data; - -import java.util.List; -import java.util.Map; - -@Data -public class FileJob { - -private String trigger; - -private Dir dir; -private Thread thread; -private int id; -private String timeOffset; -private String addictiveString; -private String collectType; -private Line line; - -// INCREMENT -// FULL -private String contentCollectType; - -private String envList; - -// JSON string, the content format is List> -private String metaFields; - -private String dataSeparator; - -// JSON string, the content format is Map -private String filterMetaByLabels; - -// JSON string, the content format is Map -private String properties; - -// Monitor interval for file -private Long monitorInterval; - -// Monitor switch, 1 true and 0 false -private Integer monitorStatus; - -// Monitor expire time and the time in milliseconds -private Long monitorExpire; - -@Data -public static class Dir { - -private String patterns; - -private String blackList; -} - -@Data -public static class Running { - -private String core; -} - -@Data -public static class Thread { - -private Running running; -} - -@Data -public static class Line { - -private String endPattern; -} - -@Data -public static class FileJobTaskConfi
[PR] [INLONG-9170][SDK] use pointers instead of objects [inlong]
gunli opened a new pull request, #9171: URL: https://github.com/apache/inlong/pull/9171 ### [INLONG-9170][SDK] use pointers instead of objects - Fixes #9170 ### Motivation Currently, the element types of the worker.sendFailedBatches and worker.responseBatches channels are objects (struct values), not pointers, which costs more memory at runtime; we can use pointers instead of objects. ### Modifications worker.go ``` go sendFailedBatches chan sendFailedBatchReq // send failed batches channel responseBatches chan batchRsp // batch response channel ``` to ``` go sendFailedBatches chan *sendFailedBatchReq // send failed batches channel responseBatches chan *batchRsp // batch response channel ``` ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
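Why pointer elements help: a buffered channel allocates its whole buffer up front, sized as capacity times the element size, and every send copies the element. With a value type the per-slot cost grows with the struct; with a pointer it is one machine word. The sketch below illustrates this with a hypothetical `batchRsp` whose fields are invented for the example (the real type in worker.go differs) and a hypothetical `channelBufferBytes` helper that estimates buffer sizes with `unsafe.Sizeof`.

```go
package main

import (
	"fmt"
	"unsafe"
)

// batchRsp is a hypothetical stand-in for the SDK's response type.
type batchRsp struct {
	workerID string
	batchID  string
	errCode  string
	attrs    map[string]string
	reserved [64]byte // padding to make the value-vs-pointer gap visible
}

// channelBufferBytes estimates the buffer a channel of the given capacity
// preallocates for value elements versus pointer elements.
func channelBufferBytes(capacity int) (byValue, byPointer uintptr) {
	n := uintptr(capacity)
	return n * unsafe.Sizeof(batchRsp{}), n * unsafe.Sizeof((*batchRsp)(nil))
}

func main() {
	byValue, byPointer := channelBufferBytes(1024)
	fmt.Printf("chan batchRsp buffer:  %d bytes\n", byValue)
	fmt.Printf("chan *batchRsp buffer: %d bytes\n", byPointer)

	// Sending a pointer copies only the pointer; the receiver sees the same struct.
	responses := make(chan *batchRsp, 1)
	responses <- &batchRsp{batchID: "batch-1"}
	rsp := <-responses
	fmt.Println(rsp.batchID)
}
```

The trade-off: with pointer elements the sender must not modify a struct after sending it, and each element is usually its own heap allocation, which is why pointer channels pair well with `sync.Pool` reuse such as the SDK already applies to its request objects.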
(inlong) branch master updated: [INLONG-8994][Sort] Add hudi connector on flink 1.15 (#9168)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new f055e00c5a [INLONG-8994][Sort] Add hudi connector on flink 1.15 (#9168) f055e00c5a is described below commit f055e00c5a9d93345c3949f391888192e04e0a8a Author: Sting AuthorDate: Tue Oct 31 16:32:38 2023 +0800 [INLONG-8994][Sort] Add hudi connector on flink 1.15 (#9168) --- .../src/main/assemblies/sort-connectors-v1.15.xml | 8 + inlong-sort/sort-core/pom.xml | 6 + inlong-sort/sort-flink/sort-flink-v1.15/pom.xml| 2 +- .../sort-flink-v1.15/sort-connectors/hudi/pom.xml | 106 .../hudi/table/sink/HudiTableInlongFactory.java| 55 .../org.apache.flink.table.factories.Factory | 16 ++ .../sort-flink-v1.15/sort-connectors/pom.xml | 1 + .../notices/NOTICE-hudi-flink1.15-bundle.txt | 299 + 8 files changed, 492 insertions(+), 1 deletion(-) diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml index 3b8b56497d..c88fd0cd3d 100644 --- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml +++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml @@ -99,5 +99,13 @@ 0644 + + ../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/target +inlong-sort/connectors + + sort-connector-hudi-v1.15-${project.version}.jar + +0644 + diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index e8303bc57d..7228bfb59c 100644 --- a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -287,6 +287,12 @@ ${project.version} test + +org.apache.inlong +sort-connector-hudi-v1.15 +${project.version} +test + diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml index d1e459e088..923bf01d4c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml +++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml @@ -130,7 +130,7 @@ org.apache.hudi -hudi-flink1.13-bundle +hudi-flink1.15-bundle ${hudi.version} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml new file mode 100644 index 00..ee0118e330 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml @@ -0,0 +1,106 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + +org.apache.inlong +sort-connectors-v1.15 +1.10.0-SNAPSHOT + + +sort-connector-hudi-v1.15 +jar +Apache InLong - Sort-connector-hudi + + + ${project.parent.parent.parent.parent.parent.basedir} +4.2.1 + + + + +org.apache.inlong +sort-connector-base +${project.version} + + +org.apache.hudi +hudi-flink1.15-bundle + + +org.apache.thrift +libfb303 + + +com.google.guava +guava +${guava.version} + + +com.fasterxml.woodstox +woodstox-core +${woodstox-core.version} + + +org.codehaus.woodstox +stax2-api +${stax2-api.version} + + + + + + +org.apache.maven.plugins +maven-shade-plugin + + +shade-flink + +shade + +package + + + +org.apache.hudi:* + org.apache.hive:hive-exec +org.apache.hadoop:* +com.fasterxml.woodstox:* +org.codehaus.woodstox:* +com.google.guava:* +com.google.protobuf:* +
Re: [PR] [INLONG-8994][Sort] Add Hudi connector on Flink 1.15 [inlong]
EMsnap merged PR #9168: URL: https://github.com/apache/inlong/pull/9168
[PR] [INLONG-9172][SDK] Delete useless debug logs in Golang SDK [inlong]
gunli opened a new pull request, #9173: URL: https://github.com/apache/inlong/pull/9173 ### [INLONG-9172][SDK] Delete useless debug logs in Golang SDK - Fixes #9172 ### Motivation Even when the debug log level is not enabled, some debug log calls in frequently called methods of the Golang SDK still cause string allocations, which degrades SDK performance, so we need to delete or comment them out. ### Modifications worker.go ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
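Why a disabled debug call can still cost something: with a variadic `Debug(args ...interface{})` logger, the caller builds the argument slice and boxes each argument into an interface before the logger gets a chance to check its level. Deleting the calls, as this PR does, avoids that entirely; a common alternative is to guard hot-path calls with a level check. The `logger` type and its `DebugEnabled` method below are hypothetical and do not describe the SDK's actual logger interface.

```go
package main

import "fmt"

// logger is a hypothetical leveled logger that records formatted lines
// instead of printing them, so the behaviour can be checked.
type logger struct {
	debugEnabled bool
	lines        []string
}

// DebugEnabled reports whether debug output would be emitted.
func (l *logger) DebugEnabled() bool { return l.debugEnabled }

// Debug formats its arguments only when debug output is enabled. By the time
// it runs, the caller has already built the variadic argument slice.
func (l *logger) Debug(args ...interface{}) {
	if !l.debugEnabled {
		return
	}
	l.lines = append(l.lines, fmt.Sprint(args...))
}

// handleBatchTimeout mimics a hot path. The guard skips building the
// arguments entirely when debug logging is off.
func handleBatchTimeout(l *logger, workerIndex int, batchIDs []string) {
	for _, id := range batchIDs {
		if l.DebugEnabled() {
			l.Debug("worker[", workerIndex, "] batch timeout, send it now: ", id)
		}
		// ... send the batch here ...
	}
}

func main() {
	quiet := &logger{}
	handleBatchTimeout(quiet, 3, []string{"b1", "b2"})
	fmt.Println(len(quiet.lines)) // 0: nothing was formatted

	verbose := &logger{debugEnabled: true}
	handleBatchTimeout(verbose, 3, []string{"b1"})
	fmt.Println(verbose.lines[0]) // worker[3] batch timeout, send it now: b1
}
```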
[PR] [INLONG-9174][SDK] Optimize response attr parsing in Golang SDK [inlong]
gunli opened a new pull request, #9175: URL: https://github.com/apache/inlong/pull/9175 ### [INLONG-9174][SDK] Optimize response attr parsing in Golang SDK - Fixes #9174 ### Motivation Currently, we parse the attr in the response with strings.Split(), which allocates new strings at runtime; because this method is called frequently, this ultimately degrades SDK performance. We need to improve it. ### Modifications Use `strings.FieldsFunc()` instead of `strings.Split()` ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
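Note that `strings.FieldsFunc` is not a drop-in replacement for `strings.Split`: it discards empty fields, so input with repeated or trailing separators parses differently. The sketch below mirrors the before and after loops of this PR on a made-up attribute string (the key names and the map it fills are hypothetical). With the old loop, `errMsg=` yields an empty value and `m==5` is skipped; with the new loop it is the other way round.

```go
package main

import (
	"fmt"
	"strings"
)

// parseAttrSplit mirrors the loop before the change.
func parseAttrSplit(attr string) map[string]string {
	out := make(map[string]string)
	for _, item := range strings.Split(attr, "&") {
		kv := strings.Split(item, "=")
		if len(kv) != 2 {
			continue
		}
		out[kv[0]] = kv[1]
	}
	return out
}

// parseAttrFields mirrors the loop after the change; FieldsFunc drops empty fields.
func parseAttrFields(attr string) map[string]string {
	out := make(map[string]string)
	isAmp := func(r rune) bool { return r == '&' }
	isEq := func(r rune) bool { return r == '=' }
	for _, item := range strings.FieldsFunc(attr, isAmp) {
		kv := strings.FieldsFunc(item, isEq)
		if len(kv) != 2 {
			continue
		}
		out[kv[0]] = kv[1]
	}
	return out
}

func main() {
	attr := "errCode=0&&errMsg=&m==5" // hypothetical attribute string
	fmt.Println(parseAttrSplit(attr))  // map[errCode:0 errMsg:]
	fmt.Println(parseAttrFields(attr)) // map[errCode:0 m:5]
}
```

On allocation: both functions return substrings that share the input's memory, so neither copies key or value bytes; the allocations come from the slices they build (plus the `string(attr)` conversion from `[]byte`). If fewer allocations are the goal, `strings.Cut` (Go 1.18+) splits a single `key=value` pair without allocating a slice.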
[PR] [INLONG-9176][SDK] Fail fast when worker is unavailable in Golang SDK [inlong]
gunli opened a new pull request, #9177: URL: https://github.com/apache/inlong/pull/9177 ### [INLONG-9176][SDK] Fail fast when worker is unavailable in Golang SDK - Fixes #9176 ### Motivation Currently, when we send a message and the selected worker is unavailable, we try another one and wait 1ms, which can put the SDK into a blocking-like state; it is better to fail fast. ### Modifications Fail fast when the worker is unavailable ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[PR] [INLONG-9178][SDK] Update the default values of the config options of Golang SDK [inlong]
gunli opened a new pull request, #9179: URL: https://github.com/apache/inlong/pull/9179 ### [INLONG-9178][SDK] Update the default values of the config options of Golang SDK - Fixes #9178 ### Motivation Currently, some default values of the Golang SDK config options are too big and cost a lot of memory, so we can make them smaller; others are too small, so we need to make them bigger. ### Modifications options.go ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[PR] [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK [inlong]
gunli opened a new pull request, #9181: URL: https://github.com/apache/inlong/pull/9181 ### [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK - Fixes #9180 ### Motivation Cache batchReq.dataReqs in the Golang SDK to improve performance. ### Modifications request.go worker.go ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[I] [Improve][DataProxy] Update DataProxy Golang SDK docs [inlong-website]
gunli opened a new issue, #886: URL: https://github.com/apache/inlong-website/issues/886 ### Description The default values of the config options of the Golang SDK have changed; update the docs accordingly. ### Are you willing to submit PR? - [X] Yes, I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[PR] [INLONG-886][Doc] Update DataProxy Golang SDK docs [inlong-website]
gunli opened a new pull request, #887: URL: https://github.com/apache/inlong-website/pull/887 ### [INLONG-886][Doc] Update DataProxy Golang SDK docs - Fixes #886 ### Motivation Update the default values of the config options. ### Modifications Update the default values of the config options. ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
Re: [PR] [INLONG-9172][SDK] Delete useless debug logs in Golang SDK [inlong]
EMsnap commented on code in PR #9173: URL: https://github.com/apache/inlong/pull/9173#discussion_r1377449020 ## inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go: ## @@ -500,9 +500,9 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) { } func (w *worker) handleBatchTimeout() { - for streamID, batch := range w.pendingBatches { + for _, batch := range w.pendingBatches { if time.Since(batch.batchTime) > w.options.BatchingMaxPublishDelay { - w.log.Debug("worker[", w.index, "] batch timeout, send it now:", batch.batchID, ", streamID:", streamID) + //w.log.Debug("worker[", w.index, "] batch timeout, send it now:", batch.batchID, ", streamID:", streamID) Review Comment: just delete it thanks
[PR] [INLONG-9182][Agent] Delete useless code [inlong]
justinwwhuang opened a new pull request, #9183: URL: https://github.com/apache/inlong/pull/9183 ### Prepare a Pull Request [INLONG-9182][Agent] Delete useless code - Fixes #9182 ### Motivation Delete useless code ### Modifications Delete useless code ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
Re: [PR] [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK [inlong]
dockerzhang merged PR #9181: URL: https://github.com/apache/inlong/pull/9181
(inlong) branch master updated: [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 0d07152244 [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181) 0d07152244 is described below commit 0d07152244153db1299d75a1a64ebb0a8b891d05 Author: gunli <24350...@qq.com> AuthorDate: Wed Nov 1 09:47:14 2023 +0800 [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181) Co-authored-by: gunli
---
 .../dataproxy-sdk-golang/dataproxy/request.go | 23 +-
 .../dataproxy-sdk-golang/dataproxy/worker.go | 6 +-
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index d93b0af006..187e074415 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -53,7 +53,9 @@ func init() {
 	}
 	batchPool = &sync.Pool{
 		New: func() interface{} {
-			return &batchReq{}
+			return &batchReq{
+				dataReqs: make([]*sendDataReq, 0, 50),
+			}
 		},
 	}
 }
@@ -109,12 +111,17 @@ func (b *batchReq) append(req *sendDataReq) {
 
 func (b *batchReq) done(err error) {
 	errorCode := getErrorCode(err)
-	for _, req := range b.dataReqs {
+	for i, req := range b.dataReqs {
 		req.done(err, errorCode)
+		b.dataReqs[i] = nil
+	}
+	if b.dataReqs != nil {
+		b.dataReqs = b.dataReqs[:0]
 	}
 
 	if b.callback != nil {
 		b.callback()
+		b.callback = nil
 	}
 
 	if b.buffer != nil && b.bufferPool != nil {
@@ -128,10 +135,12 @@ func (b *batchReq) done(err error) {
 		}
 		b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds())
 		b.metrics.observeSize(errorCode, b.dataSize)
+		b.metrics = nil
 	}
 
 	if b.pool != nil {
 		b.pool.Put(b)
+		b.pool = nil
 	}
 }
 
@@ -334,25 +343,29 @@ type sendDataReq struct {
 
 func (s *sendDataReq) done(err error, errCode string) {
 	if s.semaphore != nil {
 		s.semaphore.Release()
+		if s.metrics != nil {
+			s.metrics.decPending(s.workerID)
+		}
+		s.semaphore = nil
 	}
 
 	if s.callback != nil {
 		s.callback(s.msg, err)
+		s.callback = nil
 	}
 
 	if s.metrics != nil {
-		if s.semaphore != nil {
-			s.metrics.decPending(s.workerID)
-		}
 		if errCode == "" {
 			errCode = getErrorCode(err)
 		}
 		s.metrics.incMessage(errCode)
+		s.metrics = nil
 	}
 
 	if s.pool != nil {
 		s.pool.Put(s)
+		s.pool = nil
 	}
 }
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index be3772c38d..d5ab3018cf 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -322,13 +322,17 @@ func (w *worker) handleSendData(req *sendDataReq) {
 	if !ok {
 		streamID := req.msg.StreamID
 		batch = batchPool.Get().(*batchReq)
+		dataReqs := batch.dataReqs
+		if dataReqs == nil {
+			dataReqs = make([]*sendDataReq, 0, w.options.BatchingMaxMessages)
+		}
 		*batch = batchReq{
 			pool: batchPool,
 			workerID: w.indexStr,
 			batchID: util.SnowFlakeID(),
 			groupID: w.options.GroupID,
 			streamID: streamID,
-			dataReqs: make([]*sendDataReq, 0, w.options.BatchingMaxMessages),
+			dataReqs: dataReqs,
 			batchTime: time.Now(),
 			retries: 0,
 			bufferPool: w.bufferPool,
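The commit above keeps each pooled `batchReq`'s `dataReqs` backing array alive across reuses: `done` nils out every element (so a pooled batch no longer pins finished requests) and truncates the slice to length zero, and the worker reuses that slice instead of calling `make` for every new batch, falling back to `make` only when the cached slice is nil. A reduced sketch of the same pattern follows; `batch`, `request`, `newBatch`, `reset` and `release` are simplified hypothetical stand-ins, not the SDK's types. Keep in mind that `sync.Pool` may drop pooled items at any time, so the reuse is best-effort.

```go
package main

import (
	"fmt"
	"sync"
)

type request struct{ payload []byte }

type batch struct {
	reqs []*request
}

var batchPool = sync.Pool{
	New: func() interface{} {
		// Preallocate capacity so a fresh batch can hold several requests
		// without growing the slice.
		return &batch{reqs: make([]*request, 0, 50)}
	},
}

// newBatch takes a batch from the pool and reuses its cached slice,
// allocating a new one only if none is cached.
func newBatch(maxMessages int) *batch {
	b := batchPool.Get().(*batch)
	if b.reqs == nil {
		b.reqs = make([]*request, 0, maxMessages)
	}
	return b
}

// reset clears element references (so finished requests can be garbage
// collected) and truncates the slice while keeping its capacity.
func (b *batch) reset() {
	for i := range b.reqs {
		b.reqs[i] = nil
	}
	b.reqs = b.reqs[:0]
}

// release resets the batch and returns it to the pool; b must not be used afterwards.
func (b *batch) release() {
	b.reset()
	batchPool.Put(b)
}

func main() {
	b := newBatch(500)
	b.reqs = append(b.reqs, &request{payload: []byte("a")}, &request{payload: []byte("b")})
	fmt.Println(len(b.reqs))
	b.release()
}
```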
Re: [PR] [INLONG-9178][SDK] Update the default values of the config options of Golang SDK [inlong]
dockerzhang merged PR #9179: URL: https://github.com/apache/inlong/pull/9179
(inlong) branch master updated (0d07152244 -> 7dacceb0a0)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 0d07152244 [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181) add 7dacceb0a0 [INLONG-9178][SDK] Update the default values of the config options of Golang SDK (#9179) No new revisions were added by this update. Summary of changes: .../dataproxy-sdk-golang/dataproxy/options.go | 32 +++--- 1 file changed, 16 insertions(+), 16 deletions(-)
Re: [PR] [INLONG-7056][Manager] Support add more properties for submitting flink job [inlong]
github-actions[bot] commented on PR #8822: URL: https://github.com/apache/inlong/pull/8822#issuecomment-1788273589 This PR is stale because it has been open for 60 days with no activity.
Re: [PR] [INLONG-9170][SDK] Use pointers instead of objects [inlong]
dockerzhang merged PR #9171: URL: https://github.com/apache/inlong/pull/9171
(inlong) branch master updated (7dacceb0a0 -> b3295e5f7b)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 7dacceb0a0 [INLONG-9178][SDK] Update the default values of the config options of Golang SDK (#9179) add b3295e5f7b [INLONG-9170][SDK] use pointers instead of objects (#9171) No new revisions were added by this update. Summary of changes: .../dataproxy-sdk-golang/dataproxy/client.go | 2 +- .../dataproxy-sdk-golang/dataproxy/worker.go | 62 +++--- 2 files changed, 32 insertions(+), 32 deletions(-)
(inlong) branch master updated: [INLONG-9174][SDK] Optimize response attr parsing in Golang SDK (#9175)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new dae7141fa3 [INLONG-9174][SDK] Optimize response attr parsing in Golang SDK (#9175)
dae7141fa3 is described below

commit dae7141fa3831c8ccca1dde723b5e1542c194793
Author: gunli <24350...@qq.com>
AuthorDate: Wed Nov 1 09:49:46 2023 +0800

    [INLONG-9174][SDK] Optimize response attr parsing in Golang SDK (#9175)

    Co-authored-by: gunli
---
 .../dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index 187e074415..03bca16a6e 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -302,9 +302,13 @@ func (b *batchRsp) decode(input []byte) {
 
 	// fmt.Println(string(attr))
 
-	attrList := strings.Split(string(attr), "&")
+	attrList := strings.FieldsFunc(string(attr), func(r rune) bool {
+		return r == '&'
+	})
 	for _, item := range attrList {
-		kv := strings.Split(item, "=")
+		kv := strings.FieldsFunc(item, func(r rune) bool {
+			return r == '='
+		})
 		if len(kv) != 2 {
 			continue
 		}
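The practical difference between `strings.Split` and `strings.FieldsFunc` in this change is how empty fields are handled. The sketch below is a minimal illustration, not the SDK's code: `parseAttrs` is a hypothetical helper that reuses the same splitting logic, and the sample attribute string (including the `errCode` and `dt` keys) is made up for the demo.

```go
package main

import (
	"fmt"
	"strings"
)

// parseAttrs is a hypothetical helper (not part of the SDK) that turns a
// response attribute string such as "k1=v1&k2=v2" into a map, splitting the
// same way the new decode loop does.
func parseAttrs(attr string) map[string]string {
	isAmp := func(r rune) bool { return r == '&' }
	isEq := func(r rune) bool { return r == '=' }

	out := make(map[string]string)
	for _, item := range strings.FieldsFunc(attr, isAmp) {
		kv := strings.FieldsFunc(item, isEq)
		if len(kv) != 2 {
			continue // skip malformed pairs, like the SDK's decode loop
		}
		out[kv[0]] = kv[1]
	}
	return out
}

func main() {
	attr := "errCode=0&&dt=1698800000000&"

	// strings.Split keeps the empty fields produced by "&&" and the trailing "&".
	fmt.Printf("%q\n", strings.Split(attr, "&")) // ["errCode=0" "" "dt=1698800000000" ""]
	// strings.FieldsFunc drops empty fields entirely.
	fmt.Printf("%q\n", strings.FieldsFunc(attr, func(r rune) bool { return r == '&' })) // ["errCode=0" "dt=1698800000000"]

	fmt.Println(parseAttrs(attr)) // map[dt:1698800000000 errCode:0]
}
```

One side effect worth knowing: with `FieldsFunc`, a pair with an empty value such as `k=` yields a single field and is now skipped, whereas `strings.Split` produced `["k", ""]` and kept it; conversely, `k==v` now parses as `k`/`v` instead of being rejected.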
Re: [PR] [INLONG-9174][SDK] Optimize response attr parsing in Golang SDK [inlong]
dockerzhang merged PR #9175:
URL: https://github.com/apache/inlong/pull/9175
Re: [PR] [INLONG-9176][SDK] Fail fast when work is unavailable in Golang SDK [inlong]
dockerzhang merged PR #9177:
URL: https://github.com/apache/inlong/pull/9177
(inlong) branch master updated: [INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK (#9177)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 2bab820bbe [INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK (#9177)
2bab820bbe is described below

commit 2bab820bbe7d99d78624ade3890ca692f1ff1ca2
Author: gunli <24350...@qq.com>
AuthorDate: Wed Nov 1 09:50:29 2023 +0800

    [INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK (#9177)

    Co-authored-by: gunli
---
 .../dataproxy-sdk-golang/dataproxy/client.go | 35 +-
 .../dataproxy-sdk-golang/dataproxy/worker.go |  2 +-
 2 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index 65f8160d37..fab00c2949 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -22,7 +22,6 @@ import (
 	"errors"
 	"math"
 	"sync"
-	"time"
 
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/discoverer"
@@ -215,16 +214,16 @@ func (c *client) Dial(addr string) (gnet.Conn, error) {
 }
 
 func (c *client) Send(ctx context.Context, msg Message) error {
-	worker := c.getWorker()
-	if worker == nil {
+	worker, err := c.getWorker()
+	if err != nil {
 		return ErrNoAvailableWorker
 	}
 	return worker.send(ctx, msg)
 }
 
 func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
-	worker := c.getWorker()
-	if worker == nil {
+	worker, err := c.getWorker()
+	if err != nil {
 		if cb != nil {
 			cb(msg, ErrNoAvailableWorker)
 		}
@@ -234,23 +233,17 @@ func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
 	worker.sendAsync(ctx, msg, cb)
 }
 
-func (c *client) getWorker() *worker {
-	for i := 0; i < c.options.WorkerNum; i++ {
-		index := c.curWorkerIndex.Load()
-		w := c.workers[index%uint64(len(c.workers))]
-		c.curWorkerIndex.Add(1)
-
-		if w.available() {
-			return w
-		} else if i == c.options.WorkerNum-1 {
-			c.metrics.incError(errAllWorkerBusy.strCode)
-			return w
-		} else {
-			time.Sleep(1 * time.Millisecond)
-			continue
-		}
+func (c *client) getWorker() (*worker, error) {
+	index := c.curWorkerIndex.Load()
+	w := c.workers[index%uint64(len(c.workers))]
+	c.curWorkerIndex.Add(1)
+
+	if w.available() {
+		return w, nil
 	}
-	return nil
+
+	c.metrics.incError(workerBusy.strCode)
+	return nil, workerBusy
 }
 
 func (c *client) Close() {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index bb37229d63..8a202ce937 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -62,7 +62,7 @@ var (
 	errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"}
 	errServerError = &errNo{code: 10011, strCode: "10011", message: "server error"}
 	errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic"}
-	errAllWorkerBusy = &errNo{code: 10013, strCode: "10013", message: "all workers are busy"}
+	workerBusy = &errNo{code: 10013, strCode: "10013", message: "worker is busy"}
 	errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response"}
 	errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer"}
 	errUnknown = &errNo{code: 20001, strCode: "20001", message: "unknown"}
Re: [PR] [INLONG-9182][Agent] Delete useless code [inlong]
EMsnap merged PR #9183:
URL: https://github.com/apache/inlong/pull/9183
Re: [PR] [INLONG-9167][SDK] Use UUID as the batch ID instead of snowflake ID for DataProxy Golang SDK [inlong]
dockerzhang merged PR #9169:
URL: https://github.com/apache/inlong/pull/9169
(inlong) branch master updated: [INLONG-9167][SDK] Use UUID as the batch ID instead of snowflake ID for DataProxy Golang SDK (#9169)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new cea1909809 [INLONG-9167][SDK] Use UUID as the batch ID instead of snowflake ID for DataProxy Golang SDK (#9169)
cea1909809 is described below

commit cea1909809b600734c978cab627c978234d612eb
Author: gunli <24350...@qq.com>
AuthorDate: Wed Nov 1 09:51:41 2023 +0800

    [INLONG-9167][SDK] Use UUID as the batch ID instead of snowflake ID for DataProxy Golang SDK (#9169)

    Co-authored-by: gunli
---
 .../dataproxy-sdk-golang/dataproxy/options.go | 17 +++-
 .../dataproxy-sdk-golang/dataproxy/worker.go  | 14 +++-
 .../dataproxy-sdk-golang/go.mod               | 25 +++---
 .../dataproxy-sdk-golang/go.sum               | 97 ++
 .../dataproxy-sdk-golang/util/ip.go           | 20 -
 5 files changed, 118 insertions(+), 55 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
index 3a9fcbf0d8..d55ca53fad 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
@@ -21,10 +21,10 @@ import (
 	"strings"
 	"time"
 
-	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
-	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
+	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
+	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
 
 	"github.com/prometheus/client_golang/prometheus"
 )
 
@@ -35,9 +35,20 @@ const (
 var (
 	// DefaultURL is the default Manager URL for discovering the DataProxy cluster
 	DefaultURL = "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"
-	localIP, _ = util.GetFirstPrivateIP()
+	localIP = ""
 )
 
+func init() {
+	var err error
+	localIP, err = util.GetFirstPrivateIP()
+	if err != nil {
+		localIP, err = util.GetFirstIP()
+		if err != nil {
+			localIP = "noIP"
+		}
+	}
+}
+
 // Options is the DataProxy go client configs
 type Options struct {
 	GroupID string // InLong group ID
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 8a202ce937..d6f15e3dd0 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -23,11 +23,11 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/gofrs/uuid"
+
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
-	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
-
 	"github.com/panjf2000/gnet/v2"
 	"go.uber.org/atomic"
 )
@@ -315,6 +315,14 @@ func (w *worker) sendAsync(ctx context.Context, msg Message, callback Callback)
 	w.doSendAsync(ctx, msg, callback, false)
 }
 
+func (w *worker) buildBatchID() string {
+	u, err := uuid.NewV4()
+	if err != nil {
+		return w.indexStr + ":" + strconv.FormatInt(time.Now().UnixNano(), 10)
+	}
+	return u.String()
+}
+
 func (w *worker) handleSendData(req *sendDataReq) {
 	// w.log.Debug("worker[", w.index, "] handleSendData")
 	// only the messages that with the same stream ID can be sent in a batch, we use the stream ID as the key
@@ -329,7 +337,7 @@ func (w *worker) handleSendData(req *sendDataReq) {
 		*batch = batchReq{
 			pool: batchPool,
 			workerID: w.indexStr,
-			batchID: util.SnowFlakeID(),
+			batchID: w.buildBatchID(),
 			groupID: w.options.GroupID,
 			streamID: streamID,
 			dataReqs: dataReqs,
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod
index 685b7c14bb..3e72a27db7 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod
@@ -20,33 +2
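The batch-ID fallback can be illustrated without third-party dependencies. In this sketch, `newUUIDv4` is a hypothetical stand-in for `uuid.NewV4()` from `github.com/gofrs/uuid`: it fills 16 bytes from `crypto/rand` and sets the version-4 and RFC 4122 variant bits. `buildBatchID` mirrors the committed fallback of "worker index + `:` + current time in nanoseconds", with the worker index passed in as a plain string for simplicity.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"strconv"
	"time"
)

// newUUIDv4 is a hypothetical stand-in for uuid.NewV4(): 16 random bytes
// with the version (4) and variant (RFC 4122) bits set, formatted 8-4-4-4-12.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// buildBatchID prefers a random UUID and, only if that fails, falls back to
// "<worker index>:<unix nanos>", which does not depend on the host IP.
func buildBatchID(workerIndex string) string {
	id, err := newUUIDv4()
	if err != nil {
		return workerIndex + ":" + strconv.FormatInt(time.Now().UnixNano(), 10)
	}
	return id
}

func main() {
	fmt.Println(buildBatchID("0"))
}
```

Unlike the snowflake ID, neither path needs the host's private IP, which is the point of the change; the fallback ID is only as unique as the clock resolution within one worker.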
(inlong) branch master updated (2bab820bbe -> 54dc99a695)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 2bab820bbe [INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK (#9177)
     add 54dc99a695 [INLONG-9182][Agent] Delete useless code (#9183)

No new revisions were added by this update.

Summary of changes:
 .../apache/inlong/agent/pojo/DbCollectorTask.java       |  45 ---
 .../agent/pojo/DbCollectorTaskReportDto.java            |  27 -
 .../agent/pojo/DbCollectorTaskReportResponse.java       |  27 -
 .../agent/pojo/DbCollectorTaskRequestDto.java           |  27 -
 .../inlong/agent/pojo/DbCollectorTaskResult.java        |  47
 .../apache/inlong/agent/pojo/SqlJobProfileDto.java      | 130 -
 .../agent/core/instance/TestInstanceManager.java        |  10 +-
 7 files changed, 5 insertions(+), 308 deletions(-)
 delete mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DbCollectorTask.java
 delete mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DbCollectorTaskReportDto.java
 delete mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DbCollectorTaskReportResponse.java
 delete mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DbCollectorTaskRequestDto.java
 delete mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DbCollectorTaskResult.java
 delete mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlJobProfileDto.java
Re: [I] [Improve][DataProxy] Update DataProxy Golang SDK docs [inlong-website]
EMsnap closed issue #886: [Improve][DataProxy] Update DataProxy Golang SDK docs
URL: https://github.com/apache/inlong-website/issues/886
Re: [PR] [INLONG-886][Doc] Update DataProxy Golang SDK docs [inlong-website]
EMsnap merged PR #887:
URL: https://github.com/apache/inlong-website/pull/887
(inlong-website) branch master updated: [INLONG-886][Doc] Update DataProxy Golang SDK docs (#887)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git

The following commit(s) were added to refs/heads/master by this push:
     new 521909ddf5 [INLONG-886][Doc] Update DataProxy Golang SDK docs (#887)
521909ddf5 is described below

commit 521909ddf50ee136f6a9dc2b6d8003224cb5f08e
Author: gunli <24350...@qq.com>
AuthorDate: Wed Nov 1 09:53:34 2023 +0800

    [INLONG-886][Doc] Update DataProxy Golang SDK docs (#887)
---
 docs/sdk/dataproxy-sdk/go.md                |  48 +++---
 .../current/sdk/dataproxy-sdk/go.md         | 169 +
 2 files changed, 193 insertions(+), 24 deletions(-)

diff --git a/docs/sdk/dataproxy-sdk/go.md b/docs/sdk/dataproxy-sdk/go.md
index f984109f90..bfeebc45c6 100755
--- a/docs/sdk/dataproxy-sdk/go.md
+++ b/docs/sdk/dataproxy-sdk/go.md
@@ -123,31 +123,31 @@ Some common errors:
 
 | Option | Default value | Desc | Optional |
 | --- | --- | --- | --- |
-| WithGroupID() | "" | Sets the Group ID. | No |
-| WithURL() | "" | Sets the Manager URL. | No |
-| WithUpdateInterval() | 5m | Sets the update interval of service discoery. | Yes |
-| WithConnTimeout() | 3000ms | Sets the connection timeout. | Yes |
-| WriteBufferSize | 16M | Sets the write buffer size. | Yes |
-| WithReadBufferSize | 16M | Sets the read buffer size. | Yes |
-| WithSocketSendBufferSize | 16M | Sets the socket send buffer size. | Yes |
-| WithSocketRecvBufferSize | 16M | Sets the socket receive buffer size. | Yes |
-| WithBufferPool | nil | Sets the buffer pool to use. | Yes, if the application has one, use the same one is recommended. |
-| WithBytePool | nil | Sets the byte pool to use. | Yes, if the application has one, use the same one is recommended. |
-| WithBufferPoolSize | 409600 | Sets the buffer pool size. | Yes |
-| WithBytePoolSize | 409600 | Sets the byte pool size. | Yes |
-| WithBytePoolWidth | equals to BatchingMaxSize | Sets the byte pool width. | Yes |
-| WithLogger | std.out | Sets the debug logger. | Yes, but the default one is not recommended, as it has no log levels control. |
+| WithGroupID() | "" | Sets the Group ID. | No |
+| WithURL() | "" | Sets the Manager URL. | No |
+| WithUpdateInterval() | 5m | Sets the update interval of service discoery. | Yes |
+| WithConnTimeout() | 3000ms | Sets the connection timeout. | Yes |
+| WriteBufferSize | 8M
Re: [PR] [INLONG-9172][SDK] Delete useless debug logs in Golang SDK [inlong]
gunli commented on code in PR #9173:
URL: https://github.com/apache/inlong/pull/9173#discussion_r1378330842

##
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go:
##
@@ -500,9 +500,9 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) {
 }
 
 func (w *worker) handleBatchTimeout() {
-	for streamID, batch := range w.pendingBatches {
+	for _, batch := range w.pendingBatches {
 		if time.Since(batch.batchTime) > w.options.BatchingMaxPublishDelay {
-			w.log.Debug("worker[", w.index, "] batch timeout, send it now:", batch.batchID, ", streamID:", streamID)
+			//w.log.Debug("worker[", w.index, "] batch timeout, send it now:", batch.batchID, ", streamID:", streamID)

Review Comment:
done
[PR] [INLONG-9184][SDK] Update README.md in Golang SDK [inlong]
gunli opened a new pull request, #9186:
URL: https://github.com/apache/inlong/pull/9186

### [INLONG-9184][SDK] Update README.md in Golang SDK

- Fixes #9184

### Motivation

Update README.md in Golang SDK

### Modifications

Update README.md in Golang SDK

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[PR] [INLONG-9187][Agent] Delete useless memory manager [inlong]
justinwwhuang opened a new pull request, #9188:
URL: https://github.com/apache/inlong/pull/9188

### Prepare a Pull Request

[INLONG-9187][Agent] Delete useless memory manager

- Fixes #9187

### Motivation

Delete useless memory manager

### Modifications

Delete useless memory manager

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
Re: [PR] [INLONG-9184][SDK] Update README.md in Golang SDK [inlong]
dockerzhang merged PR #9186:
URL: https://github.com/apache/inlong/pull/9186
(inlong) branch master updated: [INLONG-9184][SDK] Update README.md in Golang SDK (#9186)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new c596091a43 [INLONG-9184][SDK] Update README.md in Golang SDK (#9186)
c596091a43 is described below

commit c596091a4333f81ff07828313e6112466ad422d8
Author: gunli <24350...@qq.com>
AuthorDate: Wed Nov 1 11:09:34 2023 +0800

    [INLONG-9184][SDK] Update README.md in Golang SDK (#9186)

    Co-authored-by: gunli
---
 .../dataproxy-sdk-twins/dataproxy-sdk-golang/README.md | 16
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
index 5ecb81379c..3273b2a464 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
@@ -139,10 +139,10 @@ type Options struct {
 	URL string // the Manager URL for discovering the DataProxy cluster
 	UpdateInterval time.Duration // interval to refresh the endpoint list, default: 5m
 	ConnTimeout time.Duration // connection timeout: default: 3000ms
-	WriteBufferSize int // write buffer size in bytes, default: 16M
-	ReadBufferSize int // read buffer size in bytes, default: 16M
-	SocketSendBufferSize int // socket send buffer size in bytes, default: 16M
-	SocketRecvBufferSize int // socket receive buffer size in bytes, default: 16M
+	WriteBufferSize int // write buffer size in bytes, default: 8M
+	ReadBufferSize int // read buffer size in bytes, default: 1M
+	SocketSendBufferSize int // socket send buffer size in bytes, default: 8M
+	SocketRecvBufferSize int // socket receive buffer size in bytes, default: 1M
 	BufferPool bufferpool.BufferPool // encoding/decoding buffer pool, if not given, SDK will init a new one
 	BytePool bufferpool.BytePool // encoding/decoding byte pool, if not given, SDK will init a new one
 	BufferPoolSize int // buffer pool size, default: 409600
@@ -154,10 +154,10 @@ type Options struct {
 	WorkerNum int // worker number, default: 8
 	SendTimeout time.Duration // send timeout, default: 3ms
 	MaxRetries int // max retry count, default: 2
-	BatchingMaxPublishDelay time.Duration // the time period within which the messages sent will be batched, default: 10ms
-	BatchingMaxMessages int // the maximum number of messages permitted in a batch, default: 10
-	BatchingMaxSize int // the maximum number of bytes permitted in a batch, default: 4K
-	MaxPendingMessages int // the max size of the queue holding the messages pending to receive an acknowledgment from the broker, default: 409600
+	BatchingMaxPublishDelay time.Duration // the time period within which the messages sent will be batched, default: 20ms
+	BatchingMaxMessages int // the maximum number of messages permitted in a batch, default: 50
+	BatchingMaxSize int // the maximum number of bytes permitted in a batch, default: 40K
+	MaxPendingMessages int // the max size of the queue holding the messages pending to receive an acknowledgment from the broker, default: 204800
 	BlockIfQueueIsFull bool // whether Send and SendAsync block if producer's message queue is full, default: false
 	AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy
 	addColumnStr string // the string format of the AddColumns, just a cache, used internal
[PR] [INLONG-9185][Dashboard] Cluster management supports SortStandAlone type [inlong]
bluewang opened a new pull request, #9189:
URL: https://github.com/apache/inlong/pull/9189

### Prepare a Pull Request
*(Change the title refer to the following example)*

- Title Example: [INLONG-XYZ][Component] Title of the pull request

*(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*

- Fixes https://github.com/apache/inlong/issues/9185

### Motivation

*Explain here the context, and why you're making that change. What is the problem you're trying to solve?*

### Modifications

*Describe the modifications you've done.*

### Verifying this change
Re: [PR] [INLONG-9172][SDK] Delete useless debug logs in Golang SDK [inlong]
dockerzhang merged PR #9173:
URL: https://github.com/apache/inlong/pull/9173
(inlong) branch master updated: [INLONG-9172][SDK] Delete useless debug logs in Golang SDK (#9173)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 5c7682c3ab [INLONG-9172][SDK] Delete useless debug logs in Golang SDK (#9173)
5c7682c3ab is described below

commit 5c7682c3ab30524d6b7a58df4d1dc131809b6cb8
Author: gunli <24350...@qq.com>
AuthorDate: Wed Nov 1 12:44:55 2023 +0800

    [INLONG-9172][SDK] Delete useless debug logs in Golang SDK (#9173)

    Co-authored-by: gunli
---
 .../dataproxy-sdk-golang/dataproxy/worker.go | 17 +
 1 file changed, 1 insertion(+), 16 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index d6f15e3dd0..2991364b82 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -324,7 +324,6 @@ func (w *worker) buildBatchID() string {
 }
 
 func (w *worker) handleSendData(req *sendDataReq) {
-	// w.log.Debug("worker[", w.index, "] handleSendData")
 	// only the messages that with the same stream ID can be sent in a batch, we use the stream ID as the key
 	batch, ok := w.pendingBatches[req.msg.StreamID]
 	if !ok {
@@ -348,7 +347,6 @@ func (w *worker) handleSendData(req *sendDataReq) {
 			metrics: w.metrics,
 			addColumns: w.options.addColumnStr,
 		}
-		w.log.Debug("worker[", w.index, "] new a batch:", batch.batchID, ", streamID:", batch.streamID)
 		w.pendingBatches[streamID] = batch
 	}
 
@@ -368,7 +366,6 @@ func (w *worker) handleSendData(req *sendDataReq) {
 }
 
 func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
-	// w.log.Debug("worker[", w.index, "] sendBatch")
 	b.lastSendTime = time.Now()
 	b.encode()
 
@@ -412,7 +409,6 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
 		}
 	}
 
-	// w.log.Debug("worker[", w.index, "] write to:", conn.RemoteAddr())
 	// very important:'cause we use gnet, we must call AsyncWrite to send data in goroutines that are different from gnet.OnTraffic() callback
 	conn := w.getConn()
 	err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) error {
@@ -448,7 +444,6 @@ func (w *worker) handleSendFailed(b *sendFailedBatchReq) {
 func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
 	if batch.retries >= w.options.MaxRetries {
 		batch.done(errSendTimeout)
-		w.log.Debug("to many reties, batch done:", batch.batchID)
 		return
 	}
 
@@ -502,7 +497,6 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) {
 	batch.retries++
 	if batch.retries >= w.options.MaxRetries {
 		batch.done(errSendTimeout)
-		w.log.Debug("to many reties, batch done:", batch.batchID)
 		return
 	}
 
@@ -512,9 +506,8 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) {
 }
 
 func (w *worker) handleBatchTimeout() {
-	for streamID, batch := range w.pendingBatches {
+	for _, batch := range w.pendingBatches {
 		if time.Since(batch.batchTime) > w.options.BatchingMaxPublishDelay {
-			w.log.Debug("worker[", w.index, "] batch timeout, send it now:", batch.batchID, ", streamID:", streamID)
 			w.sendBatch(batch, true)
 			delete(w.pendingBatches, batch.streamID)
 		}
@@ -543,7 +536,6 @@ func (w *worker) handleCleanMap() {
 		return
 	}
 
-	w.log.Debug("clean map")
 	// create a new map and copy the data from the old map
 	newMap := make(map[string]*batchReq)
 	for k, v := range w.unackedBatches {
@@ -597,16 +589,10 @@ func (w *worker) handleRsp(rsp *batchRsp) {
 	batchID := rsp.batchID
 	batch, ok := w.unackedBatches[batchID]
 	if !ok {
-		w.log.Debug("worker[", w.index, "] batch not found in unackedBatches map:", batchID, ", send time:", rsp.dt, ", now:", time.Now().UnixMilli())
 		w.metrics.incError(errNoMatchReq4Rsp.strCode)
 		return
 	}
 
-	/*
-		w.log.Debug("worker[", w.index, "] batch done:", batchID, ", batch time:", batch.batchTime.UnixMilli(),
-			", batch last send time:", batch.lastSendTime.UnixMilli(), ", now:", time.Now().UnixMilli(),
-			"batch retry:", batch.retries)
-	*/
 	// call batch.done to release the resources it holds
 	var err = error(nil)
 	if rsp.errCode != 0 {
@@ -728,7 +714,6 @@ func (w *worker) handleUpdateConn() {
 }
 
 func (w *worker) updateConn
Re: [PR] [INLONG-9187][Agent] Delete useless memory manager [inlong]
EMsnap merged PR #9188:
URL: https://github.com/apache/inlong/pull/9188
(inlong) branch master updated: [INLONG-9187][Agent] Delete useless memory manager (#9188)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new e136f2d245 [INLONG-9187][Agent] Delete useless memory manager (#9188)

e136f2d245 is described below

commit e136f2d245875be72a2c1a07ecddb2e712160f1d
Author: justinwwhuang
AuthorDate: Wed Nov 1 14:05:53 2023 +0800

    [INLONG-9187][Agent] Delete useless memory manager (#9188)
---
 .../inlong/agent/core/task/MemoryManager.java | 115 ---
 .../inlong/agent/plugin/sinks/ProxySink.java | 216 -
 .../inlong/agent/plugin/sinks/SenderManager.java | 10 -
 3 files changed, 341 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
deleted file mode 100644
index d67a15fb5a..00
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.conf.AgentConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
-import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
-
-/**
- * used to limit global memory to avoid oom
- */
-public class MemoryManager {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryManager.class);
-    private static volatile MemoryManager memoryManager = null;
-    private final AgentConfiguration conf;
-    private ConcurrentHashMap semaphoreMap = new ConcurrentHashMap<>();
-
-    private MemoryManager() {
-        this.conf = AgentConfiguration.getAgentConf();
-        Semaphore semaphore = null;
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
-
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
-
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_CHANNEL_PERMIT, semaphore);
-
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
-    }
-
-    /**
-     * manager singleton
-     */
-    public static MemoryManager getInstance() {
-        if (memoryManager == null) {
-            synchronized (MemoryManager.class) {
-                if (memoryManager == null) {
-                    memoryManager = new MemoryManager();
-                }
-            }
-        }
-        return memoryManager;
-    }
-
-    public boolean tryAcquire(String semaphoreName, int permit) {
-        Semaphore semaphore = semaphoreMap.get(semaphoreName);
-        if (semaphore == null) {
-            LOGGER.error("tryAcquire {} not exist");
-            return false;
-
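The deleted `MemoryManager` limited global memory by mapping each permit name (such as `AGENT_GLOBAL_READER_QUEUE_PERMIT`) to a `java.util.concurrent.Semaphore` sized from the agent configuration, and exposed `tryAcquire(String semaphoreName, int permit)`. The sketch below shows that named-permit pattern in isolation, assuming `tryAcquire` delegates to the non-blocking `Semaphore.tryAcquire(int)`. The class name `PermitLimiter`, the `release` and `availablePermits` helpers, the plain `Map` of permit counts standing in for `AgentConfiguration`, and the use of `System.err` instead of SLF4J are all hypothetical; the singleton with double-checked locking is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/**
 * Hypothetical named-permit limiter in the style of the deleted MemoryManager.
 * Each resource name maps to a Semaphore whose size is the permit budget.
 */
final class PermitLimiter {

    // Filled once in the constructor and only read afterwards.
    private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    /** Creates one semaphore per entry; each value is that name's permit count. */
    PermitLimiter(Map<String, Integer> permitsByName) {
        permitsByName.forEach((name, permits) -> semaphores.put(name, new Semaphore(permits)));
    }

    /** Non-blocking: returns false if the name is unknown or not enough permits are free. */
    boolean tryAcquire(String name, int permits) {
        Semaphore semaphore = semaphores.get(name);
        if (semaphore == null) {
            // Pass the name so the message says which semaphore is missing.
            System.err.printf("tryAcquire: semaphore %s does not exist%n", name);
            return false;
        }
        return semaphore.tryAcquire(permits);
    }

    /** Returns permits obtained from a successful tryAcquire; unknown names are ignored. */
    void release(String name, int permits) {
        Semaphore semaphore = semaphores.get(name);
        if (semaphore != null) {
            semaphore.release(permits);
        }
    }

    /** Number of currently free permits, or -1 for an unknown name. */
    int availablePermits(String name) {
        Semaphore semaphore = semaphores.get(name);
        return semaphore == null ? -1 : semaphore.availablePermits();
    }
}
```

With `new PermitLimiter(Map.of("reader.queue", 2))`, acquiring 2 permits succeeds and a further request for 1 fails until the 2 are released. `Semaphore` does not track which caller holds a permit, so callers must release exactly what they acquired; releasing more would raise the budget above its configured size.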
[PR] [INLONG-9190][Agent] Log file source clear buffer queue does not take effect [inlong]
justinwwhuang opened a new pull request, #9191: URL: https://github.com/apache/inlong/pull/9191

### Prepare a Pull Request

[INLONG-9190][Agent] Log file source clear buffer queue does not take effect

- Fixes #9190

### Motivation

Clearing the buffer queue of the log file source does not take effect.

### Modifications

Fix the bug and add a unit test check.

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation