This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e4ed5cf32 [INLONG-4194][Manager] Add update sort config API in manager client (#4195)
e4ed5cf32 is described below
commit e4ed5cf3276238a586acbcdfe2f4c711709d7e4e
Author: kipshi <[email protected]>
AuthorDate: Fri May 13 16:05:21 2022 +0800
[INLONG-4194][Manager] Add update sort config API in manager client (#4195)
---
.../inlong/manager/client/api/InlongGroup.java | 14 ++++++-------
.../manager/client/api/impl/BlankInlongGroup.java | 5 +++--
.../manager/client/api/impl/InlongGroupImpl.java | 24 +++++++++++++++++-----
3 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index c299e4867..e031b634e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -54,22 +54,20 @@ public interface InlongGroup {
void update(InlongGroupConf conf) throws Exception;
/**
- * ReInit inlong group after update configuration for group.
- * Must be invoked when group is rejected,failed or started
+ * Update Inlong group on SortBaseConf
*
- * @return inlong group info
+ * @param sortBaseConf
+ * @throws Exception
*/
- InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception;
+ void update(SortBaseConf sortBaseConf) throws Exception;
/**
- * Init inlong group on updated conf.
+ * ReInit inlong group after update configuration for group.
* Must be invoked when group is rejected,failed or started
- * This method is deprecated, recommend to use reInitOnUpdate
*
* @return inlong group info
*/
- @Deprecated
- InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception;
+ InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception;
/**
* Suspend the stream group and return group info.
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index 633ffd514..bfb470fc7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
+import org.apache.inlong.manager.client.api.SortBaseConf;
import java.util.List;
@@ -49,12 +50,12 @@ public class BlankInlongGroup implements InlongGroup {
}
@Override
- public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws
Exception {
+ public void update(SortBaseConf sortBaseConf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}
@Override
- public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws
Exception {
+ public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws
Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index cc9ae593d..c2d13e9bf 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -29,6 +29,7 @@ import
org.apache.inlong.manager.client.api.InlongGroupContext.InlongGroupStatus
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
+import org.apache.inlong.manager.client.api.SortBaseConf;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.util.AssertUtil;
@@ -134,20 +135,33 @@ public class InlongGroupImpl implements InlongGroup {
InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(conf);
InlongGroupRequest groupRequest = groupInfo.genRequest();
Pair<String, String> idAndErr =
managerClient.updateGroup(groupRequest);
- this.groupContext.setGroupInfo(groupInfo);
String errMsg = idAndErr.getValue();
AssertUtil.isNull(errMsg, errMsg);
+ this.groupContext.setGroupInfo(groupInfo);
}
@Override
- public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws
Exception {
- return initOnUpdate(conf);
+ public void update(SortBaseConf sortBaseConf) throws Exception {
+ AssertUtil.notNull(sortBaseConf, "SortBaseConf should not be empty");
+ this.groupConf.setSortBaseConf(sortBaseConf);
+ final String groupName = this.groupConf.getGroupName();
+ final String groupId = "b_" + groupName;
+ InlongGroupResponse groupResponse =
managerClient.getGroupInfo(groupId);
+ InlongGroupStatus state =
InlongGroupStatus.parseStatusByCode(groupResponse.getStatus());
+ AssertUtil.isTrue(state != InlongGroupStatus.INITIALIZING,
+ "Inlong Group is in init state, should not be updated");
+ InlongGroupInfo groupInfo =
InlongGroupTransfer.createGroupInfo(this.groupConf);
+ InlongGroupRequest groupRequest = groupInfo.genRequest();
+ Pair<String, String> idAndErr =
managerClient.updateGroup(groupRequest);
+ String errMsg = idAndErr.getValue();
+ AssertUtil.isNull(errMsg, errMsg);
+ this.groupContext.setGroupInfo(groupInfo);
}
@Override
- public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws
Exception {
+ public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws
Exception {
update(conf);
- InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(conf);
+ InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
InlongGroupRequest groupRequest = groupInfo.genRequest();
Pair<Boolean, InlongGroupResponse> existMsg =
managerClient.isGroupExists(groupRequest);
if (existMsg.getKey()) {