(inlong) 03/03: [INLONG-9822][Manager] Support flink job runtime execution mode configuration (#9823)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git commit 1323736a0b54ad402d12104dcfba61398ff6cbda Author: AloysZhang AuthorDate: Fri Mar 15 15:12:53 2024 +0800 [INLONG-9822][Manager] Support flink job runtime execution mode configuration (#9823) --- .../inlong/manager/common/consts/InlongConstants.java | 3 +++ .../apache/inlong/manager/plugin/flink/FlinkService.java | 2 ++ .../apache/inlong/manager/plugin/flink/dto/FlinkInfo.java | 2 ++ .../inlong/manager/plugin/listener/DeleteSortListener.java | 1 + .../manager/plugin/listener/RestartSortListener.java | 8 +++- .../manager/plugin/listener/StartupSortListener.java | 14 ++ .../org/apache/inlong/sort/configuration/Constants.java| 8 .../src/main/java/org/apache/inlong/sort/Entrance.java | 11 ++- 8 files changed, 43 insertions(+), 6 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index 581ebb3098..e84dcc6602 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -90,6 +90,9 @@ public class InlongConstants { public static final Integer DATASYNC_REALTIME_MODE = 1; public static final Integer DATASYNC_OFFLINE_MODE = 2; +public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming"; +public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch"; + public static final Integer DISABLE_ZK = 0; public static final Integer ENABLE_ZK = 1; diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java index 69a32b73fa..33a7ca59cd 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java @@ -258,6 +258,8 @@ public class FlinkService { list.add(flinkInfo.getLocalConfPath()); list.add("-checkpoint.interval"); list.add("6"); +list.add("-runtime.execution.mode"); +list.add(flinkInfo.getRuntimeExecutionMode()); list.add("-metrics.audit.proxy.hosts"); list.add(flinkConfig.getAuditProxyHosts()); return list.toArray(new String[0]); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java index f7071ceb4b..4c3c75f855 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java @@ -52,4 +52,6 @@ public class FlinkInfo { private boolean isException = false; private String exceptionMsg; + +private String runtimeExecutionMode; } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java index 009374e2a6..5e30ad8cb5 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java @@ -119,6 +119,7 @@ public class DeleteSortListener implements SortOperateListener { FlinkOperation 
flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.delete(flinkInfo); +// TODO if the job is OFFLINE, should delete the scheduler information log.info("job delete success for jobId={}", jobId); } catch (Exception e) { flinkInfo.setException(true); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java index 0c6828c984..242138c1e4 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache
(inlong) 02/03: [INLONG-9813][Manager] Support offline data sync management (#9814)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git commit 5cdd94d4c9d927342b45570c86523948f115d3b2 Author: AloysZhang AuthorDate: Wed Mar 13 16:18:04 2024 +0800 [INLONG-9813][Manager] Support offline data sync management (#9814) --- .../inlong/manager/plugin/listener/StartupSortListener.java | 13 + .../inlong/manager/service/core/impl/AuditServiceImpl.java | 6 -- .../service/listener/group/InitGroupCompleteListener.java | 3 ++- .../service/listener/group/UpdateGroupCompleteListener.java | 3 ++- .../listener/group/apply/ApproveApplyProcessListener.java | 1 - .../service/listener/queue/QueueResourceListener.java | 6 -- .../service/listener/stream/InitStreamCompleteListener.java | 3 ++- .../manager/service/source/AbstractSourceOperator.java | 1 + .../manager/service/source/StreamSourceServiceImpl.java | 3 ++- 9 files changed, 26 insertions(+), 13 deletions(-) diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index ab9e9b55d8..fae5faa278 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -73,7 +73,8 @@ public class StartupSortListener implements SortOperateListener { } log.info("add startup group listener for groupId [{}]", groupId); -return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); +return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()) +|| 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); } @Override @@ -141,9 +142,13 @@ public class StartupSortListener implements SortOperateListener { FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); -flinkOperation.start(flinkInfo); -log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, -streamInfo.getInlongStreamId(), flinkInfo.getJobId()); +// only start job for real-time mode +if (InlongConstants.DATASYNC_REALTIME_MODE + .equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) { +flinkOperation.start(flinkInfo); +log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, +streamInfo.getInlongStreamId(), flinkInfo.getJobId()); +} } catch (Exception e) { flinkInfo.setException(true); flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index c0086ada23..217f3b5d25 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -297,7 +297,8 @@ public class AuditServiceImpl implements AuditService { if (CollectionUtils.isEmpty(request.getAuditIds())) { // properly overwrite audit ids by role and stream config -if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())) { +if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()) +|| InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) { auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); request.setAuditIds(getAuditIds(groupId, streamId, 
sourceNodeType, sinkNodeType)); } else { @@ -462,7 +463,8 @@ public class AuditServiceImpl implements AuditService { } else { auditSet.add(getAuditId(sinkNodeType, true)); InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId); -if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())) { +if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()) +|| I
(inlong) 01/03: [INLONG-9781][Manager] Add offline sync task type definition (#9787)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git commit 875e07cd74e83ee2e0011eb84f6b81c53578601e Author: AloysZhang AuthorDate: Thu Mar 7 11:44:20 2024 +0800 [INLONG-9781][Manager] Add offline sync task type definition (#9787) --- .../java/org/apache/inlong/manager/client/ut/BaseTest.java | 2 +- .../inlong/manager/common/consts/InlongConstants.java | 3 ++- .../org/apache/inlong/manager/common/enums/GroupMode.java | 14 +++--- .../manager/plugin/listener/StartupSortListener.java | 2 +- .../apache/inlong/manager/pojo/group/InlongGroupInfo.java | 3 ++- .../inlong/manager/pojo/group/InlongGroupPageRequest.java | 3 ++- .../inlong/manager/pojo/group/InlongGroupRequest.java | 5 +++-- .../inlong/manager/service/core/impl/AuditServiceImpl.java | 4 ++-- .../manager/service/group/InlongGroupServiceImpl.java | 2 +- .../service/listener/group/InitGroupCompleteListener.java | 2 +- .../listener/group/UpdateGroupCompleteListener.java| 2 +- .../service/listener/queue/QueueResourceListener.java | 2 +- .../listener/stream/InitStreamCompleteListener.java| 2 +- .../manager/service/source/AbstractSourceOperator.java | 2 +- .../manager/service/source/StreamSourceServiceImpl.java| 2 +- 15 files changed, 31 insertions(+), 19 deletions(-) diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java index 4d218918f5..9961c16ef8 100644 --- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java +++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java @@ -110,7 +110,7 @@ public class BaseTest { // set enable zk, create resource, group mode, and cluster tag 
pulsarInfo.setEnableZookeeper(InlongConstants.DISABLE_ZK); pulsarInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE); -pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_MODE); +pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_REALTIME_MODE); pulsarInfo.setInlongClusterTag("default_cluster"); pulsarInfo.setDailyRecords(1000); diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index c3085972fd..581ebb3098 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -87,7 +87,8 @@ public class InlongConstants { public static final Integer DELETED_STATUS = 10; public static final Integer STANDARD_MODE = 0; -public static final Integer DATASYNC_MODE = 1; +public static final Integer DATASYNC_REALTIME_MODE = 1; +public static final Integer DATASYNC_OFFLINE_MODE = 2; public static final Integer DISABLE_ZK = 0; public static final Integer ENABLE_ZK = 1; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java index f0db2353b6..d4b417f39a 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java @@ -31,10 +31,18 @@ public enum GroupMode { STANDARD("standard"), /** - * DataSync mode(only Data Synchronization): group init only with sort in InLong Cluster - * StreamSource -> Sort -> StreamSink + * DataSync mode(only Data Synchronization): real-time data sync in stream way, group init only with + * sort in 
InLong Cluster. + * StreamSource -> Sort -> Sink */ -DATASYNC("datasync"); +DATASYNC("datasync"), + +/** + * DataSync mode(only Data Synchronization): offline data sync in batch way, group init only with sort + * in InLong Cluster. + * BatchSource -> Sort -> Sink + */ +DATASYNC_BATCH("datasync_offline"); @Getter private final String mode; diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index 0b0e55e369..ab9e9b55d8 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener
(inlong) branch dev-offline-sync updated (0ce3d755da -> 1323736a0b)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a change to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git discard 0ce3d755da [INLONG-9822][Manager] Support flink job runtime execution mode configuration (#9823) omit d5af6a94e2 [INLONG-9813][Manager] Support offline data sync management (#9814) omit 9e96c4e37b [INLONG-9781][Manager] Add offline sync task type definition (#9787) add 293ab1ab3e [INLONG-9766][Audit] Support user-defined SocketAddress loader getting AuditProxy (#9767) add 55f639537d [INLONG-9784][Audit] Optimize sending memory management when the audit-proxy config is null (#9786) add 7c4adbf564 [INLONG-9773][Manager] SortSDK configuration support acquire tenant from InlongGroup (#9776) add 03933e4072 [INLONG-9783][Sort] Add compatibility processing of tid to streamId changes in the message deserialization base class (#9785) add 29574c04cf [INLONG-9774][Sort] Support rowdata way of sort InLong message tlog-kv format (#9780) add 2f0bab5e8d [INLONG-9735][Manager] Bump the Spring version 5.3.27 to 5.3.32 (#9790) add 45e7afc108 [INLONG-9788][Sort] Supports data parse that contains delimiters in kv and csv data format (#9789) add 44f2b04677 [INLONG-9793][Manager] Fix the problem of Manager client workflowApi.listprocess failed to pass parameters correctly (#9794) add 5a58040909 [INLONG-9791][Sort] Return null instead of throwing an exception when deserialization by type fails (#9792) add 829a09bb9f [INLONG-9795][Sort] Optimize the definition of enumeration variables and remove the semicolon (#9764) add 0585278b81 [INLONG-9797][Audit] Audit-SDK reporting supports version number dimension (#9800) add 70e36029ea [INLONG-9807][Audit] Add debug log for audit-proxy (#9809) add 039e681f11 [INLONG-9008][Manager] Set the ignoreParseError field to null (#9810) add 3ae27abb3f [INLONG-9802][Agent] Add an agent installer module for agent installation (#9803) add 2ea39a2765 [INLONG-9809][Audit] 
SDK supports both singleton and non-singleton usage (#9812) add 5bd1010743 [INLONG-9804][Agent] Add Pulsar source for Agent (#9805) add b6e1064dcf [INLONG-9816][Agent] Add config class for installer (#9817) add b08d1612b0 [INLONG-9806][Agent] Add installer configuration file (#9815) add 58b2fc963c [INLONG-9818][Manager] Decode Msg based on the manager's configuration (#9819) add f84ad372a4 [INLONG-8676][Manager] Elasticsearch - Modify the calling method from SDK to HTTP (#9057) add 7189f34d0b [INLONG-9755][Sort] NPE exception when using default configuration to start inlong-sort-standalone (#9824) add 4649b8e546 [INLONG-9825][Manager] Reduce the creation of RestClusterClient (#9826) add 43bcb9d859 [INLONG-9827][Manager] Fix the problem of failed to check if the consumption group exists (#9828) add fd95e72ba5 [INLONG-9833][Agent] Add module state to distinguish whether the module has been downloaded or installed (#9834) add 23bbce2de1 [INLONG-9831][Agent] Increase configuration acquisition capability (#9832) add 8e5dd5003e [INLONG-9829][Agent] Add guardian scripts (#9830) new 875e07cd74 [INLONG-9781][Manager] Add offline sync task type definition (#9787) new 5cdd94d4c9 [INLONG-9813][Manager] Support offline data sync management (#9814) new 1323736a0b [INLONG-9822][Manager] Support flink job runtime execution mode configuration (#9823) This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (0ce3d755da) \ N -- N -- N refs/heads/dev-offline-sync (1323736a0b) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. 
Any revisions marked "discard" are gone forever. The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../inlong/agent/conf/AbstractConfiguration.java | 2 +- .../inlong/agent/constant/FetcherConstants.java| 21 +- .../inlong/agent/constant/TaskConstants.java | 10 + .../org/apache/inlong/agent/pojo/PulsarTask.java | 36 ++- .../apache/inlong/agent/pojo/TaskProfileDto.java | 29 +++ .../org/apache/inlong/agent/utils/HttpManager.java | 26 +- .../apache/inlong/agent/core/HeartbeatManager.java | 4 +- inlong-agent/agent-installer/assembly.xml | 60 + i
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
EMsnap commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528064540 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java: ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.redis.sink; + +import org.apache.inlong.sort.redis.common.config.RedisDataType; +import org.apache.inlong.sort.redis.common.config.SchemaMappingMode; +import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler; +import org.apache.inlong.sort.redis.common.schema.RedisSchema; +import org.apache.inlong.sort.redis.common.schema.RedisSchemaFactory; +import org.apache.inlong.sort.redis.common.schema.StateEncoder; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; + +import java.time.Duration; +import java.util.Map; + +import static org.apache.flink.util.TimeUtils.parseDuration; +import static org.apache.inlong.sort.redis.common.config.RedisOptions.*; +import static org.apache.inlong.sort.redis.common.config.SchemaMappingMode.*; + +public class RedisDynamicTableSink implements DynamicTableSink { + +private final FlinkJedisConfigBase flinkJedisConfigBase; + +private final ResolvedSchema resolvedSchema; + +private final EncodingFormat> format; + +private final RedisDataType dataType; +private final SchemaMappingMode mappingMode; +private final ReadableConfig config; +private final Map properties; + +private final String inlongMetric; +private final String auditHostAndPorts; + +public RedisDynamicTableSink( 
+EncodingFormat> format, +ResolvedSchema resolvedSchema, +RedisDataType dataType, +SchemaMappingMode schemaMappingMode, +ReadableConfig config, +Map properties, +String inlongMetric, +String auditHostAndPorts) { +this.format = format; +this.resolvedSchema = resolvedSchema; +this.dataType = dataType; +this.mappingMode = schemaMappingMode; +this.config = config; +this.properties = properties; + +this.inlongMetric = inlongMetric; +this.auditHostAndPorts = auditHostAndPorts; + +flinkJedisConfigBase = RedisHandlerServices +.findRedisHandler(InlongJedisConfigHandler.class, properties) +.createFlinkJedisConfig(config); + +batchSize = config.get(SINK_BATCH_SIZE); +flushInterval = parseDuration(config.get(SINK_FLUSH_INTERVAL)); +expireTime = parseDuration(config.get(EXPIRE_TIME)); +} + +private final Duration expireTime; +private final Long batchSize; +private final Duration flushInterval; + +@Override +public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { +// UPSERT mode +ChangelogMode.Builder builder = ChangelogMode.newBuilder(); +for (RowKind kind : requestedMode.getContainedKinds()) { +if (kind != RowKind.UPDATE_BEFORE) { +builder.addContainedKind(kind); +} +} +return builder.build(); +} + +@Override +public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + +RedisSchema redisSchema = RedisSchemaFactory.createRedisSchema(dataType, mappingMode, resolvedSchema); +redisSchema.validate(resolvedSchema); + +SerializationSchema serializationSchema = +
[PR] [INLONG-928] Remove historical version documents [inlong-website]
bluewang opened a new pull request, #930: URL: https://github.com/apache/inlong-website/pull/930 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-XYZ][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes https://github.com/apache/inlong-website/issues/928 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* ### Modifications *Describe the modifications you've done.* ### Verifying this change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management [inlong]
haifxu opened a new pull request, #9838: URL: https://github.com/apache/inlong/pull/9838 ### Prepare a Pull Request - Fixes #9837 ### Motivation Modify variable name ### Modifications Modify variable name ### Verifying this change - [ ] This change is a trivial rework/code cleanup without any test coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
XiaoYou201 commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528204801 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java: ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.redis.sink; + +import org.apache.inlong.sort.redis.common.config.RedisDataType; +import org.apache.inlong.sort.redis.common.config.SchemaMappingMode; +import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler; +import org.apache.inlong.sort.redis.common.schema.RedisSchema; +import org.apache.inlong.sort.redis.common.schema.RedisSchemaFactory; +import org.apache.inlong.sort.redis.common.schema.StateEncoder; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; + +import java.time.Duration; +import java.util.Map; + +import static org.apache.flink.util.TimeUtils.parseDuration; +import static org.apache.inlong.sort.redis.common.config.RedisOptions.*; +import static org.apache.inlong.sort.redis.common.config.SchemaMappingMode.*; + +public class RedisDynamicTableSink implements DynamicTableSink { + +private final FlinkJedisConfigBase flinkJedisConfigBase; + +private final ResolvedSchema resolvedSchema; + +private final EncodingFormat> format; + +private final RedisDataType dataType; +private final SchemaMappingMode mappingMode; +private final ReadableConfig config; +private final Map properties; + +private final String inlongMetric; +private final String auditHostAndPorts; + +public RedisDynamicTableSink( 
+EncodingFormat> format, +ResolvedSchema resolvedSchema, +RedisDataType dataType, +SchemaMappingMode schemaMappingMode, +ReadableConfig config, +Map properties, +String inlongMetric, +String auditHostAndPorts) { +this.format = format; +this.resolvedSchema = resolvedSchema; +this.dataType = dataType; +this.mappingMode = schemaMappingMode; +this.config = config; +this.properties = properties; + +this.inlongMetric = inlongMetric; +this.auditHostAndPorts = auditHostAndPorts; + +flinkJedisConfigBase = RedisHandlerServices +.findRedisHandler(InlongJedisConfigHandler.class, properties) +.createFlinkJedisConfig(config); + +batchSize = config.get(SINK_BATCH_SIZE); +flushInterval = parseDuration(config.get(SINK_FLUSH_INTERVAL)); +expireTime = parseDuration(config.get(EXPIRE_TIME)); +} + +private final Duration expireTime; +private final Long batchSize; +private final Duration flushInterval; + +@Override +public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { +// UPSERT mode +ChangelogMode.Builder builder = ChangelogMode.newBuilder(); +for (RowKind kind : requestedMode.getContainedKinds()) { +if (kind != RowKind.UPDATE_BEFORE) { +builder.addContainedKind(kind); +} +} +return builder.build(); +} + +@Override +public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + +RedisSchema redisSchema = RedisSchemaFactory.createRedisSchema(dataType, mappingMode, resolvedSchema); +redisSchema.validate(resolvedSchema); + +SerializationSchema serializationSchema = +
Re: [PR] [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management [inlong]
dockerzhang merged PR #9838: URL: https://github.com/apache/inlong/pull/9838 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management (#9838)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new c727f8834a [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management (#9838) c727f8834a is described below commit c727f8834aaf652b525b0a6214a75c88e0eac301 Author: haifxu AuthorDate: Mon Mar 18 18:14:17 2024 +0800 [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management (#9838) --- inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts | 4 ++-- inlong-dashboard/src/ui/locales/cn.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts b/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts index bbde86c5f4..2519fe06a3 100644 --- a/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts +++ b/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts @@ -57,8 +57,8 @@ export default class PulsarCluster rules: [{ required: true }], initialValue: 'public', }) - @I18n('pages.Clusters.Pulsar.Tenant') - tenant: string; + @I18n('pages.Clusters.Pulsar.PulsarTenant') + pulsarTenant: string; @FieldDecorator({ type: 'input', diff --git a/inlong-dashboard/src/ui/locales/cn.json b/inlong-dashboard/src/ui/locales/cn.json index 1d6d5a035f..fa46d6f1ca 100644 --- a/inlong-dashboard/src/ui/locales/cn.json +++ b/inlong-dashboard/src/ui/locales/cn.json @@ -754,7 +754,7 @@ "pages.Clusters.Node.PortRule": "请输入正确的端口", "pages.Clusters.Node.ProtocolTypeRule": "请输入正确的协议类型", "pages.Clusters.Node.Online": "在线", - "pages.Clusters.Pulsar.Tenant": "默认租户", + "pages.Clusters.Pulsar.PulsarTenant": "默认租户", "pages.Clusters.Pulsar.TokenPlaceholder": "如果群集配置了令牌,则为必需", "pages.Clusters.Kafka.ClusterUrl": "集群 URL", "pages.Clusters.Pulsar.ServiceUrlHelper": "用于生产和消费数据",
Re: [PR] [INLONG-9820][Dashboard][Manager] Update Pulsar source field for Ingestion [inlong]
dockerzhang merged PR #9821: URL: https://github.com/apache/inlong/pull/9821 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
aloyszhang commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528312373 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisDataType.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.redis.common.config; + +import org.apache.flink.annotation.Internal; + +/** + * Redis data type Enum. + * See detail: @see <a href="https://redis.io/topics/data-types-intro">redis data type</a> + */ +@Internal +public enum RedisDataType { +/** + * Redis strings commands are used for managing string values in Redis. + */ +PLAIN, +/** + * A Redis hash is a data type that represents a mapping between a string field and a string value. + * There are two members in hash DataType: + * the first member is Redis hash field. + * the second member is Redis hash value. + */ +HASH, +/** + * Redis Sets are an unordered collection of unique strings. + * Unique means sets does not allow repetition of data in a key. + */ +SET, + +/** + * Bitmaps are not an actual data type, but a set of bit-oriented operations defined on the String type. 
+ * Since strings are binary safe blobs and their maximum length is 512 MB, + * they are suitable to set up to 2^32 different bits. + */ +BITMAP, +; Review Comment: ```suggestion BITMAP; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated (c727f8834a -> fe982bc2b8)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from c727f8834a [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management (#9838) add fe982bc2b8 [INLONG-9820][Dashboard][Manager] Update Pulsar source field for Ingestion (#9821) No new revisions were added by this update. Summary of changes: .../org/apache/inlong/agent/pojo/PulsarTask.java | 6 +- .../apache/inlong/agent/pojo/TaskProfileDto.java | 2 +- .../plugins/sources/common/SourceDefaultInfo.ts| 1 + .../src/plugins/sources/defaults/Pulsar.ts | 91 +- inlong-dashboard/src/ui/locales/cn.json| 5 ++ inlong-dashboard/src/ui/locales/en.json| 5 ++ .../pojo/source/pulsar/PulsarSourceRequest.java| 3 + 7 files changed, 105 insertions(+), 8 deletions(-)
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
aloyszhang commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528315770 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml: ## @@ -43,6 +43,8 @@ hbase hudi kafka +redis + Review Comment: ```suggestion redis ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
aloyszhang commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528320907 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml: ## @@ -0,0 +1,176 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + +org.apache.inlong +sort-connectors-v1.15 +1.12.0-SNAPSHOT + + +sort-connector-redis-v1.15 +Apache InLong - Sort-connector-redis + + + ${project.parent.parent.parent.parent.parent.basedir} + + + + +org.apache.flink +flink-shaded-jackson +2.12.4-15.0 + + + + +org.apache.flink +flink-table-api-java-bridge +1.15.4 Review Comment: replace with ${flink.version} -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
aloyszhang commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528322815 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.redis.common.config; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Options for the Redis lookup. 
+ */ +@Internal +public class RedisLookupOptions implements Serializable { + +private static final long serialVersionUID = 1L; +private static final int DEFAULT_MAX_RETRY_TIMES = 3; + +private final long cacheMaxSize; +private final long cacheExpireMs; +private final int maxRetryTimes; +/** + * Asynchronous processing has not been implemented yet, but the entry is reserved + */ +private final boolean lookupAsync; + +public RedisLookupOptions( +long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean lookupAsync) { +this.cacheMaxSize = cacheMaxSize; +this.cacheExpireMs = cacheExpireMs; +this.maxRetryTimes = maxRetryTimes; +this.lookupAsync = lookupAsync; +} + +public static Builder builder() { +return new Builder(); +} + +public long getCacheMaxSize() { +return cacheMaxSize; +} + +public long getCacheExpireMs() { +return cacheExpireMs; +} + +public int getMaxRetryTimes() { +return maxRetryTimes; +} + +public boolean getLookupAsync() { Review Comment: Not used, plz remove -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
aloyszhang commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528322815 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.redis.common.config; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Options for the Redis lookup. 
+ */ +@Internal +public class RedisLookupOptions implements Serializable { + +private static final long serialVersionUID = 1L; +private static final int DEFAULT_MAX_RETRY_TIMES = 3; + +private final long cacheMaxSize; +private final long cacheExpireMs; +private final int maxRetryTimes; +/** + * Asynchronous processing has not been implemented yet, but the entry is reserved + */ +private final boolean lookupAsync; + +public RedisLookupOptions( +long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean lookupAsync) { +this.cacheMaxSize = cacheMaxSize; +this.cacheExpireMs = cacheExpireMs; +this.maxRetryTimes = maxRetryTimes; +this.lookupAsync = lookupAsync; +} + +public static Builder builder() { +return new Builder(); +} + +public long getCacheMaxSize() { +return cacheMaxSize; +} + +public long getCacheExpireMs() { +return cacheExpireMs; +} + +public int getMaxRetryTimes() { +return maxRetryTimes; +} + +public boolean getLookupAsync() { Review Comment: Not used, plz remove -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
aloyszhang commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528327113 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java: ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.redis.common.handler; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler; + +/** + * handler to create flink jedis config. + */ +public interface InlongJedisConfigHandler extends RedisHandler { + +/** + * create flink jedis config use sepecified properties. Review Comment: ```suggestion * create flink jedis config use specified properties. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
XiaoYou201 commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528407502 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisDataType.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.redis.common.config; + +import org.apache.flink.annotation.Internal; + +/** + * Redis data type Enum. + * See detail: @see <a href="https://redis.io/topics/data-types-intro">redis data type</a> + */ +@Internal +public enum RedisDataType { +/** + * Redis strings commands are used for managing string values in Redis. + */ +PLAIN, +/** + * A Redis hash is a data type that represents a mapping between a string field and a string value. + * There are two members in hash DataType: + * the first member is Redis hash field. + * the second member is Redis hash value. + */ +HASH, +/** + * Redis Sets are an unordered collection of unique strings. + * Unique means sets does not allow repetition of data in a key. + */ +SET, + +/** + * Bitmaps are not an actual data type, but a set of bit-oriented operations defined on the String type. 
+ * Since strings are binary safe blobs and their maximum length is 512 MB, + * they are suitable to set up to 2^32 different bits. + */ +BITMAP, +; Review Comment: thanks suggestion, fixed ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml: ## @@ -43,6 +43,8 @@ hbase hudi kafka +redis + Review Comment: thanks suggestion, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
XiaoYou201 commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528407874 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml: ## @@ -0,0 +1,176 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + +org.apache.inlong +sort-connectors-v1.15 +1.12.0-SNAPSHOT + + +sort-connector-redis-v1.15 +Apache InLong - Sort-connector-redis + + + ${project.parent.parent.parent.parent.parent.basedir} + + + + +org.apache.flink +flink-shaded-jackson +2.12.4-15.0 + + + + +org.apache.flink +flink-table-api-java-bridge +1.15.4 Review Comment: thanks suggestion, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
XiaoYou201 commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1528408728 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java: ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.redis.common.handler; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler; + +/** + * handler to create flink jedis config. + */ +public interface InlongJedisConfigHandler extends RedisHandler { + +/** + * create flink jedis config use sepecified properties. Review Comment: thanks suggestion, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [INLONG-9839][Manager] Optimize the auto assign logic of SortStandalone cluster [inlong]
vernedeng opened a new pull request, #9840: URL: https://github.com/apache/inlong/pull/9840 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-XYZ][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes #9839 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications *Describe the modifications you've done.* ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [INLONG-9753][Sort] module inlong-sort-standalone's conf/sid_es_v3.conf file does not support adding Apache license [inlong]
haibo-duan opened a new pull request, #9841: URL: https://github.com/apache/inlong/pull/9841 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-XYZ][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes #9753 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications *Describe the modifications you've done.* ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9839][Manager] Optimize the auto assign logic of SortStandalone cluster [inlong]
aloyszhang commented on code in PR #9840: URL: https://github.com/apache/inlong/pull/9840#discussion_r1529589155 ## inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java: ## @@ -161,6 +161,6 @@ List selectExistsStreamId(@Param("groupId") String groupId, @Param("sink */ int deleteByInlongGroupIds(@Param("groupIdList") List groupIdList); -String selectAssignedCluster(@Param("dataNodeName") String dataNodeName); +String selectAssignedCluster(@Param("dataNodeName") String dataNodeName, @Param("clusterTag") String clusterTag); Review Comment: Does this modification only affect SortStandalone? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
EMsnap commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1529610943 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java: ## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.redis; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.NetUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.embedded.RedisServer; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.*; + +public class RedisTableTest { + +private static int redisPort; + +private static RedisServer redisServer; + +@BeforeClass +public static void setup() { +redisPort = NetUtils.getAvailablePort().getPort(); +// redisPort = 6379; +redisServer = new RedisServer(redisPort); +redisServer.start(); +} + +@AfterClass +public static void cleanup() { +if (redisServer != null) { +redisServer.stop(); +} +} + +@Before +public void prepare() { +Jedis jedis = new Jedis("localhost", redisPort); +// Deletes all keys from all databases. 
+jedis.flushAll(); +} +@Test +public void testSourceWithGet() { +StreamExecutionEnvironment executionEnv = +StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = +StreamTableEnvironment.create(executionEnv); +Jedis jedis = new Jedis("127.0.0.1", redisPort); + +jedis.set("1", "value_1"); +jedis.set("2", "value_2"); + +String dim = "CREATE TABLE dim (" + +"aaa varchar, bbb varchar" + +// " PRIMARY KEY (`key`) NOT ENFORCED" + +") WITH (" + +" 'connector' = 'redis-inlong'," + +" 'command' = 'get'," + +" 'host' = 'localhost'," + +" 'port' = '" + redisPort + "'," + +" 'maxIdle' = '8'," + +" 'minIdle' = '1'," + +" 'maxTotal' = '2'," + +" 'timeout' = '2000'" + +")"; +String source = +"create table source(aaa varchar, proctime as procTime()) " ++ "with ('connector'='datagen', 'rows-per-second'='1', " ++ "'fields.aaa.kind'='sequence', 'fields.aaa.start'='1', 'fields.aaa.end'='2'" ++ ")"; +String sink = "CREATE TABLE sink (" + +"aaa STRING," + +"bbb STRING," + +"PRIMARY KEY (`aaa`) NOT ENFORCED" + +") WITH (" + +" 'connector' = 'redis-inlong'," + +" 'sink.batch-size' = '1'," + +" 'format' = 'csv'," + +" 'data-type' = 'PLAIN'," + +" 'redis-mode' = 'standalone'," + +" 'host' = '127.0.0.1'," + +" 'port' = '" + redisPort + "'," + +" 'maxIdle' = '8'," + +" 'minIdle' = '1'," + +" 'maxTotal' = '2'," + +" 'timeout' = '2000'" + +")"; + +tableEnv.executeSql(dim); +tableEnv.executeSql(source); +tableEnv.executeSql(sink); +String sql = +" insert into sink" ++ " select concat_ws('_', s.aaa, s.aaa),concat_ws('_', d.bbb, s.aaa) from source s" ++ " left join dim for system_time as of s.proctime as d " ++ " on d.aaa = s.aaa"; + +tableEnv.executeSql(sql); + +await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { +asser
Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]
EMsnap commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1529615263 ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java: ## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.redis; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.NetUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.embedded.RedisServer; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.*; + +public class RedisTableTest { + +private static int redisPort; + +private static RedisServer redisServer; + +@BeforeClass +public static void setup() { +redisPort = NetUtils.getAvailablePort().getPort(); +// redisPort = 6379; Review Comment: remove useless code ## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java: ## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.redis; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.NetUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.embedded.RedisServer; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.*; + +public class RedisTableTest { + +private static int redisPort; + +private static RedisServer redisServer; + +@BeforeClass +public static void setup() { +redisPort = NetUtils.getAvailablePort().getPort(); +// redisPort = 6379; +redisServer = new RedisServer(redisPort); +redisServer.start(); +} + +@AfterClass +public static void cleanup() { +if (redisServer != null) { +redisServer.stop(); +} +} + +@Before +public void prepare() { +Jedis jedis = new Jedis("localhost", redisPort); +// Deletes all keys from all databases. +jedis.flushAll(); +} +@Test +public void testSourceWithGet() { +StreamExecutionEnvironment executionEnv = +StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = +StreamTableEnvironment.create(executionEnv); +Jedis jedis = new Jedis("127.0.0.1", redisPort); + +jedis.set("1", "value_1"); +jedis.set("2", "value_2"); + +String dim = "CREATE TABLE dim (" + +"aaa varchar, bbb varchar" + +// " PRIMARY KEY (`key`) NOT ENFORCED" + +") WITH (" + +" 'conn
Re: [PR] [INLONG-9753][Sort] Module inlong-sort-standalone's conf/sid_es_v3.conf file does not support adding Apache license [inlong]
dockerzhang merged PR #9841: URL: https://github.com/apache/inlong/pull/9841 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated (fe982bc2b8 -> b82f908cbf)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from fe982bc2b8 [INLONG-9820][Dashboard][Manager] Update Pulsar source field for Ingestion (#9821) add b82f908cbf [INLONG-9753][Sort] Module inlong-sort-standalone's conf/sid_es_v3.conf file does not support adding Apache license (#9841) No new revisions were added by this update. Summary of changes: .../standalone/config/loader/ClassResourceQueryConsumeConfig.java | 4 1 file changed, 4 insertions(+)