[GitHub] [inlong] fuweng11 commented on a diff in pull request #8109: [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization

2023-05-29 Thread via GitHub


fuweng11 commented on code in PR #8109:
URL: https://github.com/apache/inlong/pull/8109#discussion_r1208988031


##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/WorkflowApproverController.java:
##
@@ -54,19 +56,22 @@ public class WorkflowApproverController {
 @PostMapping("/workflow/approver/save")
 @OperationLog(operation = OperationType.CREATE)
 @ApiOperation(value = "Save approver info")
+@RequiresRoles(value = UserRoleCode.ADMIN)
 public Response save(@RequestBody ApproverRequest config) {
 return Response.success(workflowApproverService.save(config, 
LoginUserUtils.getLoginUser().getName()));
 }
 
 @GetMapping(value = "/workflow/approver/get/{id}")
 @ApiOperation(value = "Get approver by ID")
 @ApiImplicitParam(name = "id", value = "Workflow approver ID", 
dataTypeClass = Integer.class, required = true)
+@RequiresRoles(value = UserRoleCode.ADMIN)
 public Response get(@PathVariable Integer id) {
 return Response.success(workflowApproverService.get(id));
 }
 
 @GetMapping("/workflow/approver/list")
 @ApiOperation(value = "List workflow approvers")
+@RequiresRoles(value = UserRoleCode.ADMIN)

Review Comment:
   The admin check of the `/workflow/approver/list` interface cannot be added 
here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hnrainll commented on a diff in pull request #8109: [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization

2023-05-29 Thread via GitHub


hnrainll commented on code in PR #8109:
URL: https://github.com/apache/inlong/pull/8109#discussion_r1208990812


##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/WorkflowApproverController.java:
##
@@ -54,19 +56,22 @@ public class WorkflowApproverController {
 @PostMapping("/workflow/approver/save")
 @OperationLog(operation = OperationType.CREATE)
 @ApiOperation(value = "Save approver info")
+@RequiresRoles(value = UserRoleCode.ADMIN)
 public Response save(@RequestBody ApproverRequest config) {
 return Response.success(workflowApproverService.save(config, 
LoginUserUtils.getLoginUser().getName()));
 }
 
 @GetMapping(value = "/workflow/approver/get/{id}")
 @ApiOperation(value = "Get approver by ID")
 @ApiImplicitParam(name = "id", value = "Workflow approver ID", 
dataTypeClass = Integer.class, required = true)
+@RequiresRoles(value = UserRoleCode.ADMIN)
 public Response get(@PathVariable Integer id) {
 return Response.success(workflowApproverService.get(id));
 }
 
 @GetMapping("/workflow/approver/list")
 @ApiOperation(value = "List workflow approvers")
+@RequiresRoles(value = UserRoleCode.ADMIN)

Review Comment:
   fix it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] thexiay commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


thexiay commented on code in PR #8095:
URL: https://github.com/apache/inlong/pull/8095#discussion_r1209008166


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -51,49 +53,81 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: Unknown change
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+tableChanges.add(new UnknownColumnChange(
+String.format(" old schema: [%s] and new schema: [%s], it 
is unknown column change",
+oldSchema.toString(), newSchema.toString(;
+return tableChanges;
+}
+
+// step1: judge whether update column(only column type change and doc 
change)
+for (String colName : intersectColSet) {

Review Comment:
   ok, if you don't handle reorder case.At least you should identified it and 
marked it as unsupported



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gunli opened a new pull request, #8111: [INLONG-4961][DataProxy] add dataproxy-sdk-golang

2023-05-29 Thread via GitHub


gunli opened a new pull request, #8111:
URL: https://github.com/apache/inlong/pull/8111

   ### [INLONG-4961][DataProxy] add dataproxy-sdk-golang
   
   - Fixes #4961 
   
   ### Motivation
   
   Add a golang version SDK of InLong Data Proxy
   
   ### Modifications
   
   - Add a discoverer to do service discovery;
   - Add a conn pool to keep the connections to the server;
   - Add a Client interface to handle request;
   - Add a worker to do the data send/receive work;
   - Add other util packages/functions to help the whole send/receive jobs;
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [x] This change added tests and can be verified as follows:
   
 - Prepare a groupID and some streamIDs in the server side and define the 
log schema of the streamIDs;
 - Write a test client to send messages to the server, example codes can be 
found in: dataproxy/example_test.go;
 - Verify how many message was sent and received.
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not documented)
 - I will create a follow-up issue for adding the documentation later
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink

2023-05-29 Thread via GitHub


healchow commented on code in PR #8105:
URL: https://github.com/apache/inlong/pull/8105#discussion_r1209032332


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+

Review Comment:
   @yunqingmoswu Please rebase the master branch, the import order was checked 
by spotless plugin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #8107: [INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one )

2023-05-29 Thread via GitHub


healchow commented on code in PR #8107:
URL: https://github.com/apache/inlong/pull/8107#discussion_r1209033544


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java:
##
@@ -145,11 +143,9 @@ public void configure(Context context) {
 this.slaveChannels.add(channel);
 }
 }
-LOG.info("masters:" + this.masterChannels);
-LOG.info("orders:" + this.orderChannels);
-LOG.info("slaves:" + this.slaveChannels);
-LOG.info("transfers:" + this.transferChannels);
-LOG.info("agentFileMetrics:" + this.agentFileMetricChannels);
-LOG.info("slaMetrics:" + this.slaMetricChannels);
+LOG.info(

Review Comment:
   Good job!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #8107: [INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one )

2023-05-29 Thread via GitHub


healchow commented on code in PR #8107:
URL: https://github.com/apache/inlong/pull/8107#discussion_r1209037292


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+/**
+ * save black list configure to list
+ */
+public class BlackListConfigHolder extends VisitConfigHolder {

Review Comment:
   WhiteList and BlackList have specific discriminatory meanings, can they be 
modified to AllowList or BlockList?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gosonzhang commented on a diff in pull request #8107: [INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one )

2023-05-29 Thread via GitHub


gosonzhang commented on code in PR #8107:
URL: https://github.com/apache/inlong/pull/8107#discussion_r1209040833


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+/**
+ * save black list configure to list
+ */
+public class BlackListConfigHolder extends VisitConfigHolder {

Review Comment:
   Blacklist and whitelist are commonly used conventions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] github-actions[bot] commented on issue #8112: [Feature][Data Proxy]Golang SDK documents

2023-05-29 Thread via GitHub


github-actions[bot] commented on issue #8112:
URL: https://github.com/apache/inlong/issues/8112#issuecomment-1566736591

   Hello @gunli, thank you for opening your first issue in InLong 🧡 We will 
respond as soon as possible ⏳
   If this is a bug report, please provide screenshots or error logs for us to 
reproduce your issue, so we can do our best to fix it.
   If you have any questions in the meantime, you can also ask us on the 
[InLong Discussions](https://github.com/apache/inlong/discussions) 🔍


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gunli commented on issue #8112: [Feature][Data Proxy]Golang SDK documents

2023-05-29 Thread via GitHub


gunli commented on issue #8112:
URL: https://github.com/apache/inlong/issues/8112#issuecomment-1566744704

   Should to create issue 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] lordcheng10 commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


lordcheng10 commented on code in PR #8095:
URL: https://github.com/apache/inlong/pull/8095#discussion_r1209051456


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -51,49 +53,81 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: Unknown change
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {

Review Comment:
   I got it, thanks for clarifying



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gunli commented on issue #8112: [Feature][Data Proxy]Golang SDK documents

2023-05-29 Thread via GitHub


gunli commented on issue #8112:
URL: https://github.com/apache/inlong/issues/8112#issuecomment-1566745768

   This issue should be create at 
[inlong-website](https://github.com/gunli/inlong-website)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong-website] gunli opened a new issue, #764: [Feature][DataProxy]Add Golang SDK documents

2023-05-29 Thread via GitHub


gunli opened a new issue, #764:
URL: https://github.com/apache/inlong-website/issues/764

   ### Description
   
   Add documents for DataProxy Golang SDK
   
   ### Use case
   
   Add documents for DataProxy Golang SDK to guide users how to use the SDK to 
produce messages to InLong DataProxy.
   
   ### Are you willing to submit PR?
   
   - [X] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one ) (#8107)

2023-05-29 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 6e8d77a48 [INLONG-8106][DataProxy] Optimize ConfigManager 
implementation ( part one ) (#8107)
6e8d77a48 is described below

commit 6e8d77a487baaeb6754f65a98be3c3c8d274d1ee
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon May 29 16:36:12 2023 +0800

[INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one ) 
(#8107)
---
 .../dataproxy/channel/FailoverChannelSelector.java |  14 +-
 .../inlong/dataproxy/config/ConfigHolder.java  |   8 +-
 .../inlong/dataproxy/config/ConfigManager.java | 116 ++---
 .../inlong/dataproxy/config/PropertiesHolder.java  | 270 +
 .../config/holder/BlackListConfigHolder.java   |  38 +++
 .../config/holder/GroupIdNumConfigHolder.java  | 268 
 .../config/holder/GroupIdPropertiesHolder.java |  88 ---
 .../config/holder/MQClusterConfigHolder.java   |   2 +-
 .../config/holder/MxPropertiesHolder.java  |   2 +-
 .../config/holder/PropertiesConfigHolder.java  |   2 +-
 ...sitConfigHolder.java => VisitConfigHolder.java} |  12 +-
 .../config/holder/WeightConfigHolder.java  | 157 
 .../config/holder/WhiteListConfigHolder.java   |  40 +++
 .../dataproxy/heartbeat/HeartbeatManager.java  |  17 +-
 .../dataproxy/source/ServerMessageHandler.java |  17 +-
 .../dataproxy/source/SimpleMessageHandler.java |  17 +-
 .../inlong/dataproxy/source2/BaseSource.java   |  14 +-
 .../inlong/dataproxy/source2/SourceConstants.java  |   3 -
 .../dataproxy/source2/v0msg/CodecBinMsg.java   |  55 ++---
 19 files changed, 916 insertions(+), 224 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
index 5bdb1e8da..921d595ed 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
@@ -103,9 +103,7 @@ public class FailoverChannelSelector extends 
AbstractChannelSelector {
  */
 private List splitChannelName(String channelName) {
 List fileMetricList = new ArrayList();
-if (StringUtils.isEmpty(channelName)) {
-LOG.info("channel name is null!");
-} else {
+if (StringUtils.isNotBlank(channelName)) {
 fileMetricList = Arrays.asList(channelName.split("\\s+"));
 }
 return fileMetricList;
@@ -145,11 +143,9 @@ public class FailoverChannelSelector extends 
AbstractChannelSelector {
 this.slaveChannels.add(channel);
 }
 }
-LOG.info("masters:" + this.masterChannels);
-LOG.info("orders:" + this.orderChannels);
-LOG.info("slaves:" + this.slaveChannels);
-LOG.info("transfers:" + this.transferChannels);
-LOG.info("agentFileMetrics:" + this.agentFileMetricChannels);
-LOG.info("slaMetrics:" + this.slaMetricChannels);
+LOG.info(
+"Configure channels, masters={}, orders={}, slaves={}, 
transfers={}, agentFileMetrics={}, slaMetrics={}",
+this.masterChannels, this.orderChannels, this.slaveChannels,
+this.transferChannels, this.agentFileMetricChannels, 
this.slaMetricChannels);
 }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
index 7748470f5..d6e533494 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
@@ -79,7 +79,7 @@ public abstract class ConfigHolder {
  *
  * @return - true if configure updated
  */
-public abstract boolean loadFromFileToHolder();
+protected abstract boolean loadFromFileToHolder();
 
 /**
  * check updater
@@ -92,7 +92,7 @@ public abstract class ConfigHolder {
 if (configFile != null) {
 this.lastModifyTime = configFile.lastModified();
 }
-LOG.info("File {} has changed, reload from local file agent", 
getFileName());
+LOG.info("File {} has changed, reload from local file", 
this.fileName);
 return loadFromFileToHolder();
 }
 return false;
@@ -125,8 +125,8 @@ public abstract class ConfigHolder {
 if (url != null) {
 this.filePath = ur

[GitHub] [inlong] EMsnap commented on a diff in pull request #8096: [INLONG-8092][Sort] Support all database and multiple tables transmission for Hive

2023-05-29 Thread via GitHub


EMsnap commented on code in PR #8096:
URL: https://github.com/apache/inlong/pull/8096#discussion_r1209078921


##
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/PartitionPolicy.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.sink;
+
+public enum PartitionPolicy {
+
+PROC_TIME("partition table by flink proccessing time"),

Review Comment:
   processing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang commented on pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


dockerzhang commented on PR #8095:
URL: https://github.com/apache/inlong/pull/8095#issuecomment-1566773880

   @hejiay if all comments are confirmed, please click `resolve`, and I found 
there are some conflicts, please fix them, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #8096: [INLONG-8092][Sort] Support all database and multiple tables transmission for Hive

2023-05-29 Thread via GitHub


EMsnap commented on code in PR #8096:
URL: https://github.com/apache/inlong/pull/8096#discussion_r1209085490


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java:
##
@@ -172,23 +199,50 @@ private DataStreamSink consume(
 try (HiveMetastoreClientWrapper client =
 
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
 
-Table table = client.getTable(dbName, identifier.getObjectName());
-StorageDescriptor sd = table.getSd();
-
-Class hiveOutputFormatClz =
-
hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
+StorageDescriptor sd;
+Properties tableProps = new Properties();
+Class hiveOutputFormatClz;
+HiveWriterFactory writerFactory;
+boolean sinkMultipleEnable = Boolean.parseBoolean(
+
catalogTable.getOptions().getOrDefault(SINK_MULTIPLE_ENABLE.key(), "false"));
 boolean isCompressed =
 
jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
-HiveWriterFactory writerFactory =
-new HiveWriterFactory(
-jobConf,
-hiveOutputFormatClz,
-sd.getSerdeInfo(),
-tableSchema,
-getPartitionKeyArray(),
-HiveReflectionUtils.getTableMetadata(hiveShim, 
table),
-hiveShim,
-isCompressed);
+if (sinkMultipleEnable) {
+sd = new StorageDescriptor();
+SerDeInfo serDeInfo = new SerDeInfo();
+
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+sd.setSerdeInfo(serDeInfo);
+String defaultFs = jobConf.get("fs.defaultFS", "");
+sd.setLocation(defaultFs + "/tmp");
+hiveOutputFormatClz = 
hiveShim.getHiveOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class);
+
+writerFactory = new HiveWriterFactory(

Review Comment:
   here new HiveWriterFactory( ) can be extracted out of the if statement, 
which applies also in else statement 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hejiay commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay commented on code in PR #8095:
URL: https://github.com/apache/inlong/pull/8095#discussion_r1209087824


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -51,49 +53,81 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: Unknown change
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+tableChanges.add(new UnknownColumnChange(
+String.format(" old schema: [%s] and new schema: [%s], it 
is unknown column change",
+oldSchema.toString(), newSchema.toString(;
+return tableChanges;
+}
+
+// step1: judge whether update column(only column type change and doc 
change)
+for (String colName : intersectColSet) {

Review Comment:
   thanks,i got it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #8096: [INLONG-8092][Sort] Support all database and multiple tables transmission for Hive

2023-05-29 Thread via GitHub


EMsnap commented on code in PR #8096:
URL: https://github.com/apache/inlong/pull/8096#discussion_r1209088899


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.table.filesystem.OutputFormatFactory;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.function.Function;
+
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+
+/** Hive {@link OutputFormatFactory}, use {@link RecordWriter} to write 
record. */
+public class HiveOutputFormatFactory implements OutputFormatFactory {
+
+private static final long serialVersionUID = 2L;
+
+private final HashMap factoryMap = new 
HashMap<>(16);
+
+private final HiveWriterFactory factory;
+
+private final boolean sinkMultipleEnable;

Review Comment:
   parameter not used 



##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.hive.util.CacheHolder;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The Hadoop file committer that directly rename the in-progress file to the 
target file. For
+ * FileSystem like S3, renaming may lead to additional copies.
+ */
+public class HadoopRenameFileCommitter implements HadoopFileCommitter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(HadoopRenameFileCommitter.class);
+
+private final Configuration configuration;
+
+private Path targetFilePath;
+
+private Path tempFilePath;
+
+private boolean sinkMultipleEnable;
+
+public HadoopRenameFileCommitter(Configuration configuration,
+Path targetFilePath,
+boolean sinkMultipleEnable)
+throws IOException {
+this.configuration = configuration;
+this.targetFilePath = targetFilePath;
+this.tempFilePath = generateTempFilePath();
+this.sinkMultipleEnable = sinkMultipleEnable;
+}
+
+public HadoopRenameFileCommitter(Configuration configuration,
+Path targetFilePath,
+Path inProgressPath,
+boolean sinkMultipleEnable) {
+this.configuration = configuration;
+this.targetFilePath = targetFilePath;
+this.tempFilePath = inProgressPath;
+this.sinkMultipleEnable = sinkMultipleEnable;
+}
+
+@Override
+public Path getTargetFilePath() {
+return targetFilePath;
+}
+
+@Override
+public Path getTe

[GitHub] [inlong] chestnut-c opened a new pull request, #8115: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties.

2023-05-29 Thread via GitHub


chestnut-c opened a new pull request, #8115:
URL: https://github.com/apache/inlong/pull/8115

   - Fixes #8114
   
   ### Motivation
   
   Fix the NEP that may appear when building Properties , Filter empty values 
​​to prevent NEP, because empty values ​​are meaningless to pass downstream.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink

2023-05-29 Thread via GitHub


EMsnap commented on code in PR #8105:
URL: https://github.com/apache/inlong/pull/8105#discussion_r1209100513


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/SchemaChangeType.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.enums;
+
+/**
+ * The class defines the type of schema change.
+ */
+public enum SchemaChangeType {
+
+/**
+ * Create table
+ */
+CREATE_TABLE(1),
+/**
+ * Drop table
+ */
+DROP_TABLE(2),
+/**
+ * Rename table
+ */
+RENAME_TABLE(3),
+/**
+ * Truncate table
+ */
+TRUNCATE_TABLE(4),
+/**
+ * Add column
+ */
+ADD_COLUMN(5),
+/**
+ * Drop column
+ */
+DROP_COLUMN(6),
+/**
+ * Rename column
+ */
+RENAME_COLUMN(7),
+/**
+ * Change column type
+ */
+CHANGE_COLUMN_TYPE(8),
+/**
+ * Alter table, it is a unified description of modified table, which may 
contain multiple operations, it is not
+ * exposed to the outside world in most scenarios.
+ */
+ALTER(-1);
+
+/**
+ * The code represents this schama change type

Review Comment:
   typo here schema 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong opened a new pull request, #8117: [INLONG-8116][Sort] Support table api config setting

2023-05-29 Thread via GitHub


gong opened a new pull request, #8117:
URL: https://github.com/apache/inlong/pull/8117

   ### Prepare a Pull Request
   
   - [INLONG-8116][Sort] Support table api config setting
   
   - Fixes #8116 
   
   ### Motivation
   
   Support table api config setting
   ref to 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
   
   ### Modifications
   
   Add two table config setting in sort core
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #8115: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties.

2023-05-29 Thread via GitHub


gong commented on code in PR #8115:
URL: https://github.com/apache/inlong/pull/8115#discussion_r1209111662


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.sort.util;
 
+import java.util.Objects;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;

Review Comment:
   you can rebase master and exec `mvn spotless:apply` for code style



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #8115: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties.

2023-05-29 Thread via GitHub


gong commented on code in PR #8115:
URL: https://github.com/apache/inlong/pull/8115#discussion_r1209111662


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.sort.util;
 
+import java.util.Objects;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;

Review Comment:
   You can rebase master and exec `mvn spotless:apply` for code style



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hejiay closed pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay closed pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution 
support delete and update columns when sink to Iceberg
URL: https://github.com/apache/inlong/pull/8095


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] fuweng11 commented on a diff in pull request #8115: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties.

2023-05-29 Thread via GitHub


fuweng11 commented on code in PR #8115:
URL: https://github.com/apache/inlong/pull/8115#discussion_r1209117077


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##
@@ -117,6 +118,7 @@ public static LoadNode createLoadNode(StreamSink 
streamSink, Map fieldRelations = 
parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
 Map properties = 
streamSink.getProperties().entrySet().stream()
+.filter(v -> Objects.nonNull(v.getValue()))

Review Comment:
   Suggest use `StringUtils.isNotBank()`.



##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java:
##
@@ -563,6 +564,7 @@ private static List 
parseFieldInfos(List streamFields, S
  */
 private static Map parseProperties(Map 
properties) {
 return properties.entrySet().stream()
+.filter(v -> Objects.nonNull(v.getValue()))

Review Comment:
   Suggest use `StringUtils.isNotBank()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] chestnut-c commented on a diff in pull request #8115: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties.

2023-05-29 Thread via GitHub


chestnut-c commented on code in PR #8115:
URL: https://github.com/apache/inlong/pull/8115#discussion_r1209118467


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.sort.util;
 
+import java.util.Objects;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;

Review Comment:
   > You can rebase master and exec `mvn spotless:apply` for code style
   
   ok  thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] chestnut-c commented on a diff in pull request #8115: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties.

2023-05-29 Thread via GitHub


chestnut-c commented on code in PR #8115:
URL: https://github.com/apache/inlong/pull/8115#discussion_r1209120934


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java:
##
@@ -563,6 +564,7 @@ private static List 
parseFieldInfos(List streamFields, S
  */
 private static Map parseProperties(Map 
properties) {
 return properties.entrySet().stream()
+.filter(v -> Objects.nonNull(v.getValue()))

Review Comment:
   > Suggest use `StringUtils.isNotBank()`.
   
   Value is of type Object,so i use Objects.nonNull(Object obj)



##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##
@@ -117,6 +118,7 @@ public static LoadNode createLoadNode(StreamSink 
streamSink, Map fieldRelations = 
parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
 Map properties = 
streamSink.getProperties().entrySet().stream()
+.filter(v -> Objects.nonNull(v.getValue()))

Review Comment:
   > Suggest use `StringUtils.isNotBank()`.
   
   Value is of type Object,so i use Objects.nonNull(Object obj)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] vernedeng opened a new pull request, #8119: [INLONG-8118][Manager] Support tenant user permission control #8118

2023-05-29 Thread via GitHub


vernedeng opened a new pull request, #8119:
URL: https://github.com/apache/inlong/pull/8119

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Fixes #8118 
   - Parent #7914 
   
   ### Motivation
   
Support tenant user permission control
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #8102: [INLONG-8101][Sort] Support multi-version packaging of sort-connectors

2023-05-29 Thread via GitHub


gong commented on code in PR #8102:
URL: https://github.com/apache/inlong/pull/8102#discussion_r1209135921


##
inlong-sort/sort-flink/pom.xml:
##
@@ -30,6 +30,13 @@
 pom
 Apache InLong - Sort Flink
 
+
+cdc-base
+base
+sort-flink-v1.13
+sort-flink-v1.15
+

Review Comment:
Maybe you can add a default profile for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hejiay opened a new pull request, #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay opened a new pull request, #8120:
URL: https://github.com/apache/inlong/pull/8120

   
   I accidentally deleted the original PR 
https://github.com/apache/inlong/pull/8095 by mistake, so I resubmitted it.For 
details, see this PR https://github.com/apache/inlong/pull/8095
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hejiay commented on pull request #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay commented on PR #8120:
URL: https://github.com/apache/inlong/pull/8120#issuecomment-1566883701

   @dockerzhang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hejiay commented on pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay commented on PR #8095:
URL: https://github.com/apache/inlong/pull/8095#issuecomment-1566892860

   I accidentally deleted the original PR by mistake, so I resubmitted it. The 
new PR https://github.com/apache/inlong/pull/8120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] vernedeng commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209205206


##
inlong-manager/manager-web/sql/changes-1.8.0.sql:
##
@@ -44,4 +44,42 @@ CREATE TABLE IF NOT EXISTS `inlong_tenant`
   DEFAULT CHARSET = utf8 COMMENT ='Inlong tenant table';
 
 INSERT INTO `inlong_tenant`(`name`, `description`, `creator`, `modifier`)
-VALUES ('public', 'Default tenant', 'admin', 'admin');
\ No newline at end of file
+VALUES ('public', 'Default tenant', 'inlong_init', 'inlong_init');
+
+-- To support distinguish inlong user permission and tenant permission 
control, please see https://github.com/apache/inlong/issues/8098
+CREATE TABLE IF NOT EXISTS `inlong_user_role`
+(
+`id`  int(11)  NOT NULL AUTO_INCREMENT,
+`user_name`   varchar(256) NOT NULL COMMENT 'Username',
+`role_code`   varchar(256) NOT NULL COMMENT 'User role code',
+`disabled`tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Whether to 
disabled, 0: enabled, 1: disabled',
+`is_deleted`  int(11)   DEFAULT '0' COMMENT 'Whether to 
delete, 0 is not deleted, if greater than 0, delete',
+`creator` varchar(256) NOT NULL COMMENT 'Creator name',
+`modifier`varchar(256)  DEFAULT NULL COMMENT 'Modifier name',
+`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 
'Create time',
+`modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'Modify time',
+`version` int(11)  NOT NULL DEFAULT '1' COMMENT 'Version number, 
which will be incremented by 1 after modification',
+PRIMARY KEY (`id`),
+UNIQUE KEY `unique_tenant_user_role` (`user_name`, `role_code`, 
`is_deleted`)
+) ENGINE = InnoDB
+DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong User Role Table';
+
+INSERT INTO `inlong_user_role` (`user_name`, `role_code`, `creator`)
+VALUES ('admin', 'INLONG_ADMIN', 'inlong_init');
+
+RENAME TABLE user_role TO tenant_user_role;
+ALTER TABLE tenant_user_role
+ADD tenant VARCHAR(256) DEFAULT 'public' NOT NULL comment 'User tenant';

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink

2023-05-29 Thread via GitHub


yunqingmoswu commented on code in PR #8105:
URL: https://github.com/apache/inlong/pull/8105#discussion_r1209205400


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * Schema-change Utils
+ */
+public final class SchemaChangeUtils {
+
+private final static String DELIMITER = "&";
+private final static String KEY_VALUE_DELIMITER = "=";
+
+private SchemaChangeUtils() {
+}
+
+/**
+ * deserialize the policies to a Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}]
+ *
+ * @param policies The policies format by 'key1=value1&key2=value2...'
+ * @return A policy Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}]
+ */
+public static Map deserialize(String 
policies) {
+Preconditions.checkNotNull(policies, "policies is null");
+Map policyMap = new HashMap<>();
+for (String kv : policies.split(DELIMITER)) {
+int index = kv.indexOf(KEY_VALUE_DELIMITER);
+if (index < 1 || index == kv.length() - 1) {
+throw new IllegalArgumentException(
+"The format of policies must be like 
'key1=value1&key2=value2...'");
+}
+String typeCode = kv.substring(0, index);
+String policyCode = kv.substring(index + 1);
+SchemaChangeType type;
+SchemaChangePolicy policy;
+try {
+type = 
SchemaChangeType.getInstance(Integer.parseInt(typeCode));
+} catch (NumberFormatException e) {
+throw new IllegalArgumentException(
+String.format("Unsupported type of schema-change: %s 
for InLong", typeCode));
+}
+try {
+policy = 
SchemaChangePolicy.getInstance(Integer.parseInt(policyCode));
+} catch (NumberFormatException e) {
+throw new IllegalArgumentException(
+String.format("Unsupported policy of schema-change: %s 
for InLong", policyCode));
+}
+policyMap.put(type, policy);
+}
+return policyMap;
+}
+
+/**
+ * Serialize the policy Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}] to a string
+ *
+ * @param policyMap The policy Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}]
+ * @return A string format by 'key1=value1&key2=value2...'
+ */
+public static String serialize(Map 
policyMap) {
+Preconditions.checkNotNull(policyMap, "policyMap is null");
+StringJoiner joiner = new StringJoiner("&");
+for (Entry kv : 
policyMap.entrySet()) {
+joiner.add(kv.getKey().getCode() + "=" + kv.getValue().getCode());
+}
+return joiner.toString();
+}
+
+/**
+ * Extract the schema change types from {@link Operation}
+ *
+ * @param operation The operation
+ * @return Set of {@link SchemaChangeType}
+ */
+public static Set extractSchemaChangeTypes(Operation 
operation) {
+Set types = new HashSet<>();
+switch (operation.getOperationType()) {
+case ALTER:
+AlterOperation alterOperation = (AlterOperation) operation;
+Preconditions.checkState(alterOperation.getAlterColumns() != 
null
+&& !alterOperation.getAlterColumns().isEmpty(), "alter 
columns is empty");
+for (AlterColumn alterColumn : 
alterOperation.getAl

[GitHub] [inlong] yunqingmoswu closed pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink

2023-05-29 Thread via GitHub


yunqingmoswu closed pull request #8105: [INLONG-7853][Sort] Add common handle 
for schema-change in sink
URL: https://github.com/apache/inlong/pull/8105


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #8115: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties.

2023-05-29 Thread via GitHub


dockerzhang merged PR #8115:
URL: https://github.com/apache/inlong/pull/8115


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-8114][Manager] Fix the NEP that may appear when building Properties. (#8115)

2023-05-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 049eaf36b [INLONG-8114][Manager] Fix the NEP that may appear when 
building Properties. (#8115)
049eaf36b is described below

commit 049eaf36b3492d13e7cc33cc67392cc45ec6fa5a
Author: chestnufang <65438734+chestnu...@users.noreply.github.com>
AuthorDate: Mon May 29 19:31:37 2023 +0800

[INLONG-8114][Manager] Fix the NEP that may appear when building 
Properties. (#8115)

Co-authored-by: chestnufang 
---
 .../org/apache/inlong/manager/client/api/impl/InlongClientImpl.java| 3 ++-
 .../org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java| 1 +
 .../org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java | 2 ++
 .../java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java   | 2 ++
 4 files changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index ab2e837d8..d9f752d71 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -149,7 +149,8 @@ public class InlongClientImpl implements InlongClient {
 .inlongGroupId(briefInfo.getInlongGroupId())
 .originalStatus(briefInfo.getStatus())
 .simpleGroupStatus(groupStatus)
-.streamSources(sources).build();
+.streamSources(sources)
+.build();
 groupStatusMap.put(groupId, statusInfo);
 });
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
index 8afcf9f7f..3ca72d963 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
@@ -57,6 +57,7 @@ public class InlongGroupStatusInfo {
 @ApiModelProperty(value = "Sort job status of the group")
 private SortStatus sortStatus = SortStatus.UNKNOWN;
 
+@Builder.Default
 @ApiModelProperty("Extended properties of the group")
 private Map properties = Maps.newHashMap();
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 26e2d44b6..5342e898c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -69,6 +69,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -563,6 +564,7 @@ public class ExtractNodeUtils {
  */
 private static Map parseProperties(Map 
properties) {
 return properties.entrySet().stream()
+.filter(v -> Objects.nonNull(v.getValue()))
 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
 }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 26ec9ad4c..9209d19b5 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -89,6 +89,7 @@ import org.apache.commons.lang3.StringUtils;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -117,6 +118,7 @@ public class LoadNodeUtils {
 .collect(Collectors.toList());
 List fieldRelations = 
parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
 Map properties = 
streamSink.getProperties().entrySet().stream()
+.filter(v -> Objects.nonNull(v.getValue()))
 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
 String sinkType = streamSink.getSinkType();
   

[GitHub] [inlong] EMsnap commented on a diff in pull request #8102: [INLONG-8101][Sort] Support multi-version packaging of sort-connectors

2023-05-29 Thread via GitHub


EMsnap commented on code in PR #8102:
URL: https://github.com/apache/inlong/pull/8102#discussion_r1209231337


##
inlong-sort/sort-flink/pom.xml:
##
@@ -30,6 +30,13 @@
 pom
 Apache InLong - Sort Flink
 
+
+cdc-base
+base
+sort-flink-v1.13
+sort-flink-v1.15
+

Review Comment:
   done thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] fuweng11 commented on a diff in pull request #8109: [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization

2023-05-29 Thread via GitHub


fuweng11 commented on code in PR #8109:
URL: https://github.com/apache/inlong/pull/8109#discussion_r1209349797


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java:
##
@@ -89,11 +92,18 @@ public Integer save(ApproverRequest request, String 
operator) {
 @Override
 public ApproverResponse get(Integer id) {
 Preconditions.expectNotNull(id, "approver id cannot be null");
+
 WorkflowApproverEntity approverEntity = approverMapper.selectById(id);
 if (approverEntity == null) {
 LOGGER.error("workflow approver not found by id={}", id);
 throw new 
BusinessException(ErrorCodeEnum.WORKFLOW_APPROVER_NOT_FOUND);
 }
+
+UserInfo userInfo = LoginUserUtils.getLoginUser();
+boolean isAdmin = 
userInfo.getRoles().contains(UserTypeEnum.ADMIN.name());
+Preconditions.expectTrue(isAdmin || 
approverEntity.getApprovers().contains(userInfo.getName()),

Review Comment:
   Suggest use `userService.checkUser(approverEntity.getApprovers(), 
userInfo.getName(), "Current user does not have permission to get this workflow 
approver info")`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hnrainll commented on a diff in pull request #8109: [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization

2023-05-29 Thread via GitHub


hnrainll commented on code in PR #8109:
URL: https://github.com/apache/inlong/pull/8109#discussion_r1209379107


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java:
##
@@ -89,11 +92,18 @@ public Integer save(ApproverRequest request, String 
operator) {
 @Override
 public ApproverResponse get(Integer id) {
 Preconditions.expectNotNull(id, "approver id cannot be null");
+
 WorkflowApproverEntity approverEntity = approverMapper.selectById(id);
 if (approverEntity == null) {
 LOGGER.error("workflow approver not found by id={}", id);
 throw new 
BusinessException(ErrorCodeEnum.WORKFLOW_APPROVER_NOT_FOUND);
 }
+
+UserInfo userInfo = LoginUserUtils.getLoginUser();
+boolean isAdmin = 
userInfo.getRoles().contains(UserTypeEnum.ADMIN.name());
+Preconditions.expectTrue(isAdmin || 
approverEntity.getApprovers().contains(userInfo.getName()),

Review Comment:
   fix it
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] github-actions[bot] commented on issue #7737: [Feature][Tool] Auto-add the JDBC jar to the Manager-web module with inlong-dev-toolkit.sh

2023-05-29 Thread via GitHub


github-actions[bot] commented on issue #7737:
URL: https://github.com/apache/inlong/issues/7737#issuecomment-1567664051

   This issue is stale because it has been open for 60 days with no activity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang commented on a diff in pull request #8111: [INLONG-4961][DataProxy] add dataproxy-sdk-golang

2023-05-29 Thread via GitHub


dockerzhang commented on code in PR #8111:
URL: https://github.com/apache/inlong/pull/8111#discussion_r1209640138


##
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md:
##
@@ -0,0 +1,197 @@
+# dataproxy-sdk-golang

Review Comment:
   This line is needless.



##
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md:
##
@@ -0,0 +1,197 @@
+# dataproxy-sdk-golang
+
+## Overview
+
+dataproxy-sdk-golang is the golang version of InLong data proxy client SDK.
+

Review Comment:
   Extra blank line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang commented on a diff in pull request #8111: [INLONG-4961][DataProxy] add dataproxy-sdk-golang

2023-05-29 Thread via GitHub


dockerzhang commented on code in PR #8111:
URL: https://github.com/apache/inlong/pull/8111#discussion_r1209641026


##
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md:
##
@@ -0,0 +1,197 @@
+# dataproxy-sdk-golang
+
+## Overview
+
+dataproxy-sdk-golang is the golang version of InLong data proxy client SDK.
+
+
+
+## Features
+
+- Service discovery;
+- Connection pool, buffer pool, byte pool;
+- Backoff retry;
+- Concurrently batch send;
+- Send synchronously;
+- Send asynchronously;
+- Close gracefully;
+- Hookable debug log;
+- Heartbeat;
+- Metrics;
+- Snappy compress;
+- Additional column;
+- Server offline re-balance;
+
+
+## Usage
+
+### example
+
+refer: cli/main.go
+
+``` go
+package main
+
+import (
+   "context"
+   "errors"
+   "flag"
+   "fmt"
+   "log"
+   "strings"
+   "time"
+
+   "go.uber.org/atomic"
+
+   
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy"
+)
+
+var (
+   set  string
+   url  string
+   groupID  string
+   streamID string
+   payload  string
+   countint
+   addCols  mapFlag
+   asyncbool
+   succeed  atomic.Int32
+   failed   atomic.Int32
+)
+
+type mapFlag map[string]string
+
+func (f mapFlag) String() string {
+   return fmt.Sprintf("%v", map[string]string(f))
+}
+
+func (f mapFlag) Set(value string) error {
+   split := strings.SplitN(value, "=", 2)
+   if len(split) < 2 {
+   return errors.New("invalid map flag")
+   }
+
+   f[split[0]] = split[1]
+   return nil
+}
+
+func main() {
+   addCols = make(map[string]string)
+   flag.StringVar(&set, "set", "SH_IEG", "dataproxy set")
+   flag.StringVar(&url, "url", dataproxy.DefaultURL, "dataproxy URL")
+   flag.StringVar(&groupID, "group-id", "b_ieg_tglogv3_test", "dataproxy 
group ID")
+   flag.StringVar(&streamID, "stream-id", "GameSvrState", "dataproxy 
stream ID")
+   flag.StringVar(&payload, "payload", 
"GameSvrState|GameSvrId-Test|2023-01-11 10:08:30|127.0.0.1|1", "message 
payload")
+   flag.IntVar(&count, "count", 10, "send count")
+   flag.Var(&addCols, "col", "add columns, for example: -col k1=v1 -col 
k2=v2")
+   flag.BoolVar(&async, "async", false, "send asynchronously")
+   flag.Parse()
+
+   var err error
+   client, err := dataproxy.NewClient(
+   dataproxy.WithSet(set),
+   dataproxy.WithGroupID(groupID),
+   dataproxy.WithURL(url),
+   dataproxy.WithMetricsName("clit"),
+   dataproxy.WithAddColumns(addCols),
+   )
+
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   msg := dataproxy.Message{GroupID: groupID, StreamID: streamID, Payload: 
[]byte(payload)}
+   for i := 0; i < count; i++ {
+   if !async {
+   err = client.Send(context.Background(), msg)
+   if err != nil {
+   fmt.Println(err)
+   }
+   } else {
+   client.SendAsync(context.Background(), msg, onResult)
+   }
+   }
+
+   if async {
+   wait()
+   }
+}
+
+func onResult(msg dataproxy.Message, err error) {
+   if err != nil {
+   fmt.Println("error message, streamID = " + msg.StreamID + ", 
Payload = " + string(msg.Payload))
+   failed.Add(1)
+   } else {
+   succeed.Add(1)
+   }
+}
+
+func wait() {
+   for {
+   if int(succeed.Load()+failed.Load()) >= count {
+   fmt.Println("succeed:", succeed.Load())
+   fmt.Println("failed:", failed.Load())
+   return
+   }
+   time.Sleep(1 * time.Second)
+   }
+}
+
+```
+
+### Options
+
+refer: dataproxy/options.go
+
+``` go
+// Options is the data proxy go client configs

Review Comment:
   data proxy 
   ->
   DataProxy



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink

2023-05-29 Thread via GitHub


gong commented on code in PR #8105:
URL: https://github.com/apache/inlong/pull/8105#discussion_r1209643175


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * Schema-change Utils
+ */
+public final class SchemaChangeUtils {
+
+private final static String DELIMITER = "&";
+private final static String KEY_VALUE_DELIMITER = "=";
+
+private SchemaChangeUtils() {
+}
+
+/**
+ * deserialize the policies to a Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}]
+ *
+ * @param policies The policies format by 'key1=value1&key2=value2...'
+ * @return A policy Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}]
+ */
+public static Map deserialize(String 
policies) {
+Preconditions.checkNotNull(policies, "policies is null");
+Map policyMap = new HashMap<>();
+for (String kv : policies.split(DELIMITER)) {
+int index = kv.indexOf(KEY_VALUE_DELIMITER);
+if (index < 1 || index == kv.length() - 1) {
+throw new IllegalArgumentException(
+"The format of policies must be like 
'key1=value1&key2=value2...'");
+}
+String typeCode = kv.substring(0, index);
+String policyCode = kv.substring(index + 1);
+SchemaChangeType type;
+SchemaChangePolicy policy;
+try {
+type = 
SchemaChangeType.getInstance(Integer.parseInt(typeCode));
+} catch (NumberFormatException e) {
+throw new IllegalArgumentException(
+String.format("Unsupported type of schema-change: %s 
for InLong", typeCode));
+}
+try {
+policy = 
SchemaChangePolicy.getInstance(Integer.parseInt(policyCode));
+} catch (NumberFormatException e) {
+throw new IllegalArgumentException(
+String.format("Unsupported policy of schema-change: %s 
for InLong", policyCode));
+}
+policyMap.put(type, policy);
+}
+return policyMap;
+}
+
+/**
+ * Serialize the policy Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}] to a string
+ *
+ * @param policyMap The policy Map[{@link SchemaChangeType}, {@link 
SchemaChangePolicy}]
+ * @return A string format by 'key1=value1&key2=value2...'
+ */
+public static String serialize(Map 
policyMap) {
+Preconditions.checkNotNull(policyMap, "policyMap is null");
+StringJoiner joiner = new StringJoiner("&");
+for (Entry kv : 
policyMap.entrySet()) {
+joiner.add(kv.getKey().getCode() + "=" + kv.getValue().getCode());
+}
+return joiner.toString();
+}
+
+/**
+ * Extract the schema change types from {@link Operation}
+ *
+ * @param operation The operation
+ * @return Set of {@link SchemaChangeType}
+ */
+public static Set extractSchemaChangeTypes(Operation 
operation) {
+Set types = new HashSet<>();
+switch (operation.getOperationType()) {
+case ALTER:
+AlterOperation alterOperation = (AlterOperation) operation;
+Preconditions.checkState(alterOperation.getAlterColumns() != 
null
+&& !alterOperation.getAlterColumns().isEmpty(), "alter 
columns is empty");
+for (AlterColumn alterColumn : 
alterOperation.getAlterColum

[GitHub] [inlong] e-mhui commented on a diff in pull request #8044: [INLONG-8038][Sort] Optimize MySQL CDC chunk splitting

2023-05-29 Thread via GitHub


e-mhui commented on code in PR #8044:
URL: https://github.com/apache/inlong/pull/8044#discussion_r1209650776


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java:
##
@@ -228,7 +228,10 @@ private void startAsynchronouslySplit() {
 
.map(MySqlSnapshotSplit::toSchemaLessSnapshotSplit)
 .collect(Collectors.toList());
 synchronized (lock) {
-
remainingSplits.addAll(schemaLessSnapshotSplits);
+int size = schemaLessSnapshotSplits.size();

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] xuehuanran opened a new pull request, #8123: [INLONG-8122][TubeMQ] Implement "Heartbeat" method for GO SDK

2023-05-29 Thread via GitHub


xuehuanran opened a new pull request, #8123:
URL: https://github.com/apache/inlong/pull/8123

   - Add `Heartbeat` for GO producer SDK
   - Implement #8122 
   - Partent issue #4968 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] fuweng11 opened a new pull request, #8124: [INLONG-8121][Manager] Supports cluster node status management in the case of multiple manager nodes

2023-05-29 Thread via GitHub


fuweng11 opened a new pull request, #8124:
URL: https://github.com/apache/inlong/pull/8124

   
   
   ### Prepare a Pull Request
   
   - Fixes #8121 
   
   ### Motivation
   
   Supports cluster node status management in the case of multiple manager 
nodes.
   
   ### Modifications
   
   Supports cluster node status management in the case of multiple manager 
nodes.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gunli commented on a diff in pull request #8111: [INLONG-4961][DataProxy] Add dataproxy-sdk-golang

2023-05-29 Thread via GitHub


gunli commented on code in PR #8111:
URL: https://github.com/apache/inlong/pull/8111#discussion_r1209652770


##
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md:
##
@@ -0,0 +1,197 @@
+# dataproxy-sdk-golang
+
+## Overview
+
+dataproxy-sdk-golang is the golang version of InLong data proxy client SDK.
+

Review Comment:
   fixed now



##
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md:
##
@@ -0,0 +1,197 @@
+# dataproxy-sdk-golang
+
+## Overview
+
+dataproxy-sdk-golang is the golang version of InLong data proxy client SDK.
+
+
+
+## Features
+
+- Service discovery;
+- Connection pool, buffer pool, byte pool;
+- Backoff retry;
+- Concurrently batch send;
+- Send synchronously;
+- Send asynchronously;
+- Close gracefully;
+- Hookable debug log;
+- Heartbeat;
+- Metrics;
+- Snappy compress;
+- Additional column;
+- Server offline re-balance;
+
+
+## Usage
+
+### example
+
+refer: cli/main.go
+
+``` go
+package main
+
+import (
+   "context"
+   "errors"
+   "flag"
+   "fmt"
+   "log"
+   "strings"
+   "time"
+
+   "go.uber.org/atomic"
+
+   
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy"
+)
+
+var (
+   set  string
+   url  string
+   groupID  string
+   streamID string
+   payload  string
+   countint
+   addCols  mapFlag
+   asyncbool
+   succeed  atomic.Int32
+   failed   atomic.Int32
+)
+
+type mapFlag map[string]string
+
+func (f mapFlag) String() string {
+   return fmt.Sprintf("%v", map[string]string(f))
+}
+
+func (f mapFlag) Set(value string) error {
+   split := strings.SplitN(value, "=", 2)
+   if len(split) < 2 {
+   return errors.New("invalid map flag")
+   }
+
+   f[split[0]] = split[1]
+   return nil
+}
+
+func main() {
+   addCols = make(map[string]string)
+   flag.StringVar(&set, "set", "SH_IEG", "dataproxy set")
+   flag.StringVar(&url, "url", dataproxy.DefaultURL, "dataproxy URL")
+   flag.StringVar(&groupID, "group-id", "b_ieg_tglogv3_test", "dataproxy 
group ID")
+   flag.StringVar(&streamID, "stream-id", "GameSvrState", "dataproxy 
stream ID")
+   flag.StringVar(&payload, "payload", 
"GameSvrState|GameSvrId-Test|2023-01-11 10:08:30|127.0.0.1|1", "message 
payload")
+   flag.IntVar(&count, "count", 10, "send count")
+   flag.Var(&addCols, "col", "add columns, for example: -col k1=v1 -col 
k2=v2")
+   flag.BoolVar(&async, "async", false, "send asynchronously")
+   flag.Parse()
+
+   var err error
+   client, err := dataproxy.NewClient(
+   dataproxy.WithSet(set),
+   dataproxy.WithGroupID(groupID),
+   dataproxy.WithURL(url),
+   dataproxy.WithMetricsName("clit"),
+   dataproxy.WithAddColumns(addCols),
+   )
+
+   if err != nil {
+   log.Fatal(err)
+   }
+
+   msg := dataproxy.Message{GroupID: groupID, StreamID: streamID, Payload: 
[]byte(payload)}
+   for i := 0; i < count; i++ {
+   if !async {
+   err = client.Send(context.Background(), msg)
+   if err != nil {
+   fmt.Println(err)
+   }
+   } else {
+   client.SendAsync(context.Background(), msg, onResult)
+   }
+   }
+
+   if async {
+   wait()
+   }
+}
+
+func onResult(msg dataproxy.Message, err error) {
+   if err != nil {
+   fmt.Println("error message, streamID = " + msg.StreamID + ", 
Payload = " + string(msg.Payload))
+   failed.Add(1)
+   } else {
+   succeed.Add(1)
+   }
+}
+
+func wait() {
+   for {
+   if int(succeed.Load()+failed.Load()) >= count {
+   fmt.Println("succeed:", succeed.Load())
+   fmt.Println("failed:", failed.Load())
+   return
+   }
+   time.Sleep(1 * time.Second)
+   }
+}
+
+```
+
+### Options
+
+refer: dataproxy/options.go
+
+``` go
+// Options is the data proxy go client configs

Review Comment:
   fixed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] featzhang commented on a diff in pull request #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


featzhang commented on code in PR #8120:
URL: https://github.com/apache/inlong/pull/8120#discussion_r1209654300


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -52,49 +54,85 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: judge whether unknown change
+// just diff two different schema can not distinguish(add + delete) vs 
modify
+// Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
+// change.
+// In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+tableChanges.add(new UnknownColumnChange(
+String.format(" old schema: [%s] and new schema: [%s], it 
is unknown column change",
+oldSchema.toString(), newSchema.toString(;
+return tableChanges;
+}
+
+// step1: judge whether column type change
+for (String colName : intersectColSet) {
+NestedField oldField = oldSchema.findField(colName);
+NestedField newField = newSchema.findField(colName);
+if (!oldField.type().equals(newField.type()) || 
!oldField.doc().equals(newField.doc())) {
 tableChanges.add(
-new AddColumn(
+new TableChange.UpdateColumn(
 new String[]{newField.name()},
 FlinkSchemaUtil.convert(newField.type()),
 !newField.isRequired(),
-newField.doc(),
-ni == 0 ? ColumnPosition.first() : 
ColumnPosition.after(newFields.get(ni - 1;
-ni++;
+newField.doc()));
 }
 }
 
-if (oi != oldFields.size()) {
-tableChanges.clear();
-tableChanges.add(
-new UnknownColumnChange(
-String.format("Unsupported schema update.\n"
-+ "oldSchema:\n%s\n, newSchema:\n %s", 
oldSchema, newSchema)));
+// step2: judge whether delete column
+for (String colName : oldFields) {
+if (colsToDelete.contains(colName)) {
+tableChanges.add(
+new TableChange.DeleteColumn(
+new String[]{colName}));
+}
 }
 
+// step3: judge whether add column
+if (!colsToAdd.isEmpty()) {
+for (int i = 0; i < newFields.size(); i++) {

Review Comment:
   Why do we need to judge again from newFields? In fact, all the elements in 
colsToAdd should come from newFields.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] thexiay commented on a diff in pull request #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


thexiay commented on code in PR #8120:
URL: https://github.com/apache/inlong/pull/8120#discussion_r1209655765


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -52,49 +54,85 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: judge whether unknown change
+// just diff two different schema can not distinguish(add + delete) vs 
modify
+// Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
+// change.
+// In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {

Review Comment:
   whether to add a reorder column check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hejiay commented on a diff in pull request #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay commented on code in PR #8120:
URL: https://github.com/apache/inlong/pull/8120#discussion_r1209656737


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -52,49 +54,85 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: judge whether unknown change
+// just diff two different schema can not distinguish(add + delete) vs 
modify
+// Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
+// change.
+// In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {
+tableChanges.add(new UnknownColumnChange(
+String.format(" old schema: [%s] and new schema: [%s], it 
is unknown column change",
+oldSchema.toString(), newSchema.toString(;
+return tableChanges;
+}
+
+// step1: judge whether column type change
+for (String colName : intersectColSet) {
+NestedField oldField = oldSchema.findField(colName);
+NestedField newField = newSchema.findField(colName);
+if (!oldField.type().equals(newField.type()) || 
!oldField.doc().equals(newField.doc())) {
 tableChanges.add(
-new AddColumn(
+new TableChange.UpdateColumn(
 new String[]{newField.name()},
 FlinkSchemaUtil.convert(newField.type()),
 !newField.isRequired(),
-newField.doc(),
-ni == 0 ? ColumnPosition.first() : 
ColumnPosition.after(newFields.get(ni - 1;
-ni++;
+newField.doc()));
 }
 }
 
-if (oi != oldFields.size()) {
-tableChanges.clear();
-tableChanges.add(
-new UnknownColumnChange(
-String.format("Unsupported schema update.\n"
-+ "oldSchema:\n%s\n, newSchema:\n %s", 
oldSchema, newSchema)));
+// step2: judge whether delete column
+for (String colName : oldFields) {
+if (colsToDelete.contains(colName)) {
+tableChanges.add(
+new TableChange.DeleteColumn(
+new String[]{colName}));
+}
 }
 
+// step3: judge whether add column
+if (!colsToAdd.isEmpty()) {
+for (int i = 0; i < newFields.size(); i++) {

Review Comment:
   In order to get the position of the added field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] hejiay commented on a diff in pull request #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay commented on code in PR #8120:
URL: https://github.com/apache/inlong/pull/8120#discussion_r1209657943


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -52,49 +54,85 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: judge whether unknown change
+// just diff two different schema can not distinguish(add + delete) vs 
modify
+// Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
+// change.
+// In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {

Review Comment:
   Excuse me, could you elaborate more?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #8117: [INLONG-8116][Sort] Support table api config setting

2023-05-29 Thread via GitHub


EMsnap commented on code in PR #8117:
URL: https://github.com/apache/inlong/pull/8117#discussion_r1209659371


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java:
##
@@ -97,6 +97,31 @@ public class Constants {
  */
 public static final String PIPELINE_NAME = "pipeline.name";
 
+/**
+ * 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
+ * default AUTO
+ */
+public static final String TABLE_EXEC_SINK_UPSERT_MATERIALIZE = 
"table.exec.sink.upsert-materialize";
+
+public static final ConfigOption UPSERT_MATERIALIZE =
+
key("table.exec.sink.upsert-materialize").defaultValue("AUTO").withDescription("Because
 of the disorder "
++ "of ChangeLog data caused by Shuffle in distributed 
system, the data received by Sink may not "
++ "be the order of global upsert. So add upsert 
materialize operator before upsert sink. It "
++ "receives the upstream changelog records and generate an 
upsert view for the downstream.\n"
++ "By default, the materialize operator will be added when 
a distributed disorder occurs on "
++ "unique keys. You can also choose no 
materialization(NONE) or force materialization(FORCE).");
+
+public static final String TABLE_EXEC_SINK_NOT_NULL_ENFORCER = 
"table.exec.sink.not-null-enforcer";
+
+public static final ConfigOption NOT_NULL_ENFORCER =
+
key("table.exec.sink.not-null-enforcer").defaultValue("ERROR").withDescription("Determines
 how Flink "
++ "enforces NOT NULL column constraints when inserting 
null values.\n"
++ "\n"

Review Comment:
   extra \n



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] fuweng11 commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


fuweng11 commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209647609


##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongUserRoleEntityMapper.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.entity.InlongUserRoleEntity;
+import org.apache.inlong.manager.pojo.user.InlongRolePageRequest;
+
+import com.github.pagehelper.Page;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface InlongUserRoleEntityMapper {
+
+int insert(InlongUserRoleEntity record);
+
+InlongUserRoleEntity selectById(Integer id);
+
+int updateById(InlongUserRoleEntity record);
+
+Page listByCondition(InlongRolePageRequest request);

Review Comment:
   selectByCondition()



##
inlong-manager/manager-dao/src/main/resources/mappers/InlongUserRoleEntityMapper.xml:
##
@@ -0,0 +1,88 @@
+
+
+
+http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+id, user_name, role_code, disabled, is_deleted, creator, modifier, 
create_time, modify_time, version
+
+
+
+insert into inlong_user_role (id, user_name, role_code,
+  disabled, creator, modifier)
+values (#{id,jdbcType=INTEGER}, #{username,jdbcType=VARCHAR}, 
#{roleCode,jdbcType=VARCHAR},
+#{disabled,jdbcType=SMALLINT},

Review Comment:
   #{disabled,jdbcType=SMALLINT},#{creator,jdbcType=VARCHAR}, 
#{modifier,jdbcType=VARCHAR})



##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongRoleController.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.user.InlongRoleInfo;
+import org.apache.inlong.manager.pojo.user.InlongRolePageRequest;
+import org.apache.inlong.manager.pojo.user.InlongRoleRequest;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.user.InlongRoleService;
+
+import com.github.pagehelper.PageInfo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api")
+@Api(tags = "INLONG-USER-API")
+public class InlongRoleController {
+
+@Autowired
+private InlongRoleService inlongRoleService;
+
+@RequestMapping(value = "/role/inlong/get/{id}", method = 
RequestMethod.GET)
+@ApiOperation(value = "Get tenant role")
+@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
+public Response get(@PathVariable int id

[GitHub] [inlong] vernedeng commented on a diff in pull request #8124: [INLONG-8121][Manager] Supports cluster node status management in the case of multiple manager nodes

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8124:
URL: https://github.com/apache/inlong/pull/8124#discussion_r1209660751


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java:
##
@@ -223,6 +227,13 @@ private void evictClusterNode(HeartbeatMsg heartbeat) {
 protocolTypes = null;
 }
 }
+// If the manager has multiple nodes, need to determine that the 
heartbeat is updated
+ComponentHeartbeatEntity componentHeartbeatEntity = 
componentHeartbeatMapper.selectTimeOutHeartBeat(
+componentHeartbeat.getComponentType(), 
componentHeartbeat.getIp(), heartbeatInterval() * 2L);

Review Comment:
   magic number



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #8109: [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization

2023-05-29 Thread via GitHub


dockerzhang merged PR #8109:
URL: https://github.com/apache/inlong/pull/8109


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-8108][Manager] WorkflowApprover API Permissions Optimization (#8109)

2023-05-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new a39e03cc5 [INLONG-8108][Manager] WorkflowApprover API Permissions 
Optimization (#8109)
a39e03cc5 is described below

commit a39e03cc5d1bde2ad9fd3ed583866430be7378aa
Author: Hao <1780095+hnrai...@users.noreply.github.com>
AuthorDate: Tue May 30 11:15:54 2023 +0800

[INLONG-8108][Manager] WorkflowApprover API Permissions Optimization (#8109)
---
 .../inlong/manager/service/core/WorkflowApproverService.java   |  3 ++-
 .../manager/service/core/impl/WorkflowApproverServiceImpl.java | 10 +-
 .../service/workflow/WorkflowApproverServiceImplTest.java  |  2 +-
 .../manager/web/controller/WorkflowApproverController.java |  7 ++-
 4 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/WorkflowApproverService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/WorkflowApproverService.java
index 8c99be293..7473b23ad 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/WorkflowApproverService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/WorkflowApproverService.java
@@ -41,9 +41,10 @@ public interface WorkflowApproverService {
  * Get workflow approver by ID
  *
  * @param id approver id
+ * @param operator operator name
  * @return approver info
  */
-ApproverResponse get(Integer id);
+ApproverResponse get(Integer id, String operator);
 
 /**
  * Get process approver by the process name and task name.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
index 5d53367ff..874b0efb1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.inlong.manager.pojo.workflow.ApproverPageRequest;
 import org.apache.inlong.manager.pojo.workflow.ApproverRequest;
 import org.apache.inlong.manager.pojo.workflow.ApproverResponse;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
+import org.apache.inlong.manager.service.user.UserService;
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
 import org.apache.inlong.manager.workflow.definition.UserTask;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
@@ -60,6 +61,8 @@ public class WorkflowApproverServiceImpl implements 
WorkflowApproverService {
 private WorkflowApproverEntityMapper approverMapper;
 @Autowired
 private ProcessDefinitionService processDefinitionService;
+@Autowired
+private UserService userService;
 
 @Override
 public Integer save(ApproverRequest request, String operator) {
@@ -87,13 +90,18 @@ public class WorkflowApproverServiceImpl implements 
WorkflowApproverService {
 }
 
 @Override
-public ApproverResponse get(Integer id) {
+public ApproverResponse get(Integer id, String operator) {
 Preconditions.expectNotNull(id, "approver id cannot be null");
+
 WorkflowApproverEntity approverEntity = approverMapper.selectById(id);
 if (approverEntity == null) {
 LOGGER.error("workflow approver not found by id={}", id);
 throw new 
BusinessException(ErrorCodeEnum.WORKFLOW_APPROVER_NOT_FOUND);
 }
+
+userService.checkUser(approverEntity.getApprovers(), operator,
+"Current user does not have permission to get this workflow 
approver info");
+
 return CommonBeanUtils.copyProperties(approverEntity, 
ApproverResponse::new);
 }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowApproverServiceImplTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowApproverServiceImplTest.java
index 76b4a1b63..5182e628b 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowApproverServiceImplTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowApproverServiceImplTest.java
@@ -43,7 +43,7 @@ public class WorkflowApproverServiceImplTest extends 
ServiceBaseTest {
 Assertions.assertTrue(approverList.getList().size() > 0);
 
 Integer id = approverList.getList().get(0).getId();

[GitHub] [inlong] thexiay commented on a diff in pull request #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


thexiay commented on code in PR #8120:
URL: https://github.com/apache/inlong/pull/8120#discussion_r1209667336


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -52,49 +54,85 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: judge whether unknown change
+// just diff two different schema can not distinguish(add + delete) vs 
modify
+// Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
+// change.
+// In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {

Review Comment:
   [a,b,c]->[a,c,b]
   it'a possible to happen,so you should consider it as an abnormal case or 
just ignore it.It should clearly described in the method comment. And you 
should add some ut to validate the input and output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu merged pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink

2023-05-29 Thread via GitHub


yunqingmoswu merged PR #8105:
URL: https://github.com/apache/inlong/pull/8105


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated (a39e03cc5 -> b9446908f)

2023-05-29 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from a39e03cc5 [INLONG-8108][Manager] WorkflowApprover API Permissions 
Optimization (#8109)
 add b9446908f [INLONG-7853][Sort] Add common handle for schema-change in 
sink (#8105)

No new revisions were added by this update.

Summary of changes:
 .../sort/protocol/enums/SchemaChangePolicy.java|  73 +++
 .../sort/protocol/enums/SchemaChangeType.java  |  94 +
 .../apache/inlong/sort/util/SchemaChangeUtils.java | 229 +
 .../inlong/sort/util/SchemaChangeUtilsTest.java|  62 ++
 .../org/apache/inlong/sort/base/Constants.java |  13 ++
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  12 ++
 .../sort/base/format/JsonDynamicSchemaFormat.java  |   2 +-
 .../base/schema/SchemaChangeHandleException.java   |  20 +-
 8 files changed, 493 insertions(+), 12 deletions(-)
 create mode 100644 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/SchemaChangePolicy.java
 create mode 100644 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/SchemaChangeType.java
 create mode 100644 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
 create mode 100644 
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/util/SchemaChangeUtilsTest.java
 copy 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java
 => 
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHandleException.java
 (72%)



[GitHub] [inlong] hejiay commented on a diff in pull request #8120: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg

2023-05-29 Thread via GitHub


hejiay commented on code in PR #8120:
URL: https://github.com/apache/inlong/pull/8120#discussion_r1209672436


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##
@@ -52,49 +54,85 @@ public class SchemaChangeUtils {
 static List diffSchema(Schema oldSchema, Schema newSchema) {
 List oldFields = 
oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
 List newFields = 
newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-int oi = 0;
-int ni = 0;
+Set oldFieldSet = new HashSet<>(oldFields);
+Set newFieldSet = new HashSet<>(newFields);
+
+Set intersectColSet = Sets.intersection(oldFieldSet, 
newFieldSet);
+Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
 List tableChanges = new ArrayList<>();
-while (ni < newFields.size()) {
-if (oi < oldFields.size() && 
oldFields.get(oi).equals(newFields.get(ni))) {
-oi++;
-ni++;
-} else {
-NestedField newField = newSchema.findField(newFields.get(ni));
+
+// step0: judge whether unknown change
+// just diff two different schema can not distinguish(add + delete) vs 
modify
+// Example first [a, b, c] -> then delete c [a, b] -> add d [a, b, d], 
currently it is only judged as unknown
+// change.
+// In next version,we will judge it is [delete and add] or rename by 
using information extracted from DDL
+if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {

Review Comment:
   Thanks,I get it and I will perfect it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #8044: [INLONG-8038][Sort] Optimize MySQL CDC chunk splitting

2023-05-29 Thread via GitHub


gong commented on code in PR #8044:
URL: https://github.com/apache/inlong/pull/8044#discussion_r1209672869


##
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java:
##
@@ -397,6 +397,14 @@ private void closeExecutorService() {
 }
 }
 
+private void addNewlyAddedSplits(List 
schemaLessSnapshotSplits) {
+int size = schemaLessSnapshotSplits.size();
+// move the last snapshot split to the front of the remaining splits 
to prevent OOM
+// caused by the excessive data of the last snapshot split.
+remainingSplits.add(0, schemaLessSnapshotSplits.get(size - 1));
+remainingSplits.addAll(schemaLessSnapshotSplits.subList(0, size - 1));

Review Comment:
   `remainSplits` is all splits, `remainingSplits.add(0, 
schemaLessSnapshotSplits.get(size - 1));` will cause other table split lost



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #8102: [INLONG-8101][Sort] Support multi-version packaging of sort-connectors

2023-05-29 Thread via GitHub


dockerzhang merged PR #8102:
URL: https://github.com/apache/inlong/pull/8102


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated (b9446908f -> 383a28a34)

2023-05-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from b9446908f [INLONG-7853][Sort] Add common handle for schema-change in 
sink (#8105)
 add 383a28a34 [INLONG-8101][Sort] Support multi-version packaging of 
sort-connectors (#8102)

No new revisions were added by this update.

Summary of changes:
 inlong-sort/sort-flink/pom.xml| 11 ++-
 .../sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml |  2 +-
 .../sort-flink-v1.13/sort-connectors/elasticsearch-6/pom.xml  |  2 +-
 .../sort-flink-v1.13/sort-connectors/elasticsearch-7/pom.xml  |  2 +-
 .../sort-connectors/elasticsearch-base/pom.xml|  2 +-
 .../sort-flink-v1.13/sort-connectors/filesystem/pom.xml   |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/hbase/pom.xml |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml  |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/hudi/pom.xml  |  2 +-
 .../sort-flink-v1.13/sort-connectors/iceberg/pom.xml  |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/jdbc/pom.xml  |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/kafka/pom.xml |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/kudu/pom.xml  |  2 +-
 .../sort-flink-v1.13/sort-connectors/mongodb-cdc/pom.xml  |  2 +-
 .../sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml|  2 +-
 .../sort-flink-v1.13/sort-connectors/oracle-cdc/pom.xml   |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/pom.xml   |  2 +-
 .../sort-flink-v1.13/sort-connectors/postgres-cdc/pom.xml |  2 +-
 .../sort-flink-v1.13/sort-connectors/pulsar/pom.xml   |  2 +-
 .../sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml |  2 +-
 .../sort-flink-v1.13/sort-connectors/sqlserver-cdc/pom.xml|  2 +-
 .../sort-flink-v1.13/sort-connectors/starrocks/pom.xml|  2 +-
 .../sort-flink-v1.13/sort-connectors/tubemq/pom.xml   |  2 +-
 .../sort-flink/sort-flink-v1.15/sort-connectors/pom.xml   |  3 ++-
 .../sort-flink-v1.15/sort-flink-dependencies/pom.xml  | 10 ++
 25 files changed, 44 insertions(+), 24 deletions(-)



[GitHub] [inlong] healchow commented on a diff in pull request #8119: [INLONG-8118][Manager] Support tenant user permission control

2023-05-29 Thread via GitHub


healchow commented on code in PR #8119:
URL: https://github.com/apache/inlong/pull/8119#discussion_r1209683669


##
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/InlongUserTypeEnum.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.enums;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public enum InlongUserTypeEnum implements IntListValuable {
+
+/**
+ * Has all privilege
+ */
+INLONG_ADMIN(0),
+/**
+ * No privilege to manage the system
+ */
+INLONG_OPERATOR(1),
+;
+
+@Getter
+@JsonValue
+private final Integer code;
+
+InlongUserTypeEnum(Integer code) {
+this.code = code;
+}
+
+private static final List ARRAYS = Arrays.stream(values())

Review Comment:
   `ARRAYS` -> `TYPE_CODE_LIST`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #8119: [INLONG-8118][Manager] Support tenant user permission control

2023-05-29 Thread via GitHub


healchow commented on code in PR #8119:
URL: https://github.com/apache/inlong/pull/8119#discussion_r1209684589


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/TenantRoleService.java:
##
@@ -15,20 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.user;
 
-import java.util.List;
+import org.apache.inlong.manager.pojo.user.TenantRoleInfo;
+import org.apache.inlong.manager.pojo.user.TenantRolePageRequest;
+import org.apache.inlong.manager.pojo.user.TenantRoleRequest;
+
+import com.github.pagehelper.PageInfo;
 
 /**
  * Role service
  */
-public interface RoleService {
-
-/**
- * Get roles based on username
- *
- * @param username username
- * @return Role list
- */
-List listByUser(String username);
+public interface TenantRoleService {
+
+PageInfo listByCondition(TenantRolePageRequest request);

Review Comment:
   Please add Javadoc for those methods in interface class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #8107: [INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one )

2023-05-29 Thread via GitHub


healchow commented on code in PR #8107:
URL: https://github.com/apache/inlong/pull/8107#discussion_r1209687344


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+/**
+ * save black list configure to list
+ */
+public class BlackListConfigHolder extends VisitConfigHolder {

Review Comment:
   > Blacklist and whitelist are commonly used conventions
   
   Yes, but now more new usages are changed to allow lists and block lists.
   
   Similarly, the default master branch of GitHub is changed from the master 
branch to the main branch~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #8124: [INLONG-8121][Manager] Supports cluster node status management in the case of multiple manager nodes

2023-05-29 Thread via GitHub


dockerzhang merged PR #8124:
URL: https://github.com/apache/inlong/pull/8124


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-8121][Manager] Supports cluster node status management in the case of multiple manager nodes (#8124)

2023-05-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 45e6cea21 [INLONG-8121][Manager] Supports cluster node status 
management in the case of multiple manager nodes (#8124)
45e6cea21 is described below

commit 45e6cea2175b3a8d752402190916e35ace1d3a68
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue May 30 12:28:32 2023 +0800

[INLONG-8121][Manager] Supports cluster node status management in the case 
of multiple manager nodes (#8124)
---
 .../manager/dao/mapper/ComponentHeartbeatEntityMapper.java  | 10 ++
 .../resources/mappers/ComponentHeartbeatEntityMapper.xml|  9 +
 .../inlong/manager/service/heartbeat/HeartbeatManager.java  | 13 +
 .../src/main/resources/application-dev.properties   |  2 +-
 .../src/main/resources/application-prod.properties  |  2 +-
 .../src/main/resources/application-test.properties  |  2 +-
 6 files changed, 35 insertions(+), 3 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
index 2c35006bd..49607486e 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
@@ -38,6 +38,16 @@ public interface ComponentHeartbeatEntityMapper {
 
 List selectByCondition(@Param("request") 
HeartbeatPageRequest request);
 
+/**
+ * Get the heartbeat by heartbeat interval
+ *
+ * @param component component type
+ * @param instance component address
+ * @param beforeSeconds the modified time was beforeSeconds seconds ago
+ */
+ComponentHeartbeatEntity selectTimeOutHeartBeat(@Param("component") String 
component,
+@Param("instance") String instance, @Param("beforeSeconds") Long 
beforeSeconds);
+
 int deleteByPrimaryKey(Integer id);
 
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
index 6046e82dc..077ae3efd 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
@@ -77,6 +77,15 @@
 order by modify_time desc
 
 
+
+select
+
+from component_heartbeat
+where component = #{component, jdbcType=VARCHAR}
+and instance = #{instance,jdbcType=VARCHAR}
+and modify_time >= DATE_ADD(NOW(), INTERVAL -#{beforeSeconds, 
jdbcType=INTEGER} SECOND)
+
+
 
 delete
 from component_heartbeat
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index c7d20cdcf..894a2df0a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -28,8 +28,10 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
@@ -87,6 +89,8 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
 private InlongClusterNodeEntityMapper clusterNodeMapper;
 @Autowired
 private StreamSourceEntityMapper sourceMapper;
+@Autowired
+private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
 
 @Value("${cluster.heartbeat.interval:30}")
 private Long heartbeatIntervalFactor;
@@ -223,6 +227,15 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
 protocolTypes = null;
 }
 }
+// If t

[inlong] branch master updated (45e6cea21 -> 6fd0f0a1b)

2023-05-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 45e6cea21 [INLONG-8121][Manager] Supports cluster node status 
management in the case of multiple manager nodes (#8124)
 add 6fd0f0a1b [INLONG-8110][Sort] Only whole database migration need table 
level metric (#8113)

No new revisions were added by this update.

Summary of changes:
 .../sort/base/metric/sub/SourceTableMetricData.java|  2 ++
 .../sort/cdc/mysql/source/MySqlSourceBuilder.java  |  5 +
 .../cdc/mysql/source/config/MySqlSourceConfig.java |  9 -
 .../mysql/source/config/MySqlSourceConfigFactory.java  |  9 -
 .../cdc/mysql/source/reader/MySqlRecordEmitter.java| 18 --
 .../inlong/sort/cdc/mysql/table/MySqlTableSource.java  |  1 +
 6 files changed, 36 insertions(+), 8 deletions(-)



[inlong] branch master updated: [INLONG-8038][Sort] Optimize MySQL CDC chunk splitting to prevent large chunk (#8044)

2023-05-29 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 792001139 [INLONG-8038][Sort] Optimize MySQL CDC chunk splitting to 
prevent large chunk (#8044)
792001139 is described below

commit 79200113935c88745b5ea22d41440bec7ac4aac7
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Tue May 30 14:27:46 2023 +0800

[INLONG-8038][Sort] Optimize MySQL CDC chunk splitting to prevent large 
chunk (#8044)
---
 .../cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index dc17534c2..0d7b14c42 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -230,7 +230,7 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
 
.map(MySqlSnapshotSplit::toSchemaLessSnapshotSplit)
 .collect(Collectors.toList());
 synchronized (lock) {
-
remainingSplits.addAll(schemaLessSnapshotSplits);
+
addNewlyAddedSplits(schemaLessSnapshotSplits);
 remainingTables.remove(nextTable);
 
addAlreadyProcessedTablesIfNotExists(nextTable);
 lock.notify();
@@ -399,6 +399,14 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
 }
 }
 
+private void addNewlyAddedSplits(List 
schemaLessSnapshotSplits) {
+int size = schemaLessSnapshotSplits.size();
+// move the last snapshot split to the front of the remaining splits 
to prevent OOM
+// caused by the excessive data of the last snapshot split.
+remainingSplits.add(0, schemaLessSnapshotSplits.get(size - 1));
+remainingSplits.addAll(schemaLessSnapshotSplits.subList(0, size - 1));
+}
+
 private void addAlreadyProcessedTablesIfNotExists(TableId tableId) {
 if (!alreadyProcessedTables.contains(tableId)) {
 alreadyProcessedTables.add(tableId);



[GitHub] [inlong] vernedeng commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209767088


##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongUserRoleEntityMapper.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.entity.InlongUserRoleEntity;
+import org.apache.inlong.manager.pojo.user.InlongRolePageRequest;
+
+import com.github.pagehelper.Page;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface InlongUserRoleEntityMapper {
+
+int insert(InlongUserRoleEntity record);
+
+InlongUserRoleEntity selectById(Integer id);
+
+int updateById(InlongUserRoleEntity record);
+
+Page listByCondition(InlongRolePageRequest request);

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] vernedeng commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209768625


##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongRoleController.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.user.InlongRoleInfo;
+import org.apache.inlong.manager.pojo.user.InlongRolePageRequest;
+import org.apache.inlong.manager.pojo.user.InlongRoleRequest;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.user.InlongRoleService;
+
+import com.github.pagehelper.PageInfo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api")
+@Api(tags = "INLONG-USER-API")
+public class InlongRoleController {
+
+@Autowired
+private InlongRoleService inlongRoleService;
+
+@RequestMapping(value = "/role/inlong/get/{id}", method = 
RequestMethod.GET)
+@ApiOperation(value = "Get tenant role")
+@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] vernedeng commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209769843


##
inlong-manager/manager-dao/src/main/resources/mappers/InlongUserRoleEntityMapper.xml:
##
@@ -0,0 +1,88 @@
+
+
+
+http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+id, user_name, role_code, disabled, is_deleted, creator, modifier, 
create_time, modify_time, version
+
+
+
+insert into inlong_user_role (id, user_name, role_code,
+  disabled, creator, modifier)
+values (#{id,jdbcType=INTEGER}, #{username,jdbcType=VARCHAR}, 
#{roleCode,jdbcType=VARCHAR},
+#{disabled,jdbcType=SMALLINT},

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] vernedeng commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209770774


##
inlong-manager/manager-web/sql/apache_inlong_manager.sql:
##
@@ -580,6 +580,29 @@ CREATE TABLE IF NOT EXISTS `user_role`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='User Role Table';
 
+-- 
+-- Table structure for inlong_user_role
+-- 
+CREATE TABLE IF NOT EXISTS `inlong_user_role`
+(
+`id`  int(11)  NOT NULL AUTO_INCREMENT,
+`user_name`   varchar(256) NOT NULL COMMENT 'Username',
+`role_code`   varchar(256) NOT NULL COMMENT 'User role code',
+`disabled`tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Whether to 
disabled, 0: enabled, 1: disabled',
+`is_deleted`  int(11)   DEFAULT '0' COMMENT 'Whether to 
delete, 0 is not deleted, if greater than 0, delete',
+`creator` varchar(256) NOT NULL COMMENT 'Creator name',
+`modifier`varchar(256)  DEFAULT NULL COMMENT 'Modifier name',
+`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 
'Create time',
+`modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'Modify time',
+`version` int(11)  NOT NULL DEFAULT '1' COMMENT 'Version number, 
which will be incremented by 1 after modification',
+PRIMARY KEY (`id`),
+UNIQUE KEY `unique_user_role` (`user_name`, `role_code`, `is_deleted`)

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] vernedeng commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209772418


##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongUserRoleEntityMapper.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.entity.InlongUserRoleEntity;
+import org.apache.inlong.manager.pojo.user.InlongRolePageRequest;
+
+import com.github.pagehelper.Page;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface InlongUserRoleEntityMapper {
+
+int insert(InlongUserRoleEntity record);
+
+InlongUserRoleEntity selectById(Integer id);

Review Comment:
   At least not now, there are only two roles in inlong user role managerment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] vernedeng commented on a diff in pull request #8119: [INLONG-8118][Manager] Support tenant user permission control

2023-05-29 Thread via GitHub


vernedeng commented on code in PR #8119:
URL: https://github.com/apache/inlong/pull/8119#discussion_r1209782719


##
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/InlongUserTypeEnum.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.enums;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public enum InlongUserTypeEnum implements IntListValuable {
+
+/**
+ * Has all privilege
+ */
+INLONG_ADMIN(0),
+/**
+ * No privilege to manage the system
+ */
+INLONG_OPERATOR(1),
+;
+
+@Getter
+@JsonValue
+private final Integer code;
+
+InlongUserTypeEnum(Integer code) {
+this.code = code;
+}
+
+private static final List ARRAYS = Arrays.stream(values())

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] fuweng11 commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control

2023-05-29 Thread via GitHub


fuweng11 commented on code in PR #8100:
URL: https://github.com/apache/inlong/pull/8100#discussion_r1209779574


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/InlongRoleServiceImpl.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.user;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongUserRoleEntity;
+import org.apache.inlong.manager.dao.mapper.InlongUserRoleEntityMapper;
+import org.apache.inlong.manager.pojo.user.InlongRoleInfo;
+import org.apache.inlong.manager.pojo.user.InlongRolePageRequest;
+import org.apache.inlong.manager.pojo.user.InlongRoleRequest;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class InlongRoleServiceImpl implements InlongRoleService {
+
+@Autowired
+private InlongUserRoleEntityMapper inlongUserMapper;
+
+@Override
+public PageInfo listByCondition(InlongRolePageRequest 
request) {
+PageHelper.startPage(request.getPageNum(), request.getPageSize());
+Page entityPage = 
inlongUserMapper.selectByCondition(request);
+return entityPage.toPageInfo(entity -> 
CommonBeanUtils.copyProperties(entity, InlongRoleInfo::new));
+}
+
+@Override
+public int save(InlongRoleRequest request) {
+String username = request.getUsername();
+Preconditions.expectNotBlank(username, "Failed to save tenant user 
role, user should not be blank");

Review Comment:
   "Failed to save inlong user role, user should not be blank"



##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongRoleController.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.user.InlongRoleInfo;
+import org.apache.inlong.manager.pojo.user.InlongRolePageRequest;
+import org.apache.inlong.manager.pojo.user.InlongRoleRequest;
+import org.apache.inlong.manager.pojo.user.UserRoleCode;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.user.InlongRoleService;
+
+import com.github.pagehelper.PageInfo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.apache.shiro.authz.annotation.RequiresRoles;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annota