[GitHub] [inlong] healchow commented on a diff in pull request #5815: [INLONG-5810][Manager] Supplement the workflow event API in the manager client
healchow commented on code in PR #5815: URL: https://github.com/apache/inlong/pull/5815#discussion_r966715420 ## inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessEvent.java: ## @@ -15,9 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.workflow.event.process; - -import org.apache.inlong.manager.workflow.event.WorkflowEvent; +package org.apache.inlong.manager.common.enums; Review Comment: Excuse me, but why change the package for those classes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] luchunliang opened a new pull request, #5843: [INLONG-5842][Manager] Support maintenance of message queue cluster
luchunliang opened a new pull request, #5843: URL: https://github.com/apache/inlong/pull/5843 ### Prepare a Pull Request - Title: [INLONG-5842][Manager] Support maintenance of message queue cluster - Fixes #5842 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications *Describe the modifications you've done.* ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] woofyzhao opened a new pull request, #5845: [INLONG-5844][Manager] Sub sources should be filtered when querying task status
woofyzhao opened a new pull request, #5845: URL: https://github.com/apache/inlong/pull/5845 - Fixes #5844 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] GanfengTan commented on a diff in pull request #5845: [INLONG-5844][Manager] Sub sources should be filtered when querying task status
GanfengTan commented on code in PR #5845: URL: https://github.com/apache/inlong/pull/5845#discussion_r966840561 ## inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java: ## @@ -90,7 +91,10 @@ private List getGroupSources() { if (MapUtils.isNotEmpty(sources)) { for (Map.Entry entry : sources.entrySet()) { StreamSource source = entry.getValue(); -if (source != null) { +// when template id is null it is considered as normal source other than template source +// sub sources are filtered because they are already collected in template source's sub source list +System.out.println("get source = " + source); Review Comment: Please, delete the expression. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] ciscozhou opened a new pull request, #5847: [INLONG-5846][Manager] Fix the SQL exception to check if the consumer exists
ciscozhou opened a new pull request, #5847: URL: https://github.com/apache/inlong/pull/5847 ### Prepare a Pull Request - Fixes #5846 ### Motivation SQL exception to check if consumer exists. ### Verifying this change I will add the unit tests in this PR: https://github.com/apache/inlong/pull/5777 ### Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] github-actions[bot] commented on issue #4969: [Feature][TubeMQ] Python SDK for Producing Message
github-actions[bot] commented on issue #4969: URL: https://github.com/apache/inlong/issues/4969#issuecomment-1242596834 This issue is stale because it has been open for 60 days with no activity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] github-actions[bot] commented on issue #4968: [Feature][TubeMQ] Golang SDK for Producing Message
github-actions[bot] commented on issue #4968: URL: https://github.com/apache/inlong/issues/4968#issuecomment-1242596850 This issue is stale because it has been open for 60 days with no activity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] github-actions[bot] commented on issue #4952: [Improve][Manager] Add physical delete api for groupId
github-actions[bot] commented on issue #4952: URL: https://github.com/apache/inlong/issues/4952#issuecomment-1242596875 This issue is stale because it has been open for 60 days with no activity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] github-actions[bot] commented on issue #1177: [INLONG-588][TubeMQ] C/C++ SDK supports production-side business logic
github-actions[bot] commented on issue #1177: URL: https://github.com/apache/inlong/issues/1177#issuecomment-1242596906 This issue is stale because it has been open for 60 days with no activity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] github-actions[bot] commented on issue #4984: [Feature][Dashboard] Batch Data Stream Management
github-actions[bot] commented on issue #4984: URL: https://github.com/apache/inlong/issues/4984#issuecomment-1242596823 This issue is stale because it has been open for 60 days with no activity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] github-actions[bot] commented on issue #2310: [Feature][Aagent] Support Mobile Source
github-actions[bot] commented on issue #2310: URL: https://github.com/apache/inlong/issues/2310#issuecomment-1242596893 This issue is stale because it has been open for 60 days with no activity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] github-actions[bot] commented on issue #4963: [Umbrella] Add an integration testing framework to cover end-to-end testing
github-actions[bot] commented on issue #4963: URL: https://github.com/apache/inlong/issues/4963#issuecomment-1242596862 This issue is stale because it has been open for 60 days with no activity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dockerzhang merged pull request #5809: [INLONG-5808][SDK] Sort SDK supports parse InlongMsg
dockerzhang merged PR #5809: URL: https://github.com/apache/inlong/pull/5809 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 4e4e8a36e [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809) 4e4e8a36e is described below commit 4e4e8a36ea24285a9e2e5e11e759e4dca0cf0d95 Author: vernedeng AuthorDate: Sat Sep 10 12:30:13 2022 +0800 [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809) --- .../sdk/sort/impl/decode/MessageDeserializer.java | 96 +++- .../apache/inlong/sdk/sort/util/StringUtil.java| 169 + .../sort/impl/decode/MessageDeserializerTest.java | 43 +- 3 files changed, 297 insertions(+), 11 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java index 145b7ad77..b0c54663e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java @@ -23,8 +23,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs; @@ -32,12 +37,14 @@ import org.apache.inlong.sdk.sort.api.ClientContext; import org.apache.inlong.sdk.sort.api.Deserializer; import org.apache.inlong.sdk.sort.entity.InLongMessage; import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.util.StringUtil; import org.apache.inlong.sdk.sort.util.Utils; public class MessageDeserializer implements Deserializer { private static final int MESSAGE_VERSION_NONE = 0; private static final int MESSAGE_VERSION_PB = 1; +private static final int MESSAGE_VERSION_INLONG_MSG = 2; private static final int COMPRESS_TYPE_NONE = 0; private static final int COMPRESS_TYPE_GZIP = 1; private static final int COMPRESS_TYPE_SNAPPY = 2; @@ -48,15 +55,29 @@ public class MessageDeserializer implements Deserializer { private static final String INLONG_GROUPID_KEY = "inlongGroupId"; private static final String INLONG_STREAMID_KEY = "inlongStreamId"; +private static final String INLONGMSG_ATTR_STREAM_ID = "streamId"; +private static final String INLONGMSG_ATTR_GROUP_ID = "groupId"; +private static final String INLONGMSG_ATTR_TIME_T = "t"; +private static final String INLONGMSG_ATTR_TIME_DT = "dt"; +private static final String INLONGMSG_ATTR_NODE_IP = "NodeIP"; +private static final char INLONGMSG_ATTR_ENTRY_DELIMITER = '&'; +private static final char INLONGMSG_ATTR_KV_DELIMITER = '='; +private static final String DEFAULT_IP = "127.0.0.1"; + +private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!"; + public MessageDeserializer() { } @Override -public List deserialize(ClientContext context, InLongTopic inLongTopic, Map headers, +public List deserialize( +ClientContext context, +InLongTopic inLongTopic, +Map headers, byte[] data) throws Exception { //1. version -int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, "0")); +int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, Integer.toString(MESSAGE_VERSION_INLONG_MSG))); switch (version) { case MESSAGE_VERSION_NONE: { return decode(context, inLongTopic, data, headers); @@ -64,12 +85,18 @@ public class MessageDeserializer implements Deserializer { case MESSAGE_VERSION_PB: { return decodePB(context, inLongTopic, data, headers); } +case MESSAGE_VERSION_INLONG_MSG: { +return decodeInlongMsg(context, inLongTopic, data, headers); +} default: throw new IllegalArgumentException("Unknown version type:" + version); } } -private List decode(ClientContext context, InLongTopic inLongTopic, byte[] msgBytes, +private List decode( +ClientContext context, +InLongTopic inLongTopic, +byte[] msgBytes, Map headers) { long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0")); String sourceIp = headers.getOrDefault(SOURCE_IP_KEY, ""); @@ -89,7 +116,10 @@ public class MessageDeserializer
[GitHub] [inlong] dockerzhang merged pull request #5845: [INLONG-5844][Manager] Sub sources should be filtered when querying task status
dockerzhang merged PR #5845: URL: https://github.com/apache/inlong/pull/5845 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5844][Manager] Sub sources should be filtered when querying task status (#5845)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 9c797ebe3 [INLONG-5844][Manager] Sub sources should be filtered when querying task status (#5845) 9c797ebe3 is described below commit 9c797ebe35b8e66b8541d4844f7d9c6612b74b95 Author: woofyzhao <490467...@qq.com> AuthorDate: Sat Sep 10 12:31:01 2022 +0800 [INLONG-5844][Manager] Sub sources should be filtered when querying task status (#5845) Co-authored-by: healchow --- .../java/org/apache/inlong/manager/client/api/InlongGroupContext.java | 4 +++- .../main/java/org/apache/inlong/manager/pojo/source/StreamSource.java | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java index 3ba3a0aec..52cca26a7 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java @@ -90,7 +90,9 @@ public class InlongGroupContext implements Serializable { if (MapUtils.isNotEmpty(sources)) { for (Map.Entry entry : sources.entrySet()) { StreamSource source = entry.getValue(); -if (source != null) { +// when template id is null it is considered as normal source other than template source +// sub sources are filtered because they are already collected in template source's sub source list +if (source != null && source.getTemplateId() == null) { groupSources.add(source); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java index 9c07d7969..a95dd6d5b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java @@ -104,6 +104,9 @@ public abstract class StreamSource extends StreamNode { @Builder.Default private Map properties = new LinkedHashMap<>(); +@ApiModelProperty("Null if not a sub source") +private Integer templateId; + @ApiModelProperty("Sub source information of existing agents") private List subSourceList;
[GitHub] [inlong] dockerzhang merged pull request #5847: [INLONG-5846][Manager] Fix the SQL exception to check if the consumer exists
dockerzhang merged PR #5847: URL: https://github.com/apache/inlong/pull/5847 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5846][Manager] Fix the SQL exception to check if the consumer exists (#5847)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new b0663fd57 [INLONG-5846][Manager] Fix the SQL exception to check if the consumer exists (#5847) b0663fd57 is described below commit b0663fd57d68832ef7ccfd55b6f751b966e1cf72 Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com> AuthorDate: Sat Sep 10 12:31:33 2022 +0800 [INLONG-5846][Manager] Fix the SQL exception to check if the consumer exists (#5847) --- .../src/main/resources/mappers/InlongConsumeEntityMapper.xml| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml index 16e1d9b52..435d6ca3c 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml @@ -123,7 +123,7 @@ from inlong_consume where is_deleted = 0 -and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR} +and inlong_group_id = #{groupId, jdbcType=VARCHAR} and topic = #{topic, jdbcType=VARCHAR} and consumer_group = #{consumerGroup, jdbcType=VARCHAR} limit 1