(inlong) 03/03: [INLONG-9822][Manager] Support flink job runtime execution mode configuration (#9823)

2024-03-18 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1323736a0b54ad402d12104dcfba61398ff6cbda
Author: AloysZhang 
AuthorDate: Fri Mar 15 15:12:53 2024 +0800

[INLONG-9822][Manager] Support flink job runtime execution mode 
configuration (#9823)
---
 .../inlong/manager/common/consts/InlongConstants.java  |  3 +++
 .../apache/inlong/manager/plugin/flink/FlinkService.java   |  2 ++
 .../apache/inlong/manager/plugin/flink/dto/FlinkInfo.java  |  2 ++
 .../inlong/manager/plugin/listener/DeleteSortListener.java |  1 +
 .../manager/plugin/listener/RestartSortListener.java   |  8 +++-
 .../manager/plugin/listener/StartupSortListener.java   | 14 ++
 .../org/apache/inlong/sort/configuration/Constants.java|  8 
 .../src/main/java/org/apache/inlong/sort/Entrance.java | 11 ++-
 8 files changed, 43 insertions(+), 6 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 581ebb3098..e84dcc6602 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -90,6 +90,9 @@ public class InlongConstants {
 public static final Integer DATASYNC_REALTIME_MODE = 1;
 public static final Integer DATASYNC_OFFLINE_MODE = 2;
 
+public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming";
+public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
+
 public static final Integer DISABLE_ZK = 0;
 public static final Integer ENABLE_ZK = 1;
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 69a32b73fa..33a7ca59cd 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -258,6 +258,8 @@ public class FlinkService {
 list.add(flinkInfo.getLocalConfPath());
 list.add("-checkpoint.interval");
 list.add("6");
+list.add("-runtime.execution.mode");
+list.add(flinkInfo.getRuntimeExecutionMode());
 list.add("-metrics.audit.proxy.hosts");
 list.add(flinkConfig.getAuditProxyHosts());
 return list.toArray(new String[0]);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index f7071ceb4b..4c3c75f855 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -52,4 +52,6 @@ public class FlinkInfo {
 private boolean isException = false;
 
 private String exceptionMsg;
+
+private String runtimeExecutionMode;
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 009374e2a6..5e30ad8cb5 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -119,6 +119,7 @@ public class DeleteSortListener implements 
SortOperateListener {
 FlinkOperation flinkOperation = FlinkOperation.getInstance();
 try {
 flinkOperation.delete(flinkInfo);
+// TODO if the job is OFFLINE, should delete the scheduler 
information
 log.info("job delete success for jobId={}", jobId);
 } catch (Exception e) {
 flinkInfo.setException(true);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 0c6828c984..242138c1e4 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache

(inlong) 02/03: [INLONG-9813][Manager] Support offline data sync management (#9814)

2024-03-18 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5cdd94d4c9d927342b45570c86523948f115d3b2
Author: AloysZhang 
AuthorDate: Wed Mar 13 16:18:04 2024 +0800

[INLONG-9813][Manager] Support offline data sync management (#9814)
---
 .../inlong/manager/plugin/listener/StartupSortListener.java | 13 +
 .../inlong/manager/service/core/impl/AuditServiceImpl.java  |  6 --
 .../service/listener/group/InitGroupCompleteListener.java   |  3 ++-
 .../service/listener/group/UpdateGroupCompleteListener.java |  3 ++-
 .../listener/group/apply/ApproveApplyProcessListener.java   |  1 -
 .../service/listener/queue/QueueResourceListener.java   |  6 --
 .../service/listener/stream/InitStreamCompleteListener.java |  3 ++-
 .../manager/service/source/AbstractSourceOperator.java  |  1 +
 .../manager/service/source/StreamSourceServiceImpl.java |  3 ++-
 9 files changed, 26 insertions(+), 13 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index ab9e9b55d8..fae5faa278 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -73,7 +73,8 @@ public class StartupSortListener implements 
SortOperateListener {
 }
 
 log.info("add startup group listener for groupId [{}]", groupId);
-return 
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
+return 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())
+|| 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
 }
 
 @Override
@@ -141,9 +142,13 @@ public class StartupSortListener implements 
SortOperateListener {
 FlinkOperation flinkOperation = FlinkOperation.getInstance();
 try {
 flinkOperation.genPath(flinkInfo, dataflow);
-flinkOperation.start(flinkInfo);
-log.info("job submit success for groupId = {}, streamId = {}, 
jobId = {}", groupId,
-streamInfo.getInlongStreamId(), flinkInfo.getJobId());
+// only start job for real-time mode
+if (InlongConstants.DATASYNC_REALTIME_MODE
+
.equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) {
+flinkOperation.start(flinkInfo);
+log.info("job submit success for groupId = {}, streamId = 
{}, jobId = {}", groupId,
+streamInfo.getInlongStreamId(), 
flinkInfo.getJobId());
+}
 } catch (Exception e) {
 flinkInfo.setException(true);
 flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index c0086ada23..217f3b5d25 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -297,7 +297,8 @@ public class AuditServiceImpl implements AuditService {
 
 if (CollectionUtils.isEmpty(request.getAuditIds())) {
 // properly overwrite audit ids by role and stream config
-if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()))
 {
+if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())
+|| 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) 
{
 auditIdMap.put(getAuditId(sourceNodeType, false), 
sourceNodeType);
 request.setAuditIds(getAuditIds(groupId, streamId, 
sourceNodeType, sinkNodeType));
 } else {
@@ -462,7 +463,8 @@ public class AuditServiceImpl implements AuditService {
 } else {
 auditSet.add(getAuditId(sinkNodeType, true));
 InlongGroupEntity inlongGroup = 
inlongGroupMapper.selectByGroupId(groupId);
-if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()))
 {
+if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())
+|| 
I

(inlong) 01/03: [INLONG-9781][Manager] Add offline sync task type definition (#9787)

2024-03-18 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 875e07cd74e83ee2e0011eb84f6b81c53578601e
Author: AloysZhang 
AuthorDate: Thu Mar 7 11:44:20 2024 +0800

[INLONG-9781][Manager] Add offline sync task type definition (#9787)
---
 .../java/org/apache/inlong/manager/client/ut/BaseTest.java |  2 +-
 .../inlong/manager/common/consts/InlongConstants.java  |  3 ++-
 .../org/apache/inlong/manager/common/enums/GroupMode.java  | 14 +++---
 .../manager/plugin/listener/StartupSortListener.java   |  2 +-
 .../apache/inlong/manager/pojo/group/InlongGroupInfo.java  |  3 ++-
 .../inlong/manager/pojo/group/InlongGroupPageRequest.java  |  3 ++-
 .../inlong/manager/pojo/group/InlongGroupRequest.java  |  5 +++--
 .../inlong/manager/service/core/impl/AuditServiceImpl.java |  4 ++--
 .../manager/service/group/InlongGroupServiceImpl.java  |  2 +-
 .../service/listener/group/InitGroupCompleteListener.java  |  2 +-
 .../listener/group/UpdateGroupCompleteListener.java|  2 +-
 .../service/listener/queue/QueueResourceListener.java  |  2 +-
 .../listener/stream/InitStreamCompleteListener.java|  2 +-
 .../manager/service/source/AbstractSourceOperator.java |  2 +-
 .../manager/service/source/StreamSourceServiceImpl.java|  2 +-
 15 files changed, 31 insertions(+), 19 deletions(-)

diff --git 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 4d218918f5..9961c16ef8 100644
--- 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++ 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -110,7 +110,7 @@ public class BaseTest {
 // set enable zk, create resource, group mode, and cluster tag
 pulsarInfo.setEnableZookeeper(InlongConstants.DISABLE_ZK);
 
pulsarInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
-pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_MODE);
+pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_REALTIME_MODE);
 pulsarInfo.setInlongClusterTag("default_cluster");
 
 pulsarInfo.setDailyRecords(1000);
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c3085972fd..581ebb3098 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -87,7 +87,8 @@ public class InlongConstants {
 public static final Integer DELETED_STATUS = 10;
 
 public static final Integer STANDARD_MODE = 0;
-public static final Integer DATASYNC_MODE = 1;
+public static final Integer DATASYNC_REALTIME_MODE = 1;
+public static final Integer DATASYNC_OFFLINE_MODE = 2;
 
 public static final Integer DISABLE_ZK = 0;
 public static final Integer ENABLE_ZK = 1;
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
index f0db2353b6..d4b417f39a 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
@@ -31,10 +31,18 @@ public enum GroupMode {
 STANDARD("standard"),
 
 /**
- * DataSync mode(only Data Synchronization): group init only with sort in 
InLong Cluster
- * StreamSource -> Sort -> StreamSink
+ * DataSync mode(only Data Synchronization): real-time data sync in stream 
way, group init only with
+ * sort in InLong Cluster.
+ * StreamSource -> Sort -> Sink
  */
-DATASYNC("datasync");
+DATASYNC("datasync"),
+
+/**
+ * DataSync mode(only Data Synchronization): offline data sync in batch 
way, group init only with sort
+ * in InLong Cluster.
+ * BatchSource -> Sort -> Sink
+ */
+DATASYNC_BATCH("datasync_offline");
 
 @Getter
 private final String mode;
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 0b0e55e369..ab9e9b55d8 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener

(inlong) branch dev-offline-sync updated (0ce3d755da -> 1323736a0b)

2024-03-18 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a change to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git


 discard 0ce3d755da [INLONG-9822][Manager] Support flink job runtime execution 
mode configuration (#9823)
omit d5af6a94e2 [INLONG-9813][Manager] Support offline data sync management 
(#9814)
omit 9e96c4e37b [INLONG-9781][Manager] Add offline sync task type 
definition (#9787)
 add 293ab1ab3e [INLONG-9766][Audit] Support user-defined SocketAddress 
loader getting AuditProxy (#9767)
 add 55f639537d [INLONG-9784][Audit] Optimize sending memory management 
when the audit-proxy config is null (#9786)
 add 7c4adbf564 [INLONG-9773][Manager] SortSDK configuration support 
acquire tenant from InlongGroup (#9776)
 add 03933e4072 [INLONG-9783][Sort] Add compatibility processing of tid to 
streamId changes in the message deserialization base class (#9785)
 add 29574c04cf [INLONG-9774][Sort] Support rowdata way of sort InLong 
message tlog-kv format (#9780)
 add 2f0bab5e8d [INLONG-9735][Manager] Bump the Spring version 5.3.27 to 
5.3.32  (#9790)
 add 45e7afc108 [INLONG-9788][Sort] Supports data parse that contains 
delimiters in kv and csv data format (#9789)
 add 44f2b04677 [INLONG-9793][Manager] Fix the problem of Manager client 
workflowApi.listprocess failed to pass parameters correctly (#9794)
 add 5a58040909 [INLONG-9791][Sort] Return null instead of throwing an 
exception when deserialization by type fails (#9792)
 add 829a09bb9f [INLONG-9795][Sort] Optimize the definition of enumeration 
variables and remove the semicolon (#9764)
 add 0585278b81 [INLONG-9797][Audit] Audit-SDK reporting supports version 
number dimension (#9800)
 add 70e36029ea [INLONG-9807][Audit] Add debug log for audit-proxy (#9809)
 add 039e681f11 [INLONG-9008][Manager] Set the ignoreParseError field to 
null (#9810)
 add 3ae27abb3f [INLONG-9802][Agent] Add an agent installer module for 
agent installation (#9803)
 add 2ea39a2765 [INLONG-9809][Audit] SDK supports both singleton and 
non-singleton usage (#9812)
 add 5bd1010743 [INLONG-9804][Agent] Add Pulsar source for Agent (#9805)
 add b6e1064dcf [INLONG-9816][Agent] Add config class for installer (#9817)
 add b08d1612b0 [INLONG-9806][Agent] Add installer configuration file 
(#9815)
 add 58b2fc963c [INLONG-9818][Manager] Decode Msg based on the manager's 
configuration (#9819)
 add f84ad372a4 [INLONG-8676][Manager] Elasticsearch - Modify the calling 
method from SDK to HTTP  (#9057)
 add 7189f34d0b [INLONG-9755][Sort] NPE exception when using default 
configuration to start inlong-sort-standalone  (#9824)
 add 4649b8e546 [INLONG-9825][Manager] Reduce the creation of 
RestClusterClient  (#9826)
 add 43bcb9d859 [INLONG-9827][Manager] Fix the problem of failed to check 
if the consumption group exists (#9828)
 add fd95e72ba5 [INLONG-9833][Agent] Add module state to distinguish 
whether the module has been downloaded or installed (#9834)
 add 23bbce2de1 [INLONG-9831][Agent] Increase configuration acquisition 
capability (#9832)
 add 8e5dd5003e [INLONG-9829][Agent] Add guardian scripts (#9830)
 new 875e07cd74 [INLONG-9781][Manager] Add offline sync task type 
definition (#9787)
 new 5cdd94d4c9 [INLONG-9813][Manager] Support offline data sync management 
(#9814)
 new 1323736a0b [INLONG-9822][Manager] Support flink job runtime execution 
mode configuration (#9823)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (0ce3d755da)
\
 N -- N -- N   refs/heads/dev-offline-sync (1323736a0b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../inlong/agent/conf/AbstractConfiguration.java   |   2 +-
 .../inlong/agent/constant/FetcherConstants.java|  21 +-
 .../inlong/agent/constant/TaskConstants.java   |  10 +
 .../org/apache/inlong/agent/pojo/PulsarTask.java   |  36 ++-
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  29 +++
 .../org/apache/inlong/agent/utils/HttpManager.java |  26 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |   4 +-
 inlong-agent/agent-installer/assembly.xml  |  60 +
 i

Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


EMsnap commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528064540


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.sink;
+
+import org.apache.inlong.sort.redis.common.config.RedisDataType;
+import org.apache.inlong.sort.redis.common.config.SchemaMappingMode;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+import org.apache.inlong.sort.redis.common.schema.RedisSchema;
+import org.apache.inlong.sort.redis.common.schema.RedisSchemaFactory;
+import org.apache.inlong.sort.redis.common.schema.StateEncoder;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static org.apache.inlong.sort.redis.common.config.RedisOptions.*;
+import static org.apache.inlong.sort.redis.common.config.SchemaMappingMode.*;
+
+public class RedisDynamicTableSink implements DynamicTableSink {
+
+private final FlinkJedisConfigBase flinkJedisConfigBase;
+
+private final ResolvedSchema resolvedSchema;
+
+private final EncodingFormat> format;
+
+private final RedisDataType dataType;
+private final SchemaMappingMode mappingMode;
+private final ReadableConfig config;
+private final Map properties;
+
+private final String inlongMetric;
+private final String auditHostAndPorts;
+
+public RedisDynamicTableSink(
+EncodingFormat> format,
+ResolvedSchema resolvedSchema,
+RedisDataType dataType,
+SchemaMappingMode schemaMappingMode,
+ReadableConfig config,
+Map properties,
+String inlongMetric,
+String auditHostAndPorts) {
+this.format = format;
+this.resolvedSchema = resolvedSchema;
+this.dataType = dataType;
+this.mappingMode = schemaMappingMode;
+this.config = config;
+this.properties = properties;
+
+this.inlongMetric = inlongMetric;
+this.auditHostAndPorts = auditHostAndPorts;
+
+flinkJedisConfigBase = RedisHandlerServices
+.findRedisHandler(InlongJedisConfigHandler.class, properties)
+.createFlinkJedisConfig(config);
+
+batchSize = config.get(SINK_BATCH_SIZE);
+flushInterval = parseDuration(config.get(SINK_FLUSH_INTERVAL));
+expireTime = parseDuration(config.get(EXPIRE_TIME));
+}
+
+private final Duration expireTime;
+private final Long batchSize;
+private final Duration flushInterval;
+
+@Override
+public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+// UPSERT mode
+ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+for (RowKind kind : requestedMode.getContainedKinds()) {
+if (kind != RowKind.UPDATE_BEFORE) {
+builder.addContainedKind(kind);
+}
+}
+return builder.build();
+}
+
+@Override
+public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+
+RedisSchema redisSchema = 
RedisSchemaFactory.createRedisSchema(dataType, mappingMode, resolvedSchema);
+redisSchema.validate(resolvedSchema);
+
+SerializationSchema serializationSchema =
+

[PR] [INLONG-928] Remove historical version documents [inlong-website]

2024-03-18 Thread via GitHub


bluewang opened a new pull request, #930:
URL: https://github.com/apache/inlong-website/pull/930

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes https://github.com/apache/inlong-website/issues/928
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the 
problem you're trying to solve.*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management [inlong]

2024-03-18 Thread via GitHub


haifxu opened a new pull request, #9838:
URL: https://github.com/apache/inlong/pull/9838

   ### Prepare a Pull Request
   
   - Fixes #9837 
   
   ### Motivation
   
   Modify variable name
   
   ### Modifications
   
   Modify variable name
   
   ### Verifying this change
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


XiaoYou201 commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528204801


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.sink;
+
+import org.apache.inlong.sort.redis.common.config.RedisDataType;
+import org.apache.inlong.sort.redis.common.config.SchemaMappingMode;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+import org.apache.inlong.sort.redis.common.schema.RedisSchema;
+import org.apache.inlong.sort.redis.common.schema.RedisSchemaFactory;
+import org.apache.inlong.sort.redis.common.schema.StateEncoder;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static org.apache.inlong.sort.redis.common.config.RedisOptions.*;
+import static org.apache.inlong.sort.redis.common.config.SchemaMappingMode.*;
+
+public class RedisDynamicTableSink implements DynamicTableSink {
+
+private final FlinkJedisConfigBase flinkJedisConfigBase;
+
+private final ResolvedSchema resolvedSchema;
+
+private final EncodingFormat> format;
+
+private final RedisDataType dataType;
+private final SchemaMappingMode mappingMode;
+private final ReadableConfig config;
+private final Map properties;
+
+private final String inlongMetric;
+private final String auditHostAndPorts;
+
+public RedisDynamicTableSink(
+EncodingFormat> format,
+ResolvedSchema resolvedSchema,
+RedisDataType dataType,
+SchemaMappingMode schemaMappingMode,
+ReadableConfig config,
+Map properties,
+String inlongMetric,
+String auditHostAndPorts) {
+this.format = format;
+this.resolvedSchema = resolvedSchema;
+this.dataType = dataType;
+this.mappingMode = schemaMappingMode;
+this.config = config;
+this.properties = properties;
+
+this.inlongMetric = inlongMetric;
+this.auditHostAndPorts = auditHostAndPorts;
+
+flinkJedisConfigBase = RedisHandlerServices
+.findRedisHandler(InlongJedisConfigHandler.class, properties)
+.createFlinkJedisConfig(config);
+
+batchSize = config.get(SINK_BATCH_SIZE);
+flushInterval = parseDuration(config.get(SINK_FLUSH_INTERVAL));
+expireTime = parseDuration(config.get(EXPIRE_TIME));
+}
+
+private final Duration expireTime;
+private final Long batchSize;
+private final Duration flushInterval;
+
+@Override
+public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+// UPSERT mode
+ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+for (RowKind kind : requestedMode.getContainedKinds()) {
+if (kind != RowKind.UPDATE_BEFORE) {
+builder.addContainedKind(kind);
+}
+}
+return builder.build();
+}
+
+@Override
+public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+
+RedisSchema redisSchema = 
RedisSchemaFactory.createRedisSchema(dataType, mappingMode, resolvedSchema);
+redisSchema.validate(resolvedSchema);
+
+SerializationSchema serializationSchema =
+

Re: [PR] [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management [inlong]

2024-03-18 Thread via GitHub


dockerzhang merged PR #9838:
URL: https://github.com/apache/inlong/pull/9838


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(inlong) branch master updated: [INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management (#9838)

2024-03-18 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new c727f8834a [INLONG-9837][Dashboard] Fix displays errors for Pulsar 
cluster management (#9838)
c727f8834a is described below

commit c727f8834aaf652b525b0a6214a75c88e0eac301
Author: haifxu 
AuthorDate: Mon Mar 18 18:14:17 2024 +0800

[INLONG-9837][Dashboard] Fix displays errors for Pulsar cluster management 
(#9838)
---
 inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts | 4 ++--
 inlong-dashboard/src/ui/locales/cn.json  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts 
b/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts
index bbde86c5f4..2519fe06a3 100644
--- a/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts
+++ b/inlong-dashboard/src/plugins/clusters/defaults/Pulsar.ts
@@ -57,8 +57,8 @@ export default class PulsarCluster
 rules: [{ required: true }],
 initialValue: 'public',
   })
-  @I18n('pages.Clusters.Pulsar.Tenant')
-  tenant: string;
+  @I18n('pages.Clusters.Pulsar.PulsarTenant')
+  pulsarTenant: string;
 
   @FieldDecorator({
 type: 'input',
diff --git a/inlong-dashboard/src/ui/locales/cn.json 
b/inlong-dashboard/src/ui/locales/cn.json
index 1d6d5a035f..fa46d6f1ca 100644
--- a/inlong-dashboard/src/ui/locales/cn.json
+++ b/inlong-dashboard/src/ui/locales/cn.json
@@ -754,7 +754,7 @@
   "pages.Clusters.Node.PortRule": "请输入正确的端口",
   "pages.Clusters.Node.ProtocolTypeRule": "请输入正确的协议类型",
   "pages.Clusters.Node.Online": "在线",
-  "pages.Clusters.Pulsar.Tenant": "默认租户",
+  "pages.Clusters.Pulsar.PulsarTenant": "默认租户",
   "pages.Clusters.Pulsar.TokenPlaceholder": "如果群集配置了令牌,则为必需",
   "pages.Clusters.Kafka.ClusterUrl": "集群 URL",
   "pages.Clusters.Pulsar.ServiceUrlHelper": "用于生产和消费数据",



Re: [PR] [INLONG-9820][Dashboard][Manager] Update Pulsar source field for Ingestion [inlong]

2024-03-18 Thread via GitHub


dockerzhang merged PR #9821:
URL: https://github.com/apache/inlong/pull/9821


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


aloyszhang commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528312373


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisDataType.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Redis data type Enum.
+ * See detail: @see https://redis.io/topics/data-types-intro";>redis 
data type
+ */
+@Internal
+public enum RedisDataType {
+/**
+ * Redis strings commands are used for managing string values in Redis.
+ */
+PLAIN,
+/**
+ * A Redis hash is a data type that represents a mapping between a string 
field and a string value.
+ * There are two members in hash DataType: 
+ * the first member is Redis hash field.
+ * the second member is Redis hash value.
+ */
+HASH,
+/**
+ * Redis Sets are an unordered collection of unique strings.
+ * Unique means sets does not allow repetition of data in a key.
+ */
+SET,
+
+/**
+ * Bitmaps are not an actual data type, but a set of bit-oriented 
operations defined on the String type. 
+ * Since strings are binary safe blobs and their maximum length is 512 MB, 

+ * they are suitable to set up to 2^32 different bits.
+ */
+BITMAP,
+;

Review Comment:
   ```suggestion
   BITMAP;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(inlong) branch master updated (c727f8834a -> fe982bc2b8)

2024-03-18 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from c727f8834a [INLONG-9837][Dashboard] Fix displays errors for Pulsar 
cluster management (#9838)
 add fe982bc2b8 [INLONG-9820][Dashboard][Manager] Update Pulsar source 
field for Ingestion (#9821)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/agent/pojo/PulsarTask.java   |  6 +-
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  2 +-
 .../plugins/sources/common/SourceDefaultInfo.ts|  1 +
 .../src/plugins/sources/defaults/Pulsar.ts | 91 +-
 inlong-dashboard/src/ui/locales/cn.json|  5 ++
 inlong-dashboard/src/ui/locales/en.json|  5 ++
 .../pojo/source/pulsar/PulsarSourceRequest.java|  3 +
 7 files changed, 105 insertions(+), 8 deletions(-)



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


aloyszhang commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528315770


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml:
##
@@ -43,6 +43,8 @@
 hbase
 hudi
 kafka
+redis
+

Review Comment:
   ```suggestion
   redis
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


aloyszhang commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528320907


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml:
##
@@ -0,0 +1,176 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+4.0.0
+
+org.apache.inlong
+sort-connectors-v1.15
+1.12.0-SNAPSHOT
+
+
+sort-connector-redis-v1.15
+Apache InLong - Sort-connector-redis
+
+
+
${project.parent.parent.parent.parent.parent.basedir}
+
+
+
+
+org.apache.flink
+flink-shaded-jackson
+2.12.4-15.0
+
+
+
+
+org.apache.flink
+flink-table-api-java-bridge
+1.15.4

Review Comment:
   replace with ${flink.version}



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


aloyszhang commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528322815


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Options for the Redis lookup.
+ */
+@Internal
+public class RedisLookupOptions implements Serializable {
+
+private static final long serialVersionUID = 1L;
+private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+
+private final long cacheMaxSize;
+private final long cacheExpireMs;
+private final int maxRetryTimes;
+/**
+ * Asynchronous processing has not been implemented yet, but the entry is 
reserved
+ */
+private final boolean lookupAsync;
+
+public RedisLookupOptions(
+long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean 
lookupAsync) {
+this.cacheMaxSize = cacheMaxSize;
+this.cacheExpireMs = cacheExpireMs;
+this.maxRetryTimes = maxRetryTimes;
+this.lookupAsync = lookupAsync;
+}
+
+public static Builder builder() {
+return new Builder();
+}
+
+public long getCacheMaxSize() {
+return cacheMaxSize;
+}
+
+public long getCacheExpireMs() {
+return cacheExpireMs;
+}
+
+public int getMaxRetryTimes() {
+return maxRetryTimes;
+}
+
+public boolean getLookupAsync() {

Review Comment:
   Not used, plz remove 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


aloyszhang commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528322815


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Options for the Redis lookup.
+ */
+@Internal
+public class RedisLookupOptions implements Serializable {
+
+private static final long serialVersionUID = 1L;
+private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+
+private final long cacheMaxSize;
+private final long cacheExpireMs;
+private final int maxRetryTimes;
+/**
+ * Asynchronous processing has not been implemented yet, but the entry is 
reserved
+ */
+private final boolean lookupAsync;
+
+public RedisLookupOptions(
+long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean 
lookupAsync) {
+this.cacheMaxSize = cacheMaxSize;
+this.cacheExpireMs = cacheExpireMs;
+this.maxRetryTimes = maxRetryTimes;
+this.lookupAsync = lookupAsync;
+}
+
+public static Builder builder() {
+return new Builder();
+}
+
+public long getCacheMaxSize() {
+return cacheMaxSize;
+}
+
+public long getCacheExpireMs() {
+return cacheExpireMs;
+}
+
+public int getMaxRetryTimes() {
+return maxRetryTimes;
+}
+
+public boolean getLookupAsync() {

Review Comment:
   Not used, plz remove 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


aloyszhang commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528327113


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler;
+
+/**
+ * handler to create flink jedis config.
+ */
+public interface InlongJedisConfigHandler extends RedisHandler {
+
+/**
+ * create flink jedis config use sepecified properties.

Review Comment:
   ```suggestion
* create flink jedis config use specified properties.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


XiaoYou201 commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528407502


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisDataType.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Redis data type Enum.
+ * See detail: @see https://redis.io/topics/data-types-intro";>redis 
data type
+ */
+@Internal
+public enum RedisDataType {
+/**
+ * Redis strings commands are used for managing string values in Redis.
+ */
+PLAIN,
+/**
+ * A Redis hash is a data type that represents a mapping between a string 
field and a string value.
+ * There are two members in hash DataType: 
+ * the first member is Redis hash field.
+ * the second member is Redis hash value.
+ */
+HASH,
+/**
+ * Redis Sets are an unordered collection of unique strings.
+ * Unique means sets does not allow repetition of data in a key.
+ */
+SET,
+
+/**
+ * Bitmaps are not an actual data type, but a set of bit-oriented 
operations defined on the String type. 
+ * Since strings are binary safe blobs and their maximum length is 512 MB, 

+ * they are suitable to set up to 2^32 different bits.
+ */
+BITMAP,
+;

Review Comment:
   thanks suggestion, fixed



##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml:
##
@@ -43,6 +43,8 @@
 hbase
 hudi
 kafka
+redis
+

Review Comment:
   thanks suggestion, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


XiaoYou201 commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528407874


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml:
##
@@ -0,0 +1,176 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+4.0.0
+
+org.apache.inlong
+sort-connectors-v1.15
+1.12.0-SNAPSHOT
+
+
+sort-connector-redis-v1.15
+Apache InLong - Sort-connector-redis
+
+
+
${project.parent.parent.parent.parent.parent.basedir}
+
+
+
+
+org.apache.flink
+flink-shaded-jackson
+2.12.4-15.0
+
+
+
+
+org.apache.flink
+flink-table-api-java-bridge
+1.15.4

Review Comment:
   thanks suggestion, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


XiaoYou201 commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1528408728


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler;
+
+/**
+ * handler to create flink jedis config.
+ */
+public interface InlongJedisConfigHandler extends RedisHandler {
+
+/**
+ * create flink jedis config use sepecified properties.

Review Comment:
   thanks suggestion, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [INLONG-9839][Manager] Optimize the auto assign logic of SortStandalone cluster [inlong]

2024-03-18 Thread via GitHub


vernedeng opened a new pull request, #9840:
URL: https://github.com/apache/inlong/pull/9840

   
   
   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #9839 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the 
problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [INLONG-9753][Sort] module inlong-sort-standalone's conf/sid_es_v3.conf file does not support adding Apache license [inlong]

2024-03-18 Thread via GitHub


haibo-duan opened a new pull request, #9841:
URL: https://github.com/apache/inlong/pull/9841

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #9753
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the 
problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9839][Manager] Optimize the auto assign logic of SortStandalone cluster [inlong]

2024-03-18 Thread via GitHub


aloyszhang commented on code in PR #9840:
URL: https://github.com/apache/inlong/pull/9840#discussion_r1529589155


##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java:
##
@@ -161,6 +161,6 @@ List selectExistsStreamId(@Param("groupId") String 
groupId, @Param("sink
  */
 int deleteByInlongGroupIds(@Param("groupIdList") List groupIdList);
 
-String selectAssignedCluster(@Param("dataNodeName") String dataNodeName);
+String selectAssignedCluster(@Param("dataNodeName") String dataNodeName, 
@Param("clusterTag") String clusterTag);

Review Comment:
   Does this modification only affect SortStandalone?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


EMsnap commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1529610943


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java:
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.embedded.RedisServer;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.*;
+
+public class RedisTableTest {
+
+private static int redisPort;
+
+private static RedisServer redisServer;
+
+@BeforeClass
+public static void setup() {
+redisPort = NetUtils.getAvailablePort().getPort();
+// redisPort = 6379;
+redisServer = new RedisServer(redisPort);
+redisServer.start();
+}
+
+@AfterClass
+public static void cleanup() {
+if (redisServer != null) {
+redisServer.stop();
+}
+}
+
+@Before
+public void prepare() {
+Jedis jedis = new Jedis("localhost", redisPort);
+// Deletes all keys from all databases.
+jedis.flushAll();
+}
+@Test
+public void testSourceWithGet() {
+StreamExecutionEnvironment executionEnv =
+StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv =
+StreamTableEnvironment.create(executionEnv);
+Jedis jedis = new Jedis("127.0.0.1", redisPort);
+
+jedis.set("1", "value_1");
+jedis.set("2", "value_2");
+
+String dim = "CREATE TABLE dim (" +
+"aaa varchar, bbb varchar" +
+// " PRIMARY KEY (`key`) NOT ENFORCED" +
+") WITH (" +
+"  'connector' = 'redis-inlong'," +
+"  'command' = 'get'," +
+"  'host' = 'localhost'," +
+"  'port' = '" + redisPort + "'," +
+"  'maxIdle' = '8'," +
+"  'minIdle' = '1'," +
+"  'maxTotal' = '2'," +
+"  'timeout' = '2000'" +
+")";
+String source =
+"create table source(aaa varchar, proctime as procTime()) "
++ "with ('connector'='datagen',  
'rows-per-second'='1', "
++ "'fields.aaa.kind'='sequence',  
'fields.aaa.start'='1',  'fields.aaa.end'='2'"
++ ")";
+String sink = "CREATE TABLE sink (" +
+"aaa STRING," +
+"bbb STRING," +
+"PRIMARY KEY (`aaa`) NOT ENFORCED" +
+") WITH (" +
+"  'connector' = 'redis-inlong'," +
+"  'sink.batch-size' = '1'," +
+"  'format' = 'csv'," +
+"  'data-type' = 'PLAIN'," +
+"  'redis-mode' = 'standalone'," +
+"  'host' = '127.0.0.1'," +
+"  'port' = '" + redisPort + "'," +
+"  'maxIdle' = '8'," +
+"  'minIdle' = '1'," +
+"  'maxTotal' = '2'," +
+"  'timeout' = '2000'" +
+")";
+
+tableEnv.executeSql(dim);
+tableEnv.executeSql(source);
+tableEnv.executeSql(sink);
+String sql =
+" insert into sink"
++ " select concat_ws('_', s.aaa, s.aaa),concat_ws('_', 
d.bbb, s.aaa) from source s"
++ "  left join dim for system_time as of s.proctime as 
d "
++ " on d.aaa = s.aaa";
+
+tableEnv.executeSql(sql);
+
+await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+asser

Re: [PR] [INLONG-9835][Sort] Add Redis connector on flink 1.15 [inlong]

2024-03-18 Thread via GitHub


EMsnap commented on code in PR #9836:
URL: https://github.com/apache/inlong/pull/9836#discussion_r1529615263


##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java:
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.embedded.RedisServer;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.*;
+
+public class RedisTableTest {
+
+private static int redisPort;
+
+private static RedisServer redisServer;
+
+@BeforeClass
+public static void setup() {
+redisPort = NetUtils.getAvailablePort().getPort();
+// redisPort = 6379;

Review Comment:
   remove useless code 



##
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java:
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.embedded.RedisServer;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.*;
+
+public class RedisTableTest {
+
+private static int redisPort;
+
+private static RedisServer redisServer;
+
+@BeforeClass
+public static void setup() {
+redisPort = NetUtils.getAvailablePort().getPort();
+// redisPort = 6379;
+redisServer = new RedisServer(redisPort);
+redisServer.start();
+}
+
+@AfterClass
+public static void cleanup() {
+if (redisServer != null) {
+redisServer.stop();
+}
+}
+
+@Before
+public void prepare() {
+Jedis jedis = new Jedis("localhost", redisPort);
+// Deletes all keys from all databases.
+jedis.flushAll();
+}
+@Test
+public void testSourceWithGet() {
+StreamExecutionEnvironment executionEnv =
+StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv =
+StreamTableEnvironment.create(executionEnv);
+Jedis jedis = new Jedis("127.0.0.1", redisPort);
+
+jedis.set("1", "value_1");
+jedis.set("2", "value_2");
+
+String dim = "CREATE TABLE dim (" +
+"aaa varchar, bbb varchar" +
+// " PRIMARY KEY (`key`) NOT ENFORCED" +
+") WITH (" +
+"  'conn

Re: [PR] [INLONG-9753][Sort] Module inlong-sort-standalone's conf/sid_es_v3.conf file does not support adding Apache license [inlong]

2024-03-18 Thread via GitHub


dockerzhang merged PR #9841:
URL: https://github.com/apache/inlong/pull/9841


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(inlong) branch master updated (fe982bc2b8 -> b82f908cbf)

2024-03-18 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from fe982bc2b8 [INLONG-9820][Dashboard][Manager] Update Pulsar source 
field for Ingestion (#9821)
 add b82f908cbf [INLONG-9753][Sort] Module inlong-sort-standalone's 
conf/sid_es_v3.conf file does not support adding Apache license (#9841)

No new revisions were added by this update.

Summary of changes:
 .../standalone/config/loader/ClassResourceQueryConsumeConfig.java | 4 
 1 file changed, 4 insertions(+)