Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]
XiaoYou201 commented on PR #10647: URL: https://github.com/apache/inlong/pull/10647#issuecomment-2247113634 > Please add test result on es 6 or 7 As follow image, I implement es 6 connector and postgreSQL to ElasticSearch6 test successfully . But es6 base on this pr, so, the es6 connector pr will be submitted after this pr merged.    -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]
aloyszhang commented on code in PR #10647: URL: https://github.com/apache/inlong/pull/10647#discussion_r1689426722 ## licenses/inlong-sort-connectors/LICENSE: ## @@ -938,6 +938,28 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE Source : org.apache.flink:flink-connector-jdbc-1.15.4.jar (Please note that the software have been modified.) License : https://github.com/apache/flink/blob/master/LICENSE +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java Review Comment: ```suggestion inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/so
Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]
aloyszhang commented on code in PR #10647: URL: https://github.com/apache/inlong/pull/10647#discussion_r1689426722 ## licenses/inlong-sort-connectors/LICENSE: ## @@ -938,6 +938,28 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE Source : org.apache.flink:flink-connector-jdbc-1.15.4.jar (Please note that the software have been modified.) License : https://github.com/apache/flink/blob/master/LICENSE +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java +1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java Review Comment: ```suggestion 1.3.29 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/ap
[PR] [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task [inlong]
fuweng11 opened a new pull request, #10715: URL: https://github.com/apache/inlong/pull/10715 Fixes #10714 ### Motivation Fix the problem of incorrect deletion of data source task. ### Modifications Fix the problem of incorrect deletion of data source task. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]
vernedeng opened a new pull request, #10717: URL: https://github.com/apache/inlong/pull/10717 Fixes #10716 ### Motivation ### Modifications ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]
aloyszhang commented on code in PR #10717: URL: https://github.com/apache/inlong/pull/10717#discussion_r1689524909 ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java: ## @@ -28,17 +29,18 @@ * CsvSourceInfo */ @JsonIgnoreProperties(ignoreUnknown = true) +@SuperBuilder public class CsvSourceInfo extends SourceInfo { -private String delimiter; -private String escapeChar; +private Character delimiter; +private Character escapeChar; Review Comment: Why change the type to `Character`, what if the delimiter is not a `Character` like `\r\n`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]
aloyszhang commented on code in PR #10717: URL: https://github.com/apache/inlong/pull/10717#discussion_r1689525564 ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java: ## @@ -30,15 +30,15 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class CsvSinkInfo extends SinkInfo { -private String delimiter; -private String escapeChar; +private Character delimiter; +private Character escapeChar; Review Comment: Why change the type to Character? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]
aloyszhang commented on code in PR #10717: URL: https://github.com/apache/inlong/pull/10717#discussion_r1689526655 ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java: ## @@ -28,8 +30,15 @@ * KvSourceInfo */ @JsonIgnoreProperties(ignoreUnknown = true) +@SuperBuilder +@Data public class KvSourceInfo extends SourceInfo { +private Character entryDelimiter; +private Character kvDelimiter; +private Character escapeChar; +private Character quoteChar; +private Character lineDelimiter; Review Comment: Ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]
aloyszhang commented on PR #10647: URL: https://github.com/apache/inlong/pull/10647#issuecomment-2247552859 Flink-1.15 needs this feature too, would you like to work on it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]
vernedeng commented on code in PR #10717: URL: https://github.com/apache/inlong/pull/10717#discussion_r1689603608 ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java: ## @@ -28,17 +29,18 @@ * CsvSourceInfo */ @JsonIgnoreProperties(ignoreUnknown = true) +@SuperBuilder public class CsvSourceInfo extends SourceInfo { -private String delimiter; -private String escapeChar; +private Character delimiter; +private Character escapeChar; Review Comment: There will be a new decoder to extract messages with multi-characters as one delimiter. The performance gap between a one-character scenario and a multi-character scenario is about 5-7 times -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task [inlong]
fuweng11 merged PR #10715: URL: https://github.com/apache/inlong/pull/10715 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch branch-1.13 updated: [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715)
This is an automated email from the ASF dual-hosted git repository. wakefu pushed a commit to branch branch-1.13 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/branch-1.13 by this push: new ecfae06b9c [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715) ecfae06b9c is described below commit ecfae06b9ce1d54c85c685ea30eda20c35c855c1 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Jul 24 19:31:15 2024 +0800 [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715) * [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task * [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task --- .../source/AbstractSourceOperateListener.java | 60 ++ .../service/source/AbstractSourceOperator.java | 8 --- .../service/source/StreamSourceServiceImpl.java| 9 ++-- 3 files changed, 7 insertions(+), 70 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java index ccefd5cf10..6b19648b11 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java @@ -18,9 +18,7 @@ package org.apache.inlong.manager.service.listener.source; import org.apache.inlong.manager.common.enums.GroupOperateType; -import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.common.enums.TaskEvent; -import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; @@ -33,15 +31,11 @@ import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.task.SourceOperateListener; -import com.google.common.collect.Lists; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; -import java.util.concurrent.TimeUnit; /** * Event listener of operate resources, such as delete, stop, restart sources. @@ -67,67 +61,21 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm()); final String groupId = groupInfo.getInlongGroupId(); List streamResponses = streamService.listBriefWithSink(groupId); -List unOperatedSources = Lists.newArrayList(); -streamResponses.forEach(stream -> operateStreamSources(groupId, stream.getInlongStreamId(), -context.getOperator(), unOperatedSources)); - -if (CollectionUtils.isNotEmpty(unOperatedSources)) { -GroupOperateType operateType = getOperateType(context.getProcessForm()); -StringBuilder builder = new StringBuilder("Unsupported operate ").append(operateType).append(" for ("); -unOperatedSources.forEach(source -> builder.append(" ").append(source.getSourceName()).append(" ")); -String errMsg = builder.append(")").toString(); -throw new WorkflowListenerException(errMsg); -} - +streamResponses +.forEach(stream -> operateStreamSources(groupId, stream.getInlongStreamId(), context.getOperator())); return ListenerResult.success(); } /** * Operate stream sources, such as delete, stop, restart. */ -protected void operateStreamSources(String groupId, String streamId, String operator, -List unOperatedSources) { +protected void operateStreamSources(String groupId, String streamId, String operator) { List sources = streamSourceService.listSource(groupId, streamId); sources.forEach(source -> { -if (checkIfOp(source, unOperatedSources)) { -operateStreamSource(source.genSourceRequest(), operator); -} +operateStreamSource(source.genSourceRequest(), operator); }); } -/** - * Check source status. - */ -@SneakyThrows -public boolean checkIfOp(StreamSource streamSource, List unOperatedSources) { -for (int retry = 0; retry < 60; retry++) { -int status = streamSou
(inlong) branch master updated (4425586894 -> cba6abd8a0)
This is an automated email from the ASF dual-hosted git repository. wakefu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 4425586894 [INLONG-10632][Dashboard] Dashboard Add Oceanbase Support (#10698) add cba6abd8a0 [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715) No new revisions were added by this update. Summary of changes: .../source/AbstractSourceOperateListener.java | 60 ++ .../service/source/AbstractSourceOperator.java | 8 --- .../service/source/StreamSourceServiceImpl.java| 9 ++-- 3 files changed, 7 insertions(+), 70 deletions(-)
Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]
XiaoYou201 commented on PR #10647: URL: https://github.com/apache/inlong/pull/10647#issuecomment-2247681190 > BTW, Flink-1.15 needs this feature too, would you like to work on it? yes, I would like to do that, it will done soon~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10716][SDK] Inlong Transform usage optimization [inlong]
aloyszhang merged PR #10717: URL: https://github.com/apache/inlong/pull/10717 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated (cba6abd8a0 -> a4e688171d)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from cba6abd8a0 [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715) add a4e688171d [INLONG-10716][SDK] Inlong Transform usage optimization (#10717) No new revisions were added by this update. Summary of changes: .../common/pojo/sort/dataflow/DataFlowConfig.java | 1 + .../pojo/sort/dataflow/dataType/CsvConfig.java | 2 +- .../pojo/sort/dataflow/dataType/KvConfig.java | 4 +- .../sdk/transform/decode/CsvSourceDecoder.java | 8 +-- .../sdk/transform/decode/KvSourceDecoder.java | 24 - .../sdk/transform/encode/CsvSinkEncoder.java | 8 +-- .../inlong/sdk/transform/encode/KvSinkEncoder.java | 12 - .../inlong/sdk/transform/pojo/CsvSinkInfo.java | 16 +++--- .../inlong/sdk/transform/pojo/CsvSourceInfo.java | 18 --- .../inlong/sdk/transform/pojo/KvSinkInfo.java | 6 +++ .../inlong/sdk/transform/pojo/KvSourceInfo.java| 9 .../apache/inlong/sdk/transform/pojo/SinkInfo.java | 2 + .../inlong/sdk/transform/pojo/SourceInfo.java | 2 + .../inlong/sdk/transform/process/Context.java | 8 +++ .../transform/process/operator/OperatorTools.java | 4 ++ .../TestTransformArithmeticFunctionsProcessor.java | 2 +- .../transform/process/TestTransformProcessor.java | 22 .../sort-standalone-source/pom.xml | 5 ++ .../inlong/sort/standalone/sink/SinkContext.java | 61 ++ 19 files changed, 172 insertions(+), 42 deletions(-)
Re: [PR] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 [inlong]
EMsnap merged PR #10647: URL: https://github.com/apache/inlong/pull/10647 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 (#10647)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 2d487362d5 [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 (#10647) 2d487362d5 is described below commit 2d487362d5425e7e0d94473257827f44525bf3db Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Thu Jul 25 14:19:28 2024 +0800 [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 (#10647) --- .../sort-connectors/elasticsearch-base/pom.xml | 140 ++ .../elasticsearch/ActionRequestFailureHandler.java | 79 .../elasticsearch/BufferingNoOpRequestIndexer.java | 73 +++ .../elasticsearch/ElasticsearchApiCallBridge.java | 118 + .../sort/elasticsearch/ElasticsearchSinkBase.java | 515 + .../elasticsearch/ElasticsearchSinkFunction.java | 89 .../inlong/sort/elasticsearch/RequestIndexer.java | 79 .../table/AbstractTimeIndexGenerator.java | 40 ++ .../table/ElasticsearchConfiguration.java | 162 +++ .../table/ElasticsearchConnectorOptions.java | 169 +++ .../table/ElasticsearchValidationUtils.java| 94 .../sort/elasticsearch/table/IndexGenerator.java | 39 ++ .../elasticsearch/table/IndexGeneratorBase.java| 51 ++ .../elasticsearch/table/IndexGeneratorFactory.java | 319 + .../sort/elasticsearch/table/KeyExtractor.java | 129 ++ .../sort/elasticsearch/table/RequestFactory.java | 54 +++ .../table/RowElasticsearchSinkFunction.java| 142 ++ .../elasticsearch/table/StaticIndexGenerator.java | 34 ++ .../elasticsearch/util/IgnoringFailureHandler.java | 37 ++ .../elasticsearch/util/NoOpFailureHandler.java | 54 +++ .../util/RetryRejectedExecutionFailureHandler.java | 56 +++ .../sort-flink-v1.18/sort-connectors/pom.xml | 1 + licenses/inlong-sort-connectors/LICENSE| 21 + 23 files changed, 2495 insertions(+) diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml new file mode 100644 index 00..709eefdb06 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml @@ -0,0 +1,140 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + +org.apache.inlong +sort-connectors-v1.18 +1.14.0-SNAPSHOT + +sort-connector-elasticsearch-base-v1.18 +jar +Apache InLong - Sort-connector-elasticsearch-base + + + ${project.parent.parent.parent.parent.parent.basedir} +7.10.2 + + + +org.apache.inlong +sort-flink-dependencies-v1.18 +${project.version} +provided + + + +org.apache.flink +flink-json +${flink.version} +provided + + + +org.elasticsearch +elasticsearch +${elasticsearch.version} + + + +org.ow2.asm +* + + + + + +org.elasticsearch.client +elasticsearch-rest-high-level-client +${elasticsearch.version} + + + +org.apache.flink +flink-runtime +${flink.version} +provided + + + + +org.apache.logging.log4j +log4j-api +provided + + + +org.apache.logging.log4j +log4j-core +provided + + + + + + +org.apache.maven.plugins +maven-shade-plugin + + +shade-flink + +shade + +package + + true + + + org.apache.inlong:sort-connector-* + +org/apache/inlong/** + META-INF/services/org.apache.flink.table.factories.Factory + + + +*:* + +
Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]
XiaoYou201 commented on code in PR #10700: URL: https://github.com/apache/inlong/pull/10700#discussion_r1690876562 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/jdbc/JdbcDialectFactory.java: ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.dialect.jdbc; + +public class JdbcDialectFactory implements org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory { + +@Override +public boolean acceptsURL(String url) { +return url.startsWith("jdbc:mysql:"); +} Review Comment: I realize it, you can click the resolved button mask that this comment was resolved~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]
XiaoYou201 commented on code in PR #10700: URL: https://github.com/apache/inlong/pull/10700#discussion_r1690877425 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/jdbc/JdbcDialect.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.dialect.jdbc; + +import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** + * JDBC dialect for ClickHouse SQL. + */ +public class JdbcDialect extends AbstractDialect { Review Comment: Plz modify the comment~ "JDBC dialect for MYSQL" will be better? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]
XiaoYou201 commented on code in PR #10700: URL: https://github.com/apache/inlong/pull/10700#discussion_r1690880917 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/jdbc/JdbcDialect.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.dialect.jdbc; + +import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** + * JDBC dialect for ClickHouse SQL. + */ +public class JdbcDialect extends AbstractDialect { + +public static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class); + +// Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs: +// https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64 +private static final int MAX_TIMESTAMP_PRECISION = 8; +private static final int MIN_TIMESTAMP_PRECISION = 0; + +// Define MAX/MIN precision of DECIMAL type according to ClickHouse docs: +// https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/ +private static final int MAX_DECIMAL_PRECISION = 128; +private static final int MIN_DECIMAL_PRECISION = 32; +private static final String POINT = "."; + +@Override +public String dialectName() { +return "mysql"; +} Review Comment: LGTM~ ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/jdbc/JdbcDialect.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.dialect.jdbc; + +import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** + * JDBC dialect for ClickHouse SQL. + */ +public class JdbcDialect extends AbstractDialect { + +public static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class); + +// Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs: +// https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64 +private static final int MAX_TIMESTAMP_PRECISION = 8; +private static final int MIN_TIMESTAMP_PRECISION = 0; + +// Define MAX/MIN precision of DECIMAL type according to ClickHouse docs: +// https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/
Re: [PR] [INLONG-10704][Sort] Sort Add Oceanbase Support [inlong]
XiaoYou201 commented on PR #10700: URL: https://github.com/apache/inlong/pull/10700#issuecomment-2249561835 > > please add test result with real data from any source to oceanbase > > What kind of test data is needed and where to put it You can refer to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java or package your code to manager container. Then start a job to verify the oceanbase connector work properly.If all right, plz provide some screenshots~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org