Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]

2024-07-24 Thread via GitHub


XiaoYou201 commented on PR #10647:
URL: https://github.com/apache/inlong/pull/10647#issuecomment-2247113634

   > Please add test result on es 6 or 7
   As the images below show, I implemented the ES 6 connector and tested 
PostgreSQL to Elasticsearch 6 successfully. However, the ES 6 connector is 
based on this PR, so its PR will be submitted after this PR is merged.
   
![image](https://github.com/user-attachments/assets/a0c32211-b68b-40d8-b719-5ba6a42c7a31)
   
![image](https://github.com/user-attachments/assets/afae04c9-4bef-4d7e-becb-3b27f9a71b7d)
   
![image](https://github.com/user-attachments/assets/6fb6adea-1b87-4f23-ac13-8036e3fb0bcd)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]

2024-07-24 Thread via GitHub


aloyszhang commented on code in PR #10647:
URL: https://github.com/apache/inlong/pull/10647#discussion_r1689426722


##
licenses/inlong-sort-connectors/LICENSE:
##
@@ -938,6 +938,28 @@ License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 Source  : org.apache.flink:flink-connector-jdbc-1.15.4.jar (Please note that 
the software have been modified.)
 License : https://github.com/apache/flink/blob/master/LICENSE
 
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java

Review Comment:
   ```suggestion
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/so

Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]

2024-07-24 Thread via GitHub


aloyszhang commented on code in PR #10647:
URL: https://github.com/apache/inlong/pull/10647#discussion_r1689426722


##
licenses/inlong-sort-connectors/LICENSE:
##
@@ -938,6 +938,28 @@ License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 Source  : org.apache.flink:flink-connector-jdbc-1.15.4.jar (Please note that 
the software have been modified.)
 License : https://github.com/apache/flink/blob/master/LICENSE
 
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java

Review Comment:
   ```suggestion
   1.3.29 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
  
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/ap

[PR] [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task [inlong]

2024-07-24 Thread via GitHub


fuweng11 opened a new pull request, #10715:
URL: https://github.com/apache/inlong/pull/10715

   
   
   
   
   
   
   Fixes #10714
   
   ### Motivation
   
   Fix the problem of incorrect deletion of data source task.
   
   ### Modifications
   
   Fix the problem of incorrect deletion of data source task.
   





[PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]

2024-07-24 Thread via GitHub


vernedeng opened a new pull request, #10717:
URL: https://github.com/apache/inlong/pull/10717

   
   
   
   
   Fixes #10716 
   
   ### Motivation
   
   
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   





Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]

2024-07-24 Thread via GitHub


aloyszhang commented on code in PR #10717:
URL: https://github.com/apache/inlong/pull/10717#discussion_r1689524909


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java:
##
@@ -28,17 +29,18 @@
  * CsvSourceInfo
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
+@SuperBuilder
 public class CsvSourceInfo extends SourceInfo {
 
-private String delimiter;
-private String escapeChar;
+private Character delimiter;
+private Character escapeChar;

Review Comment:
   Why change the type to `Character`? What if the delimiter is not a single 
character, such as `\r\n`?






Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]

2024-07-24 Thread via GitHub


aloyszhang commented on code in PR #10717:
URL: https://github.com/apache/inlong/pull/10717#discussion_r1689525564


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java:
##
@@ -30,15 +30,15 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class CsvSinkInfo extends SinkInfo {
 
-private String delimiter;
-private String escapeChar;
+private Character delimiter;
+private Character escapeChar;

Review Comment:
   Why change the type to Character?






Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]

2024-07-24 Thread via GitHub


aloyszhang commented on code in PR #10717:
URL: https://github.com/apache/inlong/pull/10717#discussion_r1689526655


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java:
##
@@ -28,8 +30,15 @@
  * KvSourceInfo
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
+@SuperBuilder
+@Data
 public class KvSourceInfo extends SourceInfo {
 
+private Character entryDelimiter;
+private Character kvDelimiter;
+private Character escapeChar;
+private Character quoteChar;
+private Character lineDelimiter;

Review Comment:
   Ditto






Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]

2024-07-24 Thread via GitHub


aloyszhang commented on PR #10647:
URL: https://github.com/apache/inlong/pull/10647#issuecomment-2247552859

   Flink-1.15 needs this feature too, would you like to work on it?





Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]

2024-07-24 Thread via GitHub


vernedeng commented on code in PR #10717:
URL: https://github.com/apache/inlong/pull/10717#discussion_r1689603608


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java:
##
@@ -28,17 +29,18 @@
  * CsvSourceInfo
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
+@SuperBuilder
 public class CsvSourceInfo extends SourceInfo {
 
-private String delimiter;
-private String escapeChar;
+private Character delimiter;
+private Character escapeChar;

Review Comment:
   A new decoder will be added to extract messages that use a multi-character 
delimiter. 
   The performance gap between the single-character scenario and the 
multi-character scenario is about 5-7x.
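
   To illustrate where such a gap can come from, here is a minimal sketch of 
the two splitting strategies. It is not the transform-sdk decoder: the class 
and method names are hypothetical, escape and quote handling are omitted, and 
it makes no attempt to reproduce the 5-7x figure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not InLong code: contrasts single-char and multi-char splitting.
public class DelimiterSplitSketch {

    // Single-character delimiter: one char comparison per input position.
    static List<String> splitByChar(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < line.length(); i++) {
            if (line.charAt(i) == delimiter) {
                fields.add(line.substring(start, i));
                start = i + 1;
            }
        }
        fields.add(line.substring(start));
        return fields;
    }

    // Multi-character delimiter: each candidate position needs a substring match.
    static List<String> splitByString(String line, String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        List<String> fields = new ArrayList<>();
        int start = 0;
        int hit;
        while ((hit = line.indexOf(delimiter, start)) >= 0) {
            fields.add(line.substring(start, hit));
            start = hit + delimiter.length();
        }
        fields.add(line.substring(start));
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(splitByChar("a|b|c", '|'));            // prints [a, b, c]
        System.out.println(splitByString("a\r\nb\r\nc", "\r\n")); // prints [a, b, c]
    }
}
```

   Keeping the `Character` fields for the common case and routing `\r\n`-style 
delimiters to a separate decoder would let each path stay simple.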






Re: [PR] [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task [inlong]

2024-07-24 Thread via GitHub


fuweng11 merged PR #10715:
URL: https://github.com/apache/inlong/pull/10715





(inlong) branch branch-1.13 updated: [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715)

2024-07-24 Thread wakefu
This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch branch-1.13
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.13 by this push:
 new ecfae06b9c [INLONG-10714][Manager] Fix the problem of incorrect 
deletion of data source task (#10715)
ecfae06b9c is described below

commit ecfae06b9ce1d54c85c685ea30eda20c35c855c1
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jul 24 19:31:15 2024 +0800

[INLONG-10714][Manager] Fix the problem of incorrect deletion of data 
source task (#10715)

* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data 
source task

* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data 
source task
---
 .../source/AbstractSourceOperateListener.java  | 60 ++
 .../service/source/AbstractSourceOperator.java |  8 ---
 .../service/source/StreamSourceServiceImpl.java|  9 ++--
 3 files changed, 7 insertions(+), 70 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index ccefd5cf10..6b19648b11 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -18,9 +18,7 @@
 package org.apache.inlong.manager.service.listener.source;
 
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -33,15 +31,11 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 
-import com.google.common.collect.Lists;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Event listener of operate resources, such as delete, stop, restart sources.
@@ -67,67 +61,21 @@ public abstract class AbstractSourceOperateListener 
implements SourceOperateList
 InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm());
 final String groupId = groupInfo.getInlongGroupId();
 List streamResponses = 
streamService.listBriefWithSink(groupId);
-List unOperatedSources = Lists.newArrayList();
-streamResponses.forEach(stream -> operateStreamSources(groupId, 
stream.getInlongStreamId(),
-context.getOperator(), unOperatedSources));
-
-if (CollectionUtils.isNotEmpty(unOperatedSources)) {
-GroupOperateType operateType = 
getOperateType(context.getProcessForm());
-StringBuilder builder = new StringBuilder("Unsupported operate 
").append(operateType).append(" for (");
-unOperatedSources.forEach(source -> builder.append(" 
").append(source.getSourceName()).append(" "));
-String errMsg = builder.append(")").toString();
-throw new WorkflowListenerException(errMsg);
-}
-
+streamResponses
+.forEach(stream -> operateStreamSources(groupId, 
stream.getInlongStreamId(), context.getOperator()));
 return ListenerResult.success();
 }
 
 /**
  * Operate stream sources, such as delete, stop, restart.
  */
-protected void operateStreamSources(String groupId, String streamId, 
String operator,
-List unOperatedSources) {
+protected void operateStreamSources(String groupId, String streamId, 
String operator) {
 List sources = streamSourceService.listSource(groupId, 
streamId);
 sources.forEach(source -> {
-if (checkIfOp(source, unOperatedSources)) {
-operateStreamSource(source.genSourceRequest(), operator);
-}
+operateStreamSource(source.genSourceRequest(), operator);
 });
 }
 
-/**
- * Check source status.
- */
-@SneakyThrows
-public boolean checkIfOp(StreamSource streamSource, List 
unOperatedSources) {
-for (int retry = 0; retry < 60; retry++) {
-int status = streamSou

(inlong) branch master updated (4425586894 -> cba6abd8a0)

2024-07-24 Thread wakefu

wakefu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 4425586894 [INLONG-10632][Dashboard] Dashboard Add Oceanbase Support 
(#10698)
 add cba6abd8a0 [INLONG-10714][Manager] Fix the problem of incorrect 
deletion of data source task (#10715)

No new revisions were added by this update.

Summary of changes:
 .../source/AbstractSourceOperateListener.java  | 60 ++
 .../service/source/AbstractSourceOperator.java |  8 ---
 .../service/source/StreamSourceServiceImpl.java|  9 ++--
 3 files changed, 7 insertions(+), 70 deletions(-)



Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]

2024-07-24 Thread via GitHub


XiaoYou201 commented on PR #10647:
URL: https://github.com/apache/inlong/pull/10647#issuecomment-2247681190

   > BTW, Flink-1.15 needs this feature too, would you like to work on it?
   
   Yes, I would like to do that; it will be done soon~





Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]

2024-07-24 Thread via GitHub


aloyszhang merged PR #10717:
URL: https://github.com/apache/inlong/pull/10717





(inlong) branch master updated (cba6abd8a0 -> a4e688171d)

2024-07-24 Thread aloyszhang

aloyszhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from cba6abd8a0 [INLONG-10714][Manager] Fix the problem of incorrect 
deletion of data source task (#10715)
 add a4e688171d [INLONG-10716][SDK] Inlong Transform usage optimization 
(#10717)

No new revisions were added by this update.

Summary of changes:
 .../common/pojo/sort/dataflow/DataFlowConfig.java  |  1 +
 .../pojo/sort/dataflow/dataType/CsvConfig.java |  2 +-
 .../pojo/sort/dataflow/dataType/KvConfig.java  |  4 +-
 .../sdk/transform/decode/CsvSourceDecoder.java |  8 +--
 .../sdk/transform/decode/KvSourceDecoder.java  | 24 -
 .../sdk/transform/encode/CsvSinkEncoder.java   |  8 +--
 .../inlong/sdk/transform/encode/KvSinkEncoder.java | 12 -
 .../inlong/sdk/transform/pojo/CsvSinkInfo.java | 16 +++---
 .../inlong/sdk/transform/pojo/CsvSourceInfo.java   | 18 ---
 .../inlong/sdk/transform/pojo/KvSinkInfo.java  |  6 +++
 .../inlong/sdk/transform/pojo/KvSourceInfo.java|  9 
 .../apache/inlong/sdk/transform/pojo/SinkInfo.java |  2 +
 .../inlong/sdk/transform/pojo/SourceInfo.java  |  2 +
 .../inlong/sdk/transform/process/Context.java  |  8 +++
 .../transform/process/operator/OperatorTools.java  |  4 ++
 .../TestTransformArithmeticFunctionsProcessor.java |  2 +-
 .../transform/process/TestTransformProcessor.java  | 22 
 .../sort-standalone-source/pom.xml |  5 ++
 .../inlong/sort/standalone/sink/SinkContext.java   | 61 ++
 19 files changed, 172 insertions(+), 42 deletions(-)



Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]

2024-07-24 Thread via GitHub


EMsnap merged PR #10647:
URL: https://github.com/apache/inlong/pull/10647





(inlong) branch master updated: [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 (#10647)

2024-07-24 Thread zirui

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 2d487362d5 [INLONG-10644][Sort] Add the elasticsearch-base module to 
implement elasticsearch connector 6&7 (#10647)
2d487362d5 is described below

commit 2d487362d5425e7e0d94473257827f44525bf3db
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Thu Jul 25 14:19:28 2024 +0800

[INLONG-10644][Sort] Add the elasticsearch-base module to implement 
elasticsearch connector 6&7 (#10647)
---
 .../sort-connectors/elasticsearch-base/pom.xml | 140 ++
 .../elasticsearch/ActionRequestFailureHandler.java |  79 
 .../elasticsearch/BufferingNoOpRequestIndexer.java |  73 +++
 .../elasticsearch/ElasticsearchApiCallBridge.java  | 118 +
 .../sort/elasticsearch/ElasticsearchSinkBase.java  | 515 +
 .../elasticsearch/ElasticsearchSinkFunction.java   |  89 
 .../inlong/sort/elasticsearch/RequestIndexer.java  |  79 
 .../table/AbstractTimeIndexGenerator.java  |  40 ++
 .../table/ElasticsearchConfiguration.java  | 162 +++
 .../table/ElasticsearchConnectorOptions.java   | 169 +++
 .../table/ElasticsearchValidationUtils.java|  94 
 .../sort/elasticsearch/table/IndexGenerator.java   |  39 ++
 .../elasticsearch/table/IndexGeneratorBase.java|  51 ++
 .../elasticsearch/table/IndexGeneratorFactory.java | 319 +
 .../sort/elasticsearch/table/KeyExtractor.java | 129 ++
 .../sort/elasticsearch/table/RequestFactory.java   |  54 +++
 .../table/RowElasticsearchSinkFunction.java| 142 ++
 .../elasticsearch/table/StaticIndexGenerator.java  |  34 ++
 .../elasticsearch/util/IgnoringFailureHandler.java |  37 ++
 .../elasticsearch/util/NoOpFailureHandler.java |  54 +++
 .../util/RetryRejectedExecutionFailureHandler.java |  56 +++
 .../sort-flink-v1.18/sort-connectors/pom.xml   |   1 +
 licenses/inlong-sort-connectors/LICENSE|  21 +
 23 files changed, 2495 insertions(+)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml
new file mode 100644
index 00..709eefdb06
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml
@@ -0,0 +1,140 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+4.0.0
+
+org.apache.inlong
+sort-connectors-v1.18
+1.14.0-SNAPSHOT
+
+sort-connector-elasticsearch-base-v1.18
+jar
+Apache InLong - Sort-connector-elasticsearch-base
+
+
+
${project.parent.parent.parent.parent.parent.basedir}
+7.10.2
+
+
+
+org.apache.inlong
+sort-flink-dependencies-v1.18
+${project.version}
+provided
+
+
+
+org.apache.flink
+flink-json
+${flink.version}
+provided
+
+
+
+org.elasticsearch
+elasticsearch
+${elasticsearch.version}
+
+
+
+org.ow2.asm
+*
+
+
+
+
+
+org.elasticsearch.client
+elasticsearch-rest-high-level-client
+${elasticsearch.version}
+
+
+
+org.apache.flink
+flink-runtime
+${flink.version}
+provided
+
+
+
+
+org.apache.logging.log4j
+log4j-api
+provided
+
+
+
+org.apache.logging.log4j
+log4j-core
+provided
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-shade-plugin
+
+
+shade-flink
+
+shade
+
+package
+
+
true
+
+
+
org.apache.inlong:sort-connector-*
+
+org/apache/inlong/**
+
META-INF/services/org.apache.flink.table.factories.Factory
+
+
+
+*:*
+
+

Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]

2024-07-24 Thread via GitHub


XiaoYou201 commented on code in PR #10700:
URL: https://github.com/apache/inlong/pull/10700#discussion_r1690876562


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/jdbc/JdbcDialectFactory.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect.jdbc;
+
+public class JdbcDialectFactory implements org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:mysql:");
+    }

Review Comment:
   Understood. You can click the "Resolve conversation" button to mark this 
comment as resolved.
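
For readers following the thread, the `acceptsURL` check quoted above selects a dialect by JDBC URL prefix. Below is a minimal, self-contained sketch of that lookup pattern. `SimpleDialectFactory`, `prefixFactory`, and `resolve` are hypothetical names made up for illustration; they are not the Flink or InLong interfaces, and the real connector may resolve dialects differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class DialectLookupSketch {

    /** Hypothetical stand-in for a dialect factory; not the real Flink/InLong interface. */
    interface SimpleDialectFactory {
        boolean acceptsURL(String url);

        String dialectName();
    }

    /** Builds a factory that accepts any URL starting with the given prefix. */
    static SimpleDialectFactory prefixFactory(String prefix, String name) {
        return new SimpleDialectFactory() {
            @Override
            public boolean acceptsURL(String url) {
                return url != null && url.startsWith(prefix);
            }

            @Override
            public String dialectName() {
                return name;
            }
        };
    }

    /** Returns the name of the first factory that accepts the URL, if any. */
    static Optional<String> resolve(List<SimpleDialectFactory> factories, String url) {
        return factories.stream()
                .filter(f -> f.acceptsURL(url))
                .map(SimpleDialectFactory::dialectName)
                .findFirst();
    }

    public static void main(String[] args) {
        List<SimpleDialectFactory> factories = Arrays.asList(
                prefixFactory("jdbc:mysql:", "mysql"),
                prefixFactory("jdbc:clickhouse:", "clickhouse"));

        // A MySQL-style URL matches the first factory.
        System.out.println(resolve(factories, "jdbc:mysql://localhost:3306/test"));
        // No registered factory accepts this URL.
        System.out.println(resolve(factories, "jdbc:postgresql://localhost/test"));
    }
}
```

Because the first accepting factory wins in this sketch, two factories that accept the same prefix would shadow each other, which is one reason the prefix and the reported dialect name should agree.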






Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]

2024-07-24 Thread via GitHub


XiaoYou201 commented on code in PR #10700:
URL: https://github.com/apache/inlong/pull/10700#discussion_r1690877425


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/jdbc/JdbcDialect.java:
##
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect.jdbc;
+
+import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * JDBC dialect for ClickHouse SQL.
+ */
+public class JdbcDialect extends AbstractDialect {

Review Comment:
   Please update the Javadoc comment. Would "JDBC dialect for MySQL" be better?






Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]

2024-07-24 Thread via GitHub


XiaoYou201 commented on code in PR #10700:
URL: https://github.com/apache/inlong/pull/10700#discussion_r1690880917


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/jdbc/JdbcDialect.java:
##
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect.jdbc;
+
+import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * JDBC dialect for ClickHouse SQL.
+ */
+public class JdbcDialect extends AbstractDialect {
+
+    public static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class);
+
+    // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs:
+    // https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64
+    private static final int MAX_TIMESTAMP_PRECISION = 8;
+    private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+    // Define MAX/MIN precision of DECIMAL type according to ClickHouse docs:
+    // https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/
+    private static final int MAX_DECIMAL_PRECISION = 128;
+    private static final int MIN_DECIMAL_PRECISION = 32;
+    private static final String POINT = ".";
+
+    @Override
+    public String dialectName() {
+        return "mysql";
+    }

Review Comment:
   LGTM~



Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]

2024-07-24 Thread via GitHub


XiaoYou201 commented on PR #10700:
URL: https://github.com/apache/inlong/pull/10700#issuecomment-2249561835

   > > please add test result with real data from any source to oceanbase
   > 
   > What kind of test data is needed and where to put it
   
   You can refer to 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
   
   Alternatively, package your code into the manager container, then start a 
job to verify that the OceanBase connector works properly. If everything works, 
please provide some screenshots.

