[PR] [INLONG-11030][SDK] Add AVRO formatted data source for Transform [inlong]

2024-09-11 Thread via GitHub


ying-hua opened a new pull request, #11082:
URL: https://github.com/apache/inlong/pull/11082

   
   
   
   
   Fixes #11030
   
   ### Motivation
   
   
   Add an Avro source data format to Transform. Currently, Transform supports 
parsing the Avro data types int, float, long, double, string, boolean, bytes, 
fixed, and enum.
   
   Add some classes: 
   - AvroSourceInfo
   - AvroNode
   - AvroSourceData
   - AvroSourceDecoder
   - TestAvro2CsvProcessor
   
   Modified:
   - SourceDecoderFactory
   - AbstractProcessorTestBase
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-11064][SDK] Transform SQL supports NULLIF function [inlong]

2024-09-11 Thread via GitHub


luchunliang commented on code in PR #11078:
URL: https://github.com/apache/inlong/pull/11078#discussion_r1754107629


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * NullIfFunction
+ * description: NULLIF(expr1,expr2)
+ * - return NULL if expr1 = expr2 is true
+ * - returns expr1 otherwise
+ */
+@Slf4j
+@TransformFunction(names = {"nullif", "null_if"})

Review Comment:
   The general function name is ifnull.






Re: [PR] [INLONG-11064][SDK] Transform SQL supports NULLIF function [inlong]

2024-09-11 Thread via GitHub


Zkplo commented on code in PR #11078:
URL: https://github.com/apache/inlong/pull/11078#discussion_r1754194450


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * NullIfFunction
+ * description: NULLIF(expr1,expr2)
+ * - return NULL if expr1 = expr2 is true
+ * - returns expr1 otherwise
+ */
+@Slf4j
+@TransformFunction(names = {"nullif", "null_if"})

Review Comment:
   According to the official MySQL documentation 
(https://dev.mysql.com/doc/refman/8.4/en/flow-control-functions.html#function_nullif), 
both ifnull and nullif exist.
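
The two functions are not interchangeable. As a minimal sketch of the MySQL semantics (the class and method names are hypothetical, and Java `equals` stands in for SQL comparison, so SQL type coercion is not modeled):

```java
final class NullFunctionsSketch {

    // NULLIF(expr1, expr2): NULL when expr1 = expr2 is true, otherwise expr1.
    // Assumption: equality is approximated with Object.equals.
    static Object nullIf(Object expr1, Object expr2) {
        if (expr1 != null && expr1.equals(expr2)) {
            return null;
        }
        return expr1;
    }

    // IFNULL(expr1, expr2): expr1 when it is not NULL, otherwise expr2.
    static Object ifNull(Object expr1, Object expr2) {
        return expr1 != null ? expr1 : expr2;
    }

    public static void main(String[] args) {
        System.out.println(nullIf("a", "a"));
        System.out.println(nullIf("a", "b"));
        System.out.println(ifNull(null, "b"));
    }
}
```

NULLIF turns a matching value into NULL, while IFNULL replaces a NULL with a fallback, so registering both names for one implementation would change behavior.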






Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO closed pull request #11083: [INLONG-10287][Agent] Update the Redis 
Source
URL: https://github.com/apache/inlong/pull/11083





[PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO opened a new pull request, #11083:
URL: https://github.com/apache/inlong/pull/11083

   
   
   
   
   Fixes #10287 
   
   ### Motivation
   This pull request is an enhancement of the previous PR #10801. 
   It completes the task requirements mentioned in PR #10801: 
   * Two modes are provided: one uses commands and the other uses 
subscription. 
   * To choose between command and subscription mode, configure one more 
parameter, `is_subscribe`.
   * When there are many keys, a batch query interface such as `mget` is 
suggested. 
   * When there are many keys but the `get` command is still used, 
`jedis.pipeline` is used to reduce I/O costs.
   * The maximum size of a single piece of data is limited: if the data size 
exceeds 500k, the data is automatically skipped and discarded.
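
The size limit in the last item could be sketched roughly as follows. This is an illustration only, not the code in the PR: the class and method names are hypothetical, the size is assumed to be measured in UTF-8 bytes, and 500k is read as `500 * 1024`, matching the `MAX_DATA_SIZE` constant in the diff.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SizeLimitSketch {

    // 500 KB upper bound for a single piece of data.
    static final long MAX_DATA_SIZE = 500 * 1024;

    // True when the value's UTF-8 encoding does not exceed the limit.
    static boolean withinLimit(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length <= MAX_DATA_SIZE;
    }

    // Keeps values within the limit and silently discards the rest.
    // Assumption: null values (missing keys) are discarded as well.
    static List<String> dropOversized(List<String> values) {
        List<String> kept = new ArrayList<>();
        for (String value : values) {
            if (value != null && withinLimit(value)) {
                kept.add(value);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("small", "also small");
        System.out.println(dropOversized(values).size());
    }
}
```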
   
   
   
   ### Modifications
   
   * Modify TaskProfileDto and RedisTask to accept and parse the configuration.
   * Add RedisTask, RedisInstance, and RedisSource classes
   * Provide Unit Test
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   





[PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO opened a new pull request, #11084:
URL: https://github.com/apache/inlong/pull/11084

   
   
   
   
   Fixes #10287 
   
   ### Motivation
   This pull request is an enhancement of the previous PR #10801. 
   It completes the task requirements mentioned in PR #10801: 
   * Two modes are provided: one uses commands and the other uses 
subscription. 
   * To choose between command and subscription mode, configure one more 
parameter, `is_subscribe`.
   * When there are many keys, a batch query interface such as `mget` is 
suggested. 
   * When there are many keys but the `get` command is still used, 
`jedis.pipeline` is used to reduce I/O costs.
   * The maximum size of a single piece of data is limited: if the data size 
exceeds 500k, the data is automatically skipped and discarded.
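
The batch query suggestion for many keys might look like the sketch below. It is not the PR's implementation: `KeyValueClient` is a hypothetical stand-in for a Redis client with an `mget`-style call, and the batch size is an assumed parameter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchReadSketch {

    // Hypothetical stand-in for a Redis client; returns one value per key,
    // with null for keys that do not exist.
    interface KeyValueClient {
        List<String> mget(String... keys);
    }

    // Splits a comma-separated key list such as "age,name,sex".
    static List<String> parseKeys(String csv) {
        List<String> keys = new ArrayList<>();
        for (String part : csv.split(",")) {
            String key = part.trim();
            if (!key.isEmpty()) {
                keys.add(key);
            }
        }
        return keys;
    }

    // Reads all keys with one mget call per batch instead of one get per key.
    static Map<String, String> readAll(KeyValueClient client, List<String> keys, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i += batchSize) {
            List<String> batch = keys.subList(i, Math.min(i + batchSize, keys.size()));
            List<String> values = client.mget(batch.toArray(new String[0]));
            for (int j = 0; j < batch.size(); j++) {
                result.put(batch.get(j), values.get(j));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // In-memory map used in place of a real Redis server.
        Map<String, String> store = new HashMap<>();
        store.put("age", "30");
        store.put("name", "alice");
        KeyValueClient client = keys -> {
            List<String> out = new ArrayList<>();
            for (String key : keys) {
                out.add(store.get(key));
            }
            return out;
        };
        System.out.println(readAll(client, parseKeys("age,name,sex"), 2));
    }
}
```

With three keys and a batch size of 2, the client is called twice rather than three times, which is the round-trip saving the list item refers to.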
   
   
   
   ### Modifications
   
   * Modify TaskProfileDto and RedisTask to accept and parse the configuration.
   * Add RedisTask, RedisInstance, and RedisSource classes
   * Provide Unit Test
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation





[PR] [INLONG-11005][SDK] Add YAML formatted data source for Transform [inlong]

2024-09-11 Thread via GitHub


emptyOVO opened a new pull request, #11085:
URL: https://github.com/apache/inlong/pull/11085

   
   
   
   
   Fixes #11005 
   
   ### Motivation
   Add YamlSourceData, YamlSourceDecoder, and YamlSourceInfo, and provide unit tests.
   
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   





Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


aloyszhang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756081633


##
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(TestRedisSource.class);
+
+private static AgentBaseTestsHelper helper;
+
+private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d";
+
+private static Store taskBasicStore;
+private static Store instanceBasicStore;
+private static Store offsetBasicStore;
+
+@Mock
+private InstanceProfile profile;
+
+@Mock
+private Jedis jedis;
+
+@Mock
+private Pipeline pipeline;
+
+@Mock
+private ScheduledExecutorService executor;
+
+@InjectMocks
+private RedisSource redisSource;
+
+@Before
+public void setUp() {
+helper = new 
AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+instanceBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+offsetBasicStore =
+TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
+mockStatic(Executors.class);
+when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+initProfile();
+}
+
+private void initProfile() {
+final String username = "";
+final String password = "123456";
+final String hostname = "127.0.0.1";
+final String port = "6379";
+final String groupId = "group01";
+final String streamId = "stream01";
+final String keys = "age,name,sex";
+final String command = "zscore";
+
+TaskProfile taskProfile = helper.getTaskP

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


aloyszhang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756081863


##
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(TestRedisSource.class);
+
+private static AgentBaseTestsHelper helper;
+
+private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d";
+
+private static Store taskBasicStore;
+private static Store instanceBasicStore;
+private static Store offsetBasicStore;
+
+@Mock
+private InstanceProfile profile;
+
+@Mock
+private Jedis jedis;
+
+@Mock
+private Pipeline pipeline;
+
+@Mock
+private ScheduledExecutorService executor;
+
+@InjectMocks
+private RedisSource redisSource;
+
+@Before
+public void setUp() {
+helper = new 
AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+instanceBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+offsetBasicStore =
+TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
+mockStatic(Executors.class);
+when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+initProfile();
+}
+
+private void initProfile() {
+final String username = "";
+final String password = "123456";
+final String hostname = "127.0.0.1";
+final String port = "6379";
+final String groupId = "group01";
+final String streamId = "stream01";
+final String keys = "age,name,sex";
+final String command = "zscore";
+
+TaskProfile taskProfile = helper.getTaskP

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


justinwwhuang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756095808


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -17,54 +17,279 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisSource.class);
+private static final long MAX_DATA_SIZE = 500 * 1024;
+private static final int REDIS_QUEUE_SIZE = 1;
+private Gson gson;
+
+public InstanceProfile profile;
+private String port;
+private Jedis jedis;
+private String hostName;
+private boolean ssl;
+private String authUser;
+private String authPassword;
+private String readTimeout;
+private String replId;
+private String snapShot;
+private String dbNumber;
+private String redisCommand;
+
+private String fieldOrMember;
+private boolean destroyed;
+private boolean isSubscribe;
+private Set keys;
+private Replicator redisReplicator;
+private BlockingQueue redisQueue;
+private ScheduledExecutorService executor;
 
 public RedisSource() {
 
 }
 
 @Override
 protected String getThreadName() {
-return null;
+return "redis-source-" + taskId + "-" + instanceId;
 }
 
 @Override
 protected void initSource(InstanceProfile profile) {
+LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, 
"");
+this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, 
"");
+this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+this.keys = new 
ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+this.isSubscribe = 
profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+this.instanceId = profile.getInstanceId();
+this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+initGson();
+String uri = getRedisUri();
+try {
+if (isSubscribe) {
+// use subscribe mode
+this.execu

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


justinwwhuang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756099335


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -17,54 +17,279 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisSource.class);
+private static final long MAX_DATA_SIZE = 500 * 1024;
+private static final int REDIS_QUEUE_SIZE = 1;
+private Gson gson;
+
+public InstanceProfile profile;
+private String port;
+private Jedis jedis;
+private String hostName;
+private boolean ssl;
+private String authUser;
+private String authPassword;
+private String readTimeout;
+private String replId;
+private String snapShot;
+private String dbNumber;
+private String redisCommand;
+
+private String fieldOrMember;
+private boolean destroyed;
+private boolean isSubscribe;
+private Set keys;
+private Replicator redisReplicator;
+private BlockingQueue redisQueue;
+private ScheduledExecutorService executor;
 
 public RedisSource() {
 
 }
 
 @Override
 protected String getThreadName() {
-return null;
+return "redis-source-" + taskId + "-" + instanceId;
 }
 
 @Override
 protected void initSource(InstanceProfile profile) {
+LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, 
"");
+this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, 
"");
+this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+this.instanceId = profile.getInstanceId();
+this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+initGson();
+String uri = getRedisUri();
+try {
+if (isSubscribe) {
+// use subscribe mode
+this.execu

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


justinwwhuang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756102202


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -84,6 +321,272 @@ public boolean sourceFinish() {
 
 @Override
 public boolean sourceExist() {
-return false;
+return true;
+}
+
+private String getRedisUri() {
+StringBuffer sb = new StringBuffer("redis://");
+sb.append(hostName).append(":").append(port);
+if (!StringUtils.isEmpty(dbNumber)) {
+sb.append("/").append(dbNumber);
+}
+sb.append("?");
+if (!StringUtils.isEmpty(authPassword)) {
+sb.append("authPassword=").append(authPassword).append("&");
+}
+if (!StringUtils.isEmpty(authUser)) {
+sb.append("authUser=").append(authUser).append("&");
+}
+if (!StringUtils.isEmpty(readTimeout)) {
+sb.append("readTimeout=").append(readTimeout).append("&");
+}
+if (ssl) {
+sb.append("ssl=").append("yes").append("&");
+}
+if (!StringUtils.isEmpty(snapShot)) {
+sb.append("replOffset=").append(snapShot).append("&");
+}
+if (!StringUtils.isEmpty(replId)) {
+sb.append("replId=").append(replId).append("&");
+}
+if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+sb.deleteCharAt(sb.length() - 1);
+}
+return sb.toString();
+}
+
+private void initReplicator() {
+DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);

Review Comment:
   Does this subscribe to all operations? Can the set of subscribed commands 
be made configurable here? For example, we may only need to subscribe to a few 
of these operations.
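
To make the request above concrete, here is a minimal sketch of how a configurable command list could be parsed before the parsers are registered. It is an illustration only, not code from the PR: the class name `SubscribedCommands`, the default command set, and the idea of reading the list from a comma-separated task property are all assumptions.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class SubscribedCommands {

    // Hypothetical default, used when no command list is configured.
    static final Set<String> DEFAULT_COMMANDS =
            new LinkedHashSet<>(Arrays.asList("APPEND", "SET", "DEL"));

    /**
     * Parses a comma-separated list such as "set, del" into upper-case
     * command names, keeping the given order and dropping blank entries.
     * Falls back to DEFAULT_COMMANDS when nothing usable is configured.
     */
    static Set<String> parse(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_COMMANDS;
        }
        Set<String> parsed = Arrays.stream(configured.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(s -> s.toUpperCase(Locale.ROOT))
                .collect(Collectors.toCollection(LinkedHashSet::new));
        return parsed.isEmpty() ? DEFAULT_COMMANDS : parsed;
    }

    public static void main(String[] args) {
        // In initReplicator(), each name could then be registered the same
        // way the diff registers "APPEND":
        //   redisReplicator.addCommandParser(CommandName.name(name), defaultCommandParser);
        for (String name : parse("set, del,,hset")) {
            System.out.println("would register parser for " + name);
        }
    }
}
```

With a fallback like this, leaving the property unset would keep the current behaviour, provided the default set mirrors the commands the PR already registers.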



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756115922


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -84,6 +321,272 @@ public boolean sourceFinish() {
 
 @Override
 public boolean sourceExist() {
-return false;
+return true;
+}
+
+private String getRedisUri() {
+StringBuffer sb = new StringBuffer("redis://");
+sb.append(hostName).append(":").append(port);
+if (!StringUtils.isEmpty(dbNumber)) {
+sb.append("/").append(dbNumber);
+}
+sb.append("?");
+if (!StringUtils.isEmpty(authPassword)) {
+sb.append("authPassword=").append(authPassword).append("&");
+}
+if (!StringUtils.isEmpty(authUser)) {
+sb.append("authUser=").append(authUser).append("&");
+}
+if (!StringUtils.isEmpty(readTimeout)) {
+sb.append("readTimeout=").append(readTimeout).append("&");
+}
+if (ssl) {
+sb.append("ssl=").append("yes").append("&");
+}
+if (!StringUtils.isEmpty(snapShot)) {
+sb.append("replOffset=").append(snapShot).append("&");
+}
+if (!StringUtils.isEmpty(replId)) {
+sb.append("replId=").append(replId).append("&");
+}
+if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+sb.deleteCharAt(sb.length() - 1);
+}
+return sb.toString();
+}
+
+private void initReplicator() {
+DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);

Review Comment:
   In theory, yes. But since the user has chosen the subscription model, they 
are presumably sensitive to real-time changes in the data, so I listen for all 
of the key-modification commands here. Do I need to change this?






Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756120543


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -84,6 +321,272 @@ public boolean sourceFinish() {
 
 @Override
 public boolean sourceExist() {
-return false;
+return true;
+}
+
+private String getRedisUri() {
+StringBuffer sb = new StringBuffer("redis://");
+sb.append(hostName).append(":").append(port);
+if (!StringUtils.isEmpty(dbNumber)) {
+sb.append("/").append(dbNumber);
+}
+sb.append("?");
+if (!StringUtils.isEmpty(authPassword)) {
+sb.append("authPassword=").append(authPassword).append("&");
+}
+if (!StringUtils.isEmpty(authUser)) {
+sb.append("authUser=").append(authUser).append("&");
+}
+if (!StringUtils.isEmpty(readTimeout)) {
+sb.append("readTimeout=").append(readTimeout).append("&");
+}
+if (ssl) {
+sb.append("ssl=").append("yes").append("&");
+}
+if (!StringUtils.isEmpty(snapShot)) {
+sb.append("replOffset=").append(snapShot).append("&");
+}
+if (!StringUtils.isEmpty(replId)) {
+sb.append("replId=").append(replId).append("&");
+}
+if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+sb.deleteCharAt(sb.length() - 1);
+}
+return sb.toString();
+}
+
+private void initReplicator() {
+DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);

Review Comment:
   Would subscribing to only some of these operations cause a causal-ordering 
problem?
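
One way to picture the concern is a toy replay. The sketch below is not based on the Agent's actual consumer: `FilteredReplayDemo`, the three-command log, and the assumption that a downstream consumer rebuilds key state from the events it receives are all hypothetical. In this model the delivered commands stay in their original relative order, yet dropping one command type can still leave the rebuilt state different from the source.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FilteredReplayDemo {

    /** Replays {command, key, value} entries in order, skipping unsubscribed commands. */
    static Map<String, String> replay(List<String[]> log, Set<String> allowed) {
        Map<String, String> state = new HashMap<>();
        for (String[] entry : log) {
            String command = entry[0];
            String key = entry[1];
            if (!allowed.contains(command)) {
                continue; // not subscribed: the consumer never sees this event
            }
            switch (command) {
                case "SET":
                    state.put(key, entry[2]);
                    break;
                case "APPEND":
                    // APPEND creates the key if it is absent, else concatenates
                    state.merge(key, entry[2], String::concat);
                    break;
                case "DEL":
                    state.remove(key);
                    break;
                default:
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String[]> log = Arrays.asList(
                new String[]{"SET", "name", "a"},
                new String[]{"DEL", "name", null},
                new String[]{"APPEND", "name", "b"});
        Set<String> all = new HashSet<>(Arrays.asList("SET", "APPEND", "DEL"));
        Set<String> withoutDel = new HashSet<>(Arrays.asList("SET", "APPEND"));
        System.out.println("all commands: " + replay(log, all));        // {name=b}
        System.out.println("without DEL:  " + replay(log, withoutDel)); // {name=ab}
    }
}
```

Whether this matters for the Redis source depends on whether consumers only forward individual change events or try to reconstruct values from them, which this thread does not settle.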






Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756136763


##
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class);
+
+private static AgentBaseTestsHelper helper;
+
+private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d";
+
+private static Store taskBasicStore;
+private static Store instanceBasicStore;
+private static Store offsetBasicStore;
+
+@Mock
+private InstanceProfile profile;
+
+@Mock
+private Jedis jedis;
+
+@Mock
+private Pipeline pipeline;
+
+@Mock
+private ScheduledExecutorService executor;
+
+@InjectMocks
+private RedisSource redisSource;
+
+@Before
+public void setUp() {
+helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+offsetBasicStore =
+TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
+mockStatic(Executors.class);
+when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+initProfile();
+}
+
+private void initProfile() {
+final String username = "";
+final String password = "123456";
+final String hostname = "127.0.0.1";
+final String port = "6379";
+final String groupId = "group01";
+final String streamId = "stream01";
+final String keys = "age,name,sex";
+final String command = "zscore";
+
+TaskProfile taskProfile = helper.getTaskPro

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756137158


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -17,54 +17,279 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class);
+private static final long MAX_DATA_SIZE = 500 * 1024;
+private static final int REDIS_QUEUE_SIZE = 1;
+private Gson gson;
+
+public InstanceProfile profile;
+private String port;
+private Jedis jedis;
+private String hostName;
+private boolean ssl;
+private String authUser;
+private String authPassword;
+private String readTimeout;
+private String replId;
+private String snapShot;
+private String dbNumber;
+private String redisCommand;
+
+private String fieldOrMember;
+private boolean destroyed;
+private boolean isSubscribe;
+private Set keys;
+private Replicator redisReplicator;
+private BlockingQueue redisQueue;
+private ScheduledExecutorService executor;
 
 public RedisSource() {
 
 }
 
 @Override
 protected String getThreadName() {
-return null;
+return "redis-source-" + taskId + "-" + instanceId;
 }
 
 @Override
 protected void initSource(InstanceProfile profile) {
+LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, "");
+this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, "");
+this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+this.instanceId = profile.getInstanceId();
+this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+initGson();
+String uri = getRedisUri();
+try {
+if (isSubscribe) {
+// use subscribe mode
+this.executor =

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756138334


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -17,54 +17,279 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class);
+private static final long MAX_DATA_SIZE = 500 * 1024;
+private static final int REDIS_QUEUE_SIZE = 1;
+private Gson gson;
+
+public InstanceProfile profile;
+private String port;
+private Jedis jedis;
+private String hostName;
+private boolean ssl;
+private String authUser;
+private String authPassword;
+private String readTimeout;
+private String replId;
+private String snapShot;
+private String dbNumber;
+private String redisCommand;
+
+private String fieldOrMember;
+private boolean destroyed;
+private boolean isSubscribe;
+private Set keys;
+private Replicator redisReplicator;
+private BlockingQueue redisQueue;
+private ScheduledExecutorService executor;
 
 public RedisSource() {
 
 }
 
 @Override
 protected String getThreadName() {
-return null;
+return "redis-source-" + taskId + "-" + instanceId;
 }
 
 @Override
 protected void initSource(InstanceProfile profile) {
+LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, "");
+this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, "");
+this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+this.instanceId = profile.getInstanceId();
+this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+initGson();
+String uri = getRedisUri();
+try {
+if (isSubscribe) {
+// use subscribe mode
+this.executor =

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756145612


##
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class);
+
+private static AgentBaseTestsHelper helper;
+
+private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d";
+
+private static Store taskBasicStore;
+private static Store instanceBasicStore;
+private static Store offsetBasicStore;
+
+@Mock
+private InstanceProfile profile;
+
+@Mock
+private Jedis jedis;
+
+@Mock
+private Pipeline pipeline;
+
+@Mock
+private ScheduledExecutorService executor;
+
+@InjectMocks
+private RedisSource redisSource;
+
+@Before
+public void setUp() {
+helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+offsetBasicStore =
+TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
+mockStatic(Executors.class);
+when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+initProfile();
+}
+
+private void initProfile() {
+final String username = "";
+final String password = "123456";
+final String hostname = "127.0.0.1";
+final String port = "6379";
+final String groupId = "group01";
+final String streamId = "stream01";
+final String keys = "age,name,sex";
+final String command = "zscore";
+
+TaskProfile taskProfile = helper.getTaskPro

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756115922


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -84,6 +321,272 @@ public boolean sourceFinish() {
 
 @Override
 public boolean sourceExist() {
-return false;
+return true;
+}
+
+private String getRedisUri() {
+StringBuffer sb = new StringBuffer("redis://");
+sb.append(hostName).append(":").append(port);
+if (!StringUtils.isEmpty(dbNumber)) {
+sb.append("/").append(dbNumber);
+}
+sb.append("?");
+if (!StringUtils.isEmpty(authPassword)) {
+sb.append("authPassword=").append(authPassword).append("&");
+}
+if (!StringUtils.isEmpty(authUser)) {
+sb.append("authUser=").append(authUser).append("&");
+}
+if (!StringUtils.isEmpty(readTimeout)) {
+sb.append("readTimeout=").append(readTimeout).append("&");
+}
+if (ssl) {
+sb.append("ssl=").append("yes").append("&");
+}
+if (!StringUtils.isEmpty(snapShot)) {
+sb.append("replOffset=").append(snapShot).append("&");
+}
+if (!StringUtils.isEmpty(replId)) {
+sb.append("replId=").append(replId).append("&");
+}
+if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+sb.deleteCharAt(sb.length() - 1);
+}
+return sb.toString();
+}
+
+private void initReplicator() {
+DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);

Review Comment:
   In theory, yes. But since the user has chosen the subscription model, they 
are presumably sensitive to real-time changes in the data, so I listen for all 
of the key-modification commands here.






Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756154385


##
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class);
+
+private static AgentBaseTestsHelper helper;
+
+private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d";
+
+private static Store taskBasicStore;
+private static Store instanceBasicStore;
+private static Store offsetBasicStore;
+
+@Mock
+private InstanceProfile profile;
+
+@Mock
+private Jedis jedis;
+
+@Mock
+private Pipeline pipeline;
+
+@Mock
+private ScheduledExecutorService executor;
+
+@InjectMocks
+private RedisSource redisSource;
+
+@Before
+public void setUp() {
+helper = new 
AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+instanceBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+offsetBasicStore =
+TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
+mockStatic(Executors.class);
+when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+initProfile();
+}
+
+private void initProfile() {
+final String username = "";
+final String password = "123456";
+final String hostname = "127.0.0.1";
+final String port = "6379";
+final String groupId = "group01";
+final String streamId = "stream01";
+final String keys = "age,name,sex";
+final String command = "zscore";
+
+TaskProfile taskProfile = helper.getTaskPro

Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756115922


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -84,6 +321,272 @@ public boolean sourceFinish() {
 
 @Override
 public boolean sourceExist() {
-return false;
+return true;
+}
+
+private String getRedisUri() {
+StringBuffer sb = new StringBuffer("redis://");
+sb.append(hostName).append(":").append(port);
+if (!StringUtils.isEmpty(dbNumber)) {
+sb.append("/").append(dbNumber);
+}
+sb.append("?");
+if (!StringUtils.isEmpty(authPassword)) {
+sb.append("authPassword=").append(authPassword).append("&");
+}
+if (!StringUtils.isEmpty(authUser)) {
+sb.append("authUser=").append(authUser).append("&");
+}
+if (!StringUtils.isEmpty(readTimeout)) {
+sb.append("readTimeout=").append(readTimeout).append("&");
+}
+if (ssl) {
+sb.append("ssl=").append("yes").append("&");
+}
+if (!StringUtils.isEmpty(snapShot)) {
+sb.append("replOffset=").append(snapShot).append("&");
+}
+if (!StringUtils.isEmpty(replId)) {
+sb.append("replId=").append(replId).append("&");
+}
+if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+sb.deleteCharAt(sb.length() - 1);
+}
+return sb.toString();
+}
+
+private void initReplicator() {
+DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);

Review Comment:
   In theory, yes. But since the user is on a subscription model, they are
likely to be sensitive to real-time changes in the data, so I register
listeners for all of the key-modifying commands here.






Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-09-11 Thread via GitHub


emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756120543


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -84,6 +321,272 @@ public boolean sourceFinish() {
 
 @Override
 public boolean sourceExist() {
-return false;
+return true;
+}
+
+private String getRedisUri() {
+StringBuffer sb = new StringBuffer("redis://");
+sb.append(hostName).append(":").append(port);
+if (!StringUtils.isEmpty(dbNumber)) {
+sb.append("/").append(dbNumber);
+}
+sb.append("?");
+if (!StringUtils.isEmpty(authPassword)) {
+sb.append("authPassword=").append(authPassword).append("&");
+}
+if (!StringUtils.isEmpty(authUser)) {
+sb.append("authUser=").append(authUser).append("&");
+}
+if (!StringUtils.isEmpty(readTimeout)) {
+sb.append("readTimeout=").append(readTimeout).append("&");
+}
+if (ssl) {
+sb.append("ssl=").append("yes").append("&");
+}
+if (!StringUtils.isEmpty(snapShot)) {
+sb.append("replOffset=").append(snapShot).append("&");
+}
+if (!StringUtils.isEmpty(replId)) {
+sb.append("replId=").append(replId).append("&");
+}
+if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+sb.deleteCharAt(sb.length() - 1);
+}
+return sb.toString();
+}
+
+private void initReplicator() {
+DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);

Review Comment:
   Would subscribing to only some of these operations cause a causal
ordering problem?






[PR] [INLONG-11081][SDK] Transform SQL supports INTERVAL parse [inlong]

2024-09-11 Thread via GitHub


Zkplo opened a new pull request, #11086:
URL: https://github.com/apache/inlong/pull/11086

   
   
   
   
   Fixes #11081 
   
   ### Motivation
   Based on 
[MySQL](https://dev.mysql.com/doc/refman/8.4/en/expressions.html#temporal-intervals):
   1. Added the IntervalParser class
   ```
   
   IntervalParser  <-> INTERVAL expr unit ->  Pair(factor,Map(ChronoField,Count)):
   `factor`:
  1) `expr` can be a string starting with '-', which means subtraction,
 so the sign of `factor` indicates whether `expr` starts with a '-'.
  2) Units such as WEEK and QUARTER are not easy to parse directly,
 so WEEK -> ( unit=DAY, abs(factor)=7 ); QUARTER -> ( unit=MONTH, abs(factor)=3 ).
   `Map(ChronoField,Count)`:
  IntervalParser automatically matches the corresponding DateTimeFormatter based on the input `expr`.
  Using that DateTimeFormatter, IntervalParser parses the incoming units and stores them in a Map.
   
In addition, the parameter can be specified in two ways:
1) interval rowName year -> expression
2) interval 2 year   -> fixed parameter
   ```
   2. Modified AdditionParser and SubtractionParser to support time calculation.
   3. Added the corresponding unit test classes: TestAdditionParser, 
TestSubtractionParser
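
The unit normalization described in item 1 can be sketched roughly as follows. This is only an illustration under assumptions, not the PR's IntervalParser: the class `IntervalUnitSketch`, the method `normalize`, and the `Normalized` holder are hypothetical names, and `ChronoUnit` is used here in place of the `Map(ChronoField,Count)` structure. It shows how a leading '-' carries through as the sign and how WEEK and QUARTER reduce to DAY x 7 and MONTH x 3.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

// Hypothetical sketch; none of these names come from the InLong SDK.
final class IntervalUnitSketch {

    /** A signed amount expressed in a unit that java.time can add directly. */
    static final class Normalized {
        final long amount;
        final ChronoUnit unit;

        Normalized(long amount, ChronoUnit unit) {
            this.amount = amount;
            this.unit = unit;
        }
    }

    /** Reduces "INTERVAL expr unit" with a numeric expr to a signed amount and base unit. */
    static Normalized normalize(String expr, String unit) {
        // Long.parseLong accepts a leading '-', so the sign of expr is preserved.
        long count = Long.parseLong(expr.trim());
        switch (unit.toUpperCase(Locale.ROOT)) {
            case "DAY":
                return new Normalized(count, ChronoUnit.DAYS);
            case "WEEK":
                // WEEK is rewritten as DAY with a factor of 7.
                return new Normalized(Math.multiplyExact(count, 7L), ChronoUnit.DAYS);
            case "MONTH":
                return new Normalized(count, ChronoUnit.MONTHS);
            case "QUARTER":
                // QUARTER is rewritten as MONTH with a factor of 3.
                return new Normalized(Math.multiplyExact(count, 3L), ChronoUnit.MONTHS);
            case "YEAR":
                return new Normalized(count, ChronoUnit.YEARS);
            default:
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        LocalDate base = LocalDate.of(2024, 9, 11);
        Normalized twoWeeksBack = normalize("-2", "WEEK");
        // Adding a negative amount moves the date backwards.
        System.out.println(base.plus(twoWeeksBack.amount, twoWeeksBack.unit));
    }
}
```

Folding the sign into the amount keeps addition and subtraction on the same code path, which is presumably why the description tracks the sign in `factor`.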
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   





Re: [PR] [INLONG-11044][SDK] Transform SQL supports string judgment functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) [inlong]

2024-09-11 Thread via GitHub


dockerzhang merged PR #11052:
URL: https://github.com/apache/inlong/pull/11052





(inlong) branch master updated: [INLONG-11044][SDK] Transform SQL supports string judgment functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) (#11052)

2024-09-11 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 4cdce2c585 [INLONG-11044][SDK] Transform SQL supports string judgment 
functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) (#11052)
4cdce2c585 is described below

commit 4cdce2c5855b1c872372d9e5ffae02e50452cb23
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Thu Sep 12 14:40:47 2024 +0800

[INLONG-11044][SDK] Transform SQL supports string judgment 
functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) (#11052)

Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../process/function/IsAlphaFunction.java  | 60 
 .../process/function/IsDecimalFunction.java| 54 +++
 .../process/function/IsDigitFunction.java  | 59 
 .../function/string/TestIsAlphaFunction.java   | 74 
 .../function/string/TestIsDecimalFunction.java | 80 ++
 .../function/string/TestIsDigitFunction.java   | 80 ++
 6 files changed, 407 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsAlphaFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsAlphaFunction.java
new file mode 100644
index 00..54f54aee9c
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsAlphaFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+/**
+ * IsAlphaFunction
+ * description: is_alpha(string)
+ * - return true if all characters in string are letter
+ * - return false otherwise (Including cases where string is null and '').
+ */
+@TransformFunction(names = {"is_alpha"})
+public class IsAlphaFunction implements ValueParser {
+
+private final ValueParser stringParser;
+
+public IsAlphaFunction(Function expr) {
+stringParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+}
+
+@Override
+public Object parse(SourceData sourceData, int rowIndex, Context context) {
+Object stringObject = stringParser.parse(sourceData, rowIndex, context);
+if (stringObject == null) {
+return false;
+}
+String string = OperatorTools.parseString(stringObject);
+if (string.isEmpty()) {
+return false;
+}
+for (char chr : string.toCharArray()) {
+if ((chr >= 'a' && chr <= 'z') || (chr >= 'A' && chr <= 'Z')) {
+continue;
+}
+return false;
+}
+return true;
+}
+}
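
The is_alpha semantics in the file above can be exercised without the Transform SDK with a small standalone sketch. The class `IsAlphaSketch` and the static method `isAlpha` are hypothetical names used only for illustration. The check mirrors the loop in the diff: only ASCII letters a-z and A-Z count, and null or empty input yields false.

```java
// Hypothetical standalone sketch; not part of the InLong SDK.
final class IsAlphaSketch {

    /** Returns true only if s is non-empty and every character is an ASCII letter. */
    static boolean isAlpha(String s) {
        if (s == null || s.isEmpty()) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            boolean asciiLetter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            if (!asciiLetter) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isAlpha("InLong"));   // letters only
        System.out.println(isAlpha("InLong2"));  // contains a digit
        System.out.println(isAlpha(""));         // empty string
    }
}
```

Note that this range check is narrower than `Character.isLetter`, which also accepts non-ASCII letters such as 'é'.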
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsDecimalFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsDecimalFunction.java
new file mode 100644
index 00..d22f3ddaf6
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsDecimalFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the Lic

Re: [PR] [INLONG-11064][SDK] Transform SQL supports NULLIF function [inlong]

2024-09-11 Thread via GitHub


dockerzhang merged PR #11078:
URL: https://github.com/apache/inlong/pull/11078





(inlong) branch master updated: [INLONG-11064][SDK] Transform SQL supports NULLIF function (#11078)

2024-09-11 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 2af7480220 [INLONG-11064][SDK] Transform SQL supports NULLIF function 
(#11078)
2af7480220 is described below

commit 2af74802209b36197cb2f4938122d4c52132d1bf
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Thu Sep 12 14:44:12 2024 +0800

[INLONG-11064][SDK] Transform SQL supports NULLIF function (#11078)

Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../transform/process/function/NullIfFunction.java | 63 
 .../function/flowcontrol/TestNullIfFunction.java   | 69 ++
 2 files changed, 132 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java
new file mode 100644
index 00..f64f8faba5
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * NullIfFunction
+ * description: NULLIF(expr1,expr2)
+ * - return NULL if expr1 = expr2 is true
+ * - returns expr1 otherwise
+ */
+@Slf4j
+@TransformFunction(names = {"nullif", "null_if"})
+public class NullIfFunction implements ValueParser {
+
+private final ValueParser firstExprParser;
+private final ValueParser secondExprParser;
+
+public NullIfFunction(Function expr) {
+List<Expression> expressions = expr.getParameters().getExpressions();
+firstExprParser = OperatorTools.buildParser(expressions.get(0));
+secondExprParser = OperatorTools.buildParser(expressions.get(1));
+}
+
+@Override
+public Object parse(SourceData sourceData, int rowIndex, Context context) {
+Object firstExprObj = firstExprParser.parse(sourceData, rowIndex, context);
+if (firstExprObj == null) {
+return null;
+}
+Object secondExprObj = secondExprParser.parse(sourceData, rowIndex, context);
+if (secondExprObj == null) {
+return firstExprObj;
+}
+int cmp = OperatorTools.compareValue((Comparable) firstExprObj, (Comparable) secondExprObj);
+return cmp == 0 ? null : firstExprObj;
+}
+}
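
The NULLIF decision order in the file above (first operand null, then second operand null, then comparison) can be sketched in isolation. The class `NullIfSketch` and the generic method `nullIf` are hypothetical, and plain `compareTo` is swapped in for `OperatorTools.compareValue`, whose exact comparison rules are not shown in this diff.

```java
// Hypothetical standalone sketch; not part of the InLong SDK.
final class NullIfSketch {

    /** Returns null when both operands compare equal, otherwise the first operand. */
    static <T extends Comparable<T>> T nullIf(T first, T second) {
        if (first == null) {
            return null;   // a null first operand always yields null
        }
        if (second == null) {
            return first;  // nothing to compare against, so keep the first operand
        }
        // Assumption: natural ordering stands in for OperatorTools.compareValue.
        return first.compareTo(second) == 0 ? null : first;
    }

    public static void main(String[] args) {
        Integer same = nullIf(5, 5);
        Integer different = nullIf(5, 3);
        String secondNull = nullIf("a", null);
        System.out.println(same);
        System.out.println(different);
        System.out.println(secondNull);
    }
}
```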
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java
new file mode 100644
index 00..868e922cc9
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the spec

Re: [PR] [INLONG-11000][SDK] Add XML formatted data source for Transform [inlong]

2024-09-11 Thread via GitHub


dockerzhang commented on PR #11001:
URL: https://github.com/apache/inlong/pull/11001#issuecomment-2345413366

   @Zkplo please fix the failed workflows and UTs.





Re: [PR] [INLONG-10882][SDK] Transform SQL support ASCII function [inlong]

2024-09-11 Thread via GitHub


dockerzhang merged PR #10994:
URL: https://github.com/apache/inlong/pull/10994





(inlong) branch master updated: [INLONG-10882][SDK] Transform SQL support ASCII function (#10994)

2024-09-11 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new e33a5e8816 [INLONG-10882][SDK] Transform SQL support ASCII function 
(#10994)
e33a5e8816 is described below

commit e33a5e88166f7c2633400fb40e57015e9bb0d398
Author: MOONSakura0614 <151456101+moonsakura0...@users.noreply.github.com>
AuthorDate: Thu Sep 12 14:55:05 2024 +0800

[INLONG-10882][SDK] Transform SQL support ASCII function (#10994)
---
 .../transform/process/function/AsciiFunction.java  | 66 ++
 .../process/function/string/TestAsciiFunction.java | 62 
 2 files changed, 128 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsciiFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsciiFunction.java
new file mode 100644
index 00..dac7cb3bab
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsciiFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * ASCIIFunction
+ * description: ASCII(string) -- Returns the numeric value of the first character of string. Returns NULL if string is NULL.
+ */
+@TransformFunction(names = {"ascii"})
+public class AsciiFunction implements ValueParser {
+
+private final ValueParser stringParser;
+
+/**
+ * Constructor
+ * @param expr
+ */
+public AsciiFunction(Function expr) {
+List<Expression> expressions = expr.getParameters().getExpressions();
+stringParser = OperatorTools.buildParser(expressions.get(0));
+}
+
+/**
+ * parse
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+@Override
+public Object parse(SourceData sourceData, int rowIndex, Context context) {
+Object stringObj = stringParser.parse(sourceData, rowIndex, context);
+if (stringObj == null) {
+return null;
+}
+String str = OperatorTools.parseString(stringObj);
+if (str == null || str.isEmpty()) {
+return null;
+}
+return (int) str.charAt(0);
+}
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java
new file mode 100644
index 00..10d0c4032b
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.