[PR] [INLONG-11030][SDK] Add AVRO formatted data source for Transform [inlong]
ying-hua opened a new pull request, #11082:
URL: https://github.com/apache/inlong/pull/11082

Fixes #11030

### Motivation

Add Avro source data support to Transform. Currently, Transform supports parsing the Avro data types int, float, long, double, string, boolean, bytes, fixed, and enum.

Added classes:
- AvroSourceInfo
- AvroNode
- AvroSourceData
- AvroSourceDecoder
- TestAvro2CsvProcessor

Modified classes:
- SourceDecoderFactory
- AbstractProcessorTestBase

### Modifications

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [INLONG-11064][SDK] Transform SQL supports NULLIF function [inlong]
luchunliang commented on code in PR #11078:
URL: https://github.com/apache/inlong/pull/11078#discussion_r1754107629

## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java: ##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * NullIfFunction
+ * description: NULLIF(expr1,expr2)
+ * - return NULL if expr1 = expr2 is true
+ * - returns expr1 otherwise
+ */
+@Slf4j
+@TransformFunction(names = {"nullif", "null_if"})

Review Comment:
The general function name is ifnull.
Re: [PR] [INLONG-11064][SDK] Transform SQL supports NULLIF function [inlong]
Zkplo commented on code in PR #11078:
URL: https://github.com/apache/inlong/pull/11078#discussion_r1754194450

## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java: ##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * NullIfFunction
+ * description: NULLIF(expr1,expr2)
+ * - return NULL if expr1 = expr2 is true
+ * - returns expr1 otherwise
+ */
+@Slf4j
+@TransformFunction(names = {"nullif", "null_if"})

Review Comment:
According to the official MySQL documentation (https://dev.mysql.com/doc/refman/8.4/en/flow-control-functions.html#function_nullif), both ifnull and nullif exist.
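The distinction raised in this review thread can be illustrated with a minimal Java sketch. It follows the semantics given in the quoted Javadoc for NULLIF (NULL when the two expressions are equal, otherwise the first expression) and the usual SQL meaning of IFNULL (the first expression unless it is NULL, otherwise the second). The class and method names below are hypothetical and are not part of the InLong Transform SDK; the sketch works on plain Java objects rather than parsed SQL expressions.

```java
import java.util.Objects;

// Illustrative only: NullFunctionsSketch, nullIf and ifNull are assumed names,
// not classes or methods from the InLong Transform SDK.
final class NullFunctionsSketch {

    private NullFunctionsSketch() {
    }

    // NULLIF(expr1, expr2): null when both values are equal, expr1 otherwise.
    static Object nullIf(Object expr1, Object expr2) {
        return Objects.equals(expr1, expr2) ? null : expr1;
    }

    // IFNULL(expr1, expr2): expr1 when it is not null, expr2 otherwise.
    static Object ifNull(Object expr1, Object expr2) {
        return expr1 != null ? expr1 : expr2;
    }
}
```

Seen this way, the two functions are not aliases of each other: `nullIf("a", "a")` produces null, whereas `ifNull(null, "b")` replaces a null with a fallback value.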
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO closed pull request #11083: [INLONG-10287][Agent] Update the Redis Source
URL: https://github.com/apache/inlong/pull/11083
[PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO opened a new pull request, #11083:
URL: https://github.com/apache/inlong/pull/11083

Fixes #10287

### Motivation

This pull request is an enhancement of the previous PR #10801. It completes the task requirements mentioned in PR #10801:

* Two modes are provided: one uses commands and the other uses subscription.
* To choose between command mode and subscription mode, configure one more parameter, `is_subscribe`.
* For cases where there are many keys, a batch query interface such as `mget` is suggested.
* For cases where there are many keys but the `get` command is still used, use `jedis.pipeline` to reduce I/O costs.
* Limit the maximum size of a single piece of data: if the data size exceeds 500k, the data is automatically skipped and discarded.

### Modifications

* Modify TaskProfileDto and RedisTask to accept and parse the configuration.
* Add RedisTask, RedisInstance, and RedisSource classes
* Provide unit tests

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO opened a new pull request, #11084:
URL: https://github.com/apache/inlong/pull/11084

Fixes #10287

### Motivation

This pull request is an enhancement of the previous PR #10801. It completes the task requirements mentioned in PR #10801:

* Two modes are provided: one uses commands and the other uses subscription.
* To choose between command mode and subscription mode, configure one more parameter, `is_subscribe`.
* For cases where there are many keys, a batch query interface such as `mget` is suggested.
* For cases where there are many keys but the `get` command is still used, use `jedis.pipeline` to reduce I/O costs.
* Limit the maximum size of a single piece of data: if the data size exceeds 500k, the data is automatically skipped and discarded.

### Modifications

* Modify TaskProfileDto and RedisTask to accept and parse the configuration.
* Add RedisTask, RedisInstance, and RedisSource classes
* Provide unit tests

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
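The size limit described in the PR (skip and discard any single piece of data larger than 500k) can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: the class and method names are hypothetical, the limit is taken as 500 * 1024 bytes to match the `MAX_DATA_SIZE` constant quoted in the review diff, size is measured as the UTF-8 byte length, and null entries (which a batch lookup may return for missing keys) are also skipped.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: SizeLimitSketch and dropOversized are assumed names,
// not code from RedisSource.
final class SizeLimitSketch {

    // 500 KiB, mirroring the MAX_DATA_SIZE constant shown in the PR diff.
    static final long MAX_DATA_SIZE = 500 * 1024;

    private SizeLimitSketch() {
    }

    // Returns the values that fit within the limit, preserving their order.
    static List<String> dropOversized(List<String> values) {
        List<String> kept = new ArrayList<>();
        for (String value : values) {
            if (value == null) {
                // Assumption: a missing key yields null and is skipped.
                continue;
            }
            int size = value.getBytes(StandardCharsets.UTF_8).length;
            if (size > MAX_DATA_SIZE) {
                // Oversized data is discarded rather than forwarded.
                continue;
            }
            kept.add(value);
        }
        return kept;
    }
}
```

A value of exactly 500 * 1024 bytes is kept in this sketch; only values strictly larger than the limit are dropped.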
[PR] [INLONG-11005][SDK] Add YAML formatted data source for Transform [inlong]
emptyOVO opened a new pull request, #11085:
URL: https://github.com/apache/inlong/pull/11085

Fixes #11005

### Motivation

Add YamlSourceData, YamlSourceDecoder, and YamlSourceInfo, and provide unit tests.

### Modifications

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
aloyszhang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756081633

## inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java: ##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class);
+
+    private static AgentBaseTestsHelper helper;
+
+    private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d";
+
+    private static Store taskBasicStore;
+    private static Store instanceBasicStore;
+    private static Store offsetBasicStore;
+
+    @Mock
+    private InstanceProfile profile;
+
+    @Mock
+    private Jedis jedis;
+
+    @Mock
+    private Pipeline pipeline;
+
+    @Mock
+    private ScheduledExecutorService executor;
+
+    @InjectMocks
+    private RedisSource redisSource;
+
+    @Before
+    public void setUp() {
+        helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+        taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+        instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+        offsetBasicStore =
+                TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+        OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+        when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+        initProfile();
+    }
+
+    private void initProfile() {
+        final String username = "";
+        final String password = "123456";
+        final String hostname = "127.0.0.1";
+        final String port = "6379";
+        final String groupId = "group01";
+        final String streamId = "stream01";
+        final String keys = "age,name,sex";
+        final String command = "zscore";
+
+        TaskProfile taskProfile = helper.getTaskP
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
aloyszhang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756081863

## inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java: ##
@@ -0,0 +1,261 @@
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
justinwwhuang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756095808

## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ##
@@ -17,54 +17,279 @@
 package org.apache.inlong.agent.plugin.sources;

+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;

+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;

+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;

 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {

     private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class);
+    private static final long MAX_DATA_SIZE = 500 * 1024;
+    private static final int REDIS_QUEUE_SIZE = 1;
+    private Gson gson;
+
+    public InstanceProfile profile;
+    private String port;
+    private Jedis jedis;
+    private String hostName;
+    private boolean ssl;
+    private String authUser;
+    private String authPassword;
+    private String readTimeout;
+    private String replId;
+    private String snapShot;
+    private String dbNumber;
+    private String redisCommand;
+
+    private String fieldOrMember;
+    private boolean destroyed;
+    private boolean isSubscribe;
+    private Set keys;
+    private Replicator redisReplicator;
+    private BlockingQueue redisQueue;
+    private ScheduledExecutorService executor;

     public RedisSource() {
     }

     @Override
     protected String getThreadName() {
-        return null;
+        return "redis-source-" + taskId + "-" + instanceId;
     }

     @Override
     protected void initSource(InstanceProfile profile) {
+        LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+        this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+        this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+        this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+        this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+        this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, "");
+        this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, "");
+        this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+        this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+        this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+        this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+        this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+        this.instanceId = profile.getInstanceId();
+        this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+        initGson();
+        String uri = getRedisUri();
+        try {
+            if (isSubscribe) {
+                // use subscribe mode
+                this.execu
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
justinwwhuang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756099335

## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ##
@@ -17,54 +17,279 @@
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
justinwwhuang commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756102202

## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ##
@@ -84,6 +321,272 @@ public boolean sourceFinish() {
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
+    }
+
+    private String getRedisUri() {
+        StringBuffer sb = new StringBuffer("redis://");
+        sb.append(hostName).append(":").append(port);
+        if (!StringUtils.isEmpty(dbNumber)) {
+            sb.append("/").append(dbNumber);
+        }
+        sb.append("?");
+        if (!StringUtils.isEmpty(authPassword)) {
+            sb.append("authPassword=").append(authPassword).append("&");
+        }
+        if (!StringUtils.isEmpty(authUser)) {
+            sb.append("authUser=").append(authUser).append("&");
+        }
+        if (!StringUtils.isEmpty(readTimeout)) {
+            sb.append("readTimeout=").append(readTimeout).append("&");
+        }
+        if (ssl) {
+            sb.append("ssl=").append("yes").append("&");
+        }
+        if (!StringUtils.isEmpty(snapShot)) {
+            sb.append("replOffset=").append(snapShot).append("&");
+        }
+        if (!StringUtils.isEmpty(replId)) {
+            sb.append("replId=").append(replId).append("&");
+        }
+        if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    private void initReplicator() {
+        DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+        this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);

Review Comment:
Is this subscribing to all operations? Can it be configured here? For example, we only need to subscribe to a few of these operations.
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756115922 ## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ## @@ -84,6 +321,272 @@ public boolean sourceFinish() { @Override public boolean sourceExist() { -return false; +return true; +} + +private String getRedisUri() { +StringBuffer sb = new StringBuffer("redis://"); +sb.append(hostName).append(":").append(port); +if (!StringUtils.isEmpty(dbNumber)) { +sb.append("/").append(dbNumber); +} +sb.append("?"); +if (!StringUtils.isEmpty(authPassword)) { +sb.append("authPassword=").append(authPassword).append("&"); +} +if (!StringUtils.isEmpty(authUser)) { +sb.append("authUser=").append(authUser).append("&"); +} +if (!StringUtils.isEmpty(readTimeout)) { +sb.append("readTimeout=").append(readTimeout).append("&"); +} +if (ssl) { +sb.append("ssl=").append("yes").append("&"); +} +if (!StringUtils.isEmpty(snapShot)) { +sb.append("replOffset=").append(snapShot).append("&"); +} +if (!StringUtils.isEmpty(replId)) { +sb.append("replId=").append(replId).append("&"); +} +if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') { +sb.deleteCharAt(sb.length() - 1); +} +return sb.toString(); +} + +private void initReplicator() { +DefaultCommandParser defaultCommandParser = new DefaultCommandParser(); +this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser); Review Comment: In theory, yes. But since the user is using a subscription model, they are likely to be sensitive to real-time changes in the data, so I listen for all key-modification commands here. Do I need to change this?
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756120543 ## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ## @@ -84,6 +321,272 @@ public boolean sourceFinish() { @Override public boolean sourceExist() { -return false; +return true; +} + +private String getRedisUri() { +StringBuffer sb = new StringBuffer("redis://"); +sb.append(hostName).append(":").append(port); +if (!StringUtils.isEmpty(dbNumber)) { +sb.append("/").append(dbNumber); +} +sb.append("?"); +if (!StringUtils.isEmpty(authPassword)) { +sb.append("authPassword=").append(authPassword).append("&"); +} +if (!StringUtils.isEmpty(authUser)) { +sb.append("authUser=").append(authUser).append("&"); +} +if (!StringUtils.isEmpty(readTimeout)) { +sb.append("readTimeout=").append(readTimeout).append("&"); +} +if (ssl) { +sb.append("ssl=").append("yes").append("&"); +} +if (!StringUtils.isEmpty(snapShot)) { +sb.append("replOffset=").append(snapShot).append("&"); +} +if (!StringUtils.isEmpty(replId)) { +sb.append("replId=").append(replId).append("&"); +} +if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') { +sb.deleteCharAt(sb.length() - 1); +} +return sb.toString(); +} + +private void initReplicator() { +DefaultCommandParser defaultCommandParser = new DefaultCommandParser(); +this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser); Review Comment: Is there a causal order problem with just subscribing to some of these operations?
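The exchange above asks whether the subscribed operations could be configured instead of registering every key-modification command. A minimal sketch of one way to do that follows, assuming a comma-separated setting. The class `CommandSubscriptionSketch`, its `DEFAULT_COMMANDS` fallback, and the idea of a configured string are hypothetical and not part of the PR; the registrar callback stands in for the `addCommandParser` call shown in the diff.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical helper, not part of RedisSource: selects which commands to subscribe to. */
class CommandSubscriptionSketch {

    // Assumed fallback used when nothing is configured; chosen only for illustration.
    static final List<String> DEFAULT_COMMANDS = Arrays.asList("APPEND", "SET", "DEL");

    /** Parses a comma-separated list such as "set, del" into upper-case command names. */
    static Set<String> parseCommands(String configured) {
        Set<String> result = new LinkedHashSet<>();
        if (configured != null) {
            for (String token : configured.split(",")) {
                String name = token.trim().toUpperCase(Locale.ROOT);
                if (!name.isEmpty()) {
                    result.add(name); // duplicates collapse, first-seen order is kept
                }
            }
        }
        if (result.isEmpty()) {
            result.addAll(DEFAULT_COMMANDS);
        }
        return result;
    }

    /** Invokes the registrar once per selected command name. */
    static void register(String configured, Consumer<String> registrar) {
        for (String name : parseCommands(configured)) {
            registrar.accept(name);
        }
    }

    public static void main(String[] args) {
        // Stand-in registrar that only prints what would be subscribed.
        register("set, del", name -> System.out.println("subscribe " + name));
    }
}
```

Inside `initReplicator()` the registrar could be `name -> this.redisReplicator.addCommandParser(CommandName.name(name), defaultCommandParser)`, reusing the call from the diff. Whether a partial subscription is safe with respect to ordering is the open question in the thread, and this sketch does not settle it.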
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756136763 ## inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.store.Store; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.metric.MetricRegister; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Test cases for {@link RedisSource}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class}) +@PowerMockIgnore({"javax.management.*"}) +public class TestRedisSource { + +private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class); + +private static AgentBaseTestsHelper helper; + +private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d"; + +private static Store taskBasicStore; +private static Store instanceBasicStore; +private static Store offsetBasicStore; + +@Mock +private InstanceProfile profile; + +@Mock +private Jedis jedis; + +@Mock +private Pipeline pipeline; + +@Mock +private ScheduledExecutorService executor; + +@InjectMocks +private RedisSource redisSource; + +@Before +public void setUp() { +helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome(); +taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); +instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); +offsetBasicStore = +TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET); +OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore); +mockStatic(Executors.class); +when(Executors.newSingleThreadExecutor()).thenReturn(executor); +when(Executors.newScheduledThreadPool(1)).thenReturn(executor); +initProfile(); +} + +private void initProfile() { +final String username = ""; +final String password = "123456"; +final String hostname = "127.0.0.1"; +final String port = "6379"; +final String groupId = "group01"; +final String streamId = "stream01"; +final String keys = "age,name,sex"; +final String command = "zscore"; + +TaskProfile taskProfile = helper.getTaskPro
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756137158 ## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ## @@ -17,54 +17,279 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.moilioncircle.redis.replicator.RedisReplicator; +import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.cmd.CommandName; +import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand; +import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser; +import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; +import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.exceptions.JedisConnectionException; +import java.io.IOException; +import 
java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Redis source */ public class RedisSource extends AbstractSource { private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class); +private static final long MAX_DATA_SIZE = 500 * 1024; +private static final int REDIS_QUEUE_SIZE = 1; +private Gson gson; + +public InstanceProfile profile; +private String port; +private Jedis jedis; +private String hostName; +private boolean ssl; +private String authUser; +private String authPassword; +private String readTimeout; +private String replId; +private String snapShot; +private String dbNumber; +private String redisCommand; + +private String fieldOrMember; +private boolean destroyed; +private boolean isSubscribe; +private Set keys; +private Replicator redisReplicator; +private BlockingQueue redisQueue; +private ScheduledExecutorService executor; public RedisSource() { } @Override protected String getThreadName() { -return null; +return "redis-source-" + taskId + "-" + instanceId; } @Override protected void initSource(InstanceProfile profile) { +LOGGER.info("Redis Source init: {}", profile.toJsonStr()); +this.port = profile.get(TaskConstants.TASK_REDIS_PORT); +this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME); +this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false); +this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, ""); +this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, ""); +this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, ""); 
+this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, ""); +this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1"); +this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0"); +this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(","))); +this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false); +this.instanceId = profile.getInstanceId(); +this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE); +initGson(); +String uri = getRedisUri(); +try { +if (isSubscribe) { +// use subscribe mode +this.executor =
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756138334 ## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ## @@ -17,54 +17,279 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.moilioncircle.redis.replicator.RedisReplicator; +import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.cmd.CommandName; +import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand; +import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser; +import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; +import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.exceptions.JedisConnectionException; +import java.io.IOException; +import 
java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Redis source */ public class RedisSource extends AbstractSource { private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class); +private static final long MAX_DATA_SIZE = 500 * 1024; +private static final int REDIS_QUEUE_SIZE = 1; +private Gson gson; + +public InstanceProfile profile; +private String port; +private Jedis jedis; +private String hostName; +private boolean ssl; +private String authUser; +private String authPassword; +private String readTimeout; +private String replId; +private String snapShot; +private String dbNumber; +private String redisCommand; + +private String fieldOrMember; +private boolean destroyed; +private boolean isSubscribe; +private Set keys; +private Replicator redisReplicator; +private BlockingQueue redisQueue; +private ScheduledExecutorService executor; public RedisSource() { } @Override protected String getThreadName() { -return null; +return "redis-source-" + taskId + "-" + instanceId; } @Override protected void initSource(InstanceProfile profile) { +LOGGER.info("Redis Source init: {}", profile.toJsonStr()); +this.port = profile.get(TaskConstants.TASK_REDIS_PORT); +this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME); +this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false); +this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, ""); +this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, ""); +this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, ""); 
+this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, ""); +this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1"); +this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0"); +this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(","))); +this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false); +this.instanceId = profile.getInstanceId(); +this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE); +initGson(); +String uri = getRedisUri(); +try { +if (isSubscribe) { +// use subscribe mode +this.executor =
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756145612 ## inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.store.Store; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.metric.MetricRegister; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Test cases for {@link RedisSource}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class}) +@PowerMockIgnore({"javax.management.*"}) +public class TestRedisSource { + +private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class); + +private static AgentBaseTestsHelper helper; + +private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d"; + +private static Store taskBasicStore; +private static Store instanceBasicStore; +private static Store offsetBasicStore; + +@Mock +private InstanceProfile profile; + +@Mock +private Jedis jedis; + +@Mock +private Pipeline pipeline; + +@Mock +private ScheduledExecutorService executor; + +@InjectMocks +private RedisSource redisSource; + +@Before +public void setUp() { +helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome(); +taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); +instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); +offsetBasicStore = +TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET); +OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore); +mockStatic(Executors.class); +when(Executors.newSingleThreadExecutor()).thenReturn(executor); +when(Executors.newScheduledThreadPool(1)).thenReturn(executor); +initProfile(); +} + +private void initProfile() { +final String username = ""; +final String password = "123456"; +final String hostname = "127.0.0.1"; +final String port = "6379"; +final String groupId = "group01"; +final String streamId = "stream01"; +final String keys = "age,name,sex"; +final String command = "zscore"; + +TaskProfile taskProfile = helper.getTaskPro
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756115922 ## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ## @@ -84,6 +321,272 @@ public boolean sourceFinish() { @Override public boolean sourceExist() { -return false; +return true; +} + +private String getRedisUri() { +StringBuffer sb = new StringBuffer("redis://"); +sb.append(hostName).append(":").append(port); +if (!StringUtils.isEmpty(dbNumber)) { +sb.append("/").append(dbNumber); +} +sb.append("?"); +if (!StringUtils.isEmpty(authPassword)) { +sb.append("authPassword=").append(authPassword).append("&"); +} +if (!StringUtils.isEmpty(authUser)) { +sb.append("authUser=").append(authUser).append("&"); +} +if (!StringUtils.isEmpty(readTimeout)) { +sb.append("readTimeout=").append(readTimeout).append("&"); +} +if (ssl) { +sb.append("ssl=").append("yes").append("&"); +} +if (!StringUtils.isEmpty(snapShot)) { +sb.append("replOffset=").append(snapShot).append("&"); +} +if (!StringUtils.isEmpty(replId)) { +sb.append("replId=").append(replId).append("&"); +} +if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') { +sb.deleteCharAt(sb.length() - 1); +} +return sb.toString(); +} + +private void initReplicator() { +DefaultCommandParser defaultCommandParser = new DefaultCommandParser(); +this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser); Review Comment: In theory, yes. But since the user is using a subscription model, they are likely to be sensitive to real-time changes in the data, so I listen for all key-modification commands here.
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756154385 ## inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.store.Store; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.metric.MetricRegister; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Test cases for {@link RedisSource}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class}) +@PowerMockIgnore({"javax.management.*"}) +public class TestRedisSource { + +private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class); + +private static AgentBaseTestsHelper helper; + +private final String instanceId = "s4bc475560bdbd4e9812ab1fd64d"; + +private static Store taskBasicStore; +private static Store instanceBasicStore; +private static Store offsetBasicStore; + +@Mock +private InstanceProfile profile; + +@Mock +private Jedis jedis; + +@Mock +private Pipeline pipeline; + +@Mock +private ScheduledExecutorService executor; + +@InjectMocks +private RedisSource redisSource; + +@Before +public void setUp() { +helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome(); +taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); +instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); +offsetBasicStore = +TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET); +OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore); +mockStatic(Executors.class); +when(Executors.newSingleThreadExecutor()).thenReturn(executor); +when(Executors.newScheduledThreadPool(1)).thenReturn(executor); +initProfile(); +} + +private void initProfile() { +final String username = ""; +final String password = "123456"; +final String hostname = "127.0.0.1"; +final String port = "6379"; +final String groupId = "group01"; +final String streamId = "stream01"; +final String keys = "age,name,sex"; +final String command = "zscore"; + +TaskProfile taskProfile = helper.getTaskPro
[PR] [INLONG-11081][SDK] Transform SQL supports INTERVAL parse [inlong]
Zkplo opened a new pull request, #11086:
URL: https://github.com/apache/inlong/pull/11086

Fixes #11081

### Motivation

Based on [MySQL](https://dev.mysql.com/doc/refman/8.4/en/expressions.html#temporal-intervals):

1. Added the IntervalParser class
```
IntervalParser <-> INTERVAL expr unit -> Pair(factor,Map(ChronoField,Count)):
`factor`:
  1) `expr` may be a string starting with '-', which means subtraction. The sign of `factor` therefore indicates whether `expr` starts with a '-' or not.
  2) Units such as WEEK and QUARTER are not easy to parse directly, so WEEK -> ( unit=DAY, abs(factor)=7 ); QUARTER -> ( unit=MONTH, abs(factor)=3 ).
`Map(ChronoField,Count)`:
  IntervalParser automatically matches the corresponding DateTimeFormatter based on the input `expr`. Using that DateTimeFormatter, IntervalParser parses the incoming units and stores them in a Map.
In addition, the interval value can be given in two ways:
  1) interval rowName year -> expression
  2) interval 2 year -> fixed parameter
```
2. Modified AdditionParser and SubtractionParser so that they can perform time calculations.
3. Added the corresponding unit test classes: TestAdditionParser, TestSubtractionParser

### Modifications

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*
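The unit normalization described in the PR above (a leading '-' becomes the sign of the factor, WEEK is folded into 7 days, QUARTER into 3 months) can be sketched roughly as follows. This is only an illustration under stated assumptions: `IntervalSketch`, `Interval`, `normalize` and `add` are hypothetical names, the sketch handles plain integer amounts only, and it uses `java.time.temporal.ChronoUnit` rather than the `Map(ChronoField,Count)` and DateTimeFormatter matching that the PR describes.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

// Hypothetical sketch, not the IntervalParser from PR #11086.
final class IntervalSketch {

    /** A signed amount expressed in a unit that java.time can add directly. */
    static final class Interval {
        final long amount;
        final ChronoUnit unit;

        Interval(long amount, ChronoUnit unit) {
            this.amount = amount;
            this.unit = unit;
        }
    }

    /** Normalizes "INTERVAL expr unit" for integer expr values only (an assumption of this sketch). */
    static Interval normalize(String expr, String unit) {
        String trimmed = expr.trim();
        // A leading '-' means subtraction; it becomes the sign of the factor.
        int sign = trimmed.startsWith("-") ? -1 : 1;
        long magnitude = Long.parseLong(sign < 0 ? trimmed.substring(1).trim() : trimmed);
        switch (unit.trim().toUpperCase(Locale.ROOT)) {
            case "WEEK":
                // WEEK -> unit=DAY, abs(factor)=7
                return new Interval(sign * magnitude * 7, ChronoUnit.DAYS);
            case "QUARTER":
                // QUARTER -> unit=MONTH, abs(factor)=3
                return new Interval(sign * magnitude * 3, ChronoUnit.MONTHS);
            case "YEAR":
                return new Interval(sign * magnitude, ChronoUnit.YEARS);
            case "MONTH":
                return new Interval(sign * magnitude, ChronoUnit.MONTHS);
            case "DAY":
                return new Interval(sign * magnitude, ChronoUnit.DAYS);
            default:
                throw new IllegalArgumentException("Unsupported unit in this sketch: " + unit);
        }
    }

    /** Applies a normalized interval to a timestamp. */
    static LocalDateTime add(LocalDateTime base, Interval interval) {
        return base.plus(interval.amount, interval.unit);
    }
}
```

Folding WEEK and QUARTER into DAY and MONTH keeps the arithmetic step to a single `plus` call; composite units and column-valued expressions (the `interval rowName year` form) are deliberately left out.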
Re: [PR] [INLONG-11044][SDK] Transform SQL supports string judgment functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) [inlong]
dockerzhang merged PR #11052:
URL: https://github.com/apache/inlong/pull/11052
(inlong) branch master updated: [INLONG-11044][SDK] Transform SQL supports string judgment functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) (#11052)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 4cdce2c585 [INLONG-11044][SDK] Transform SQL supports string judgment functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) (#11052)
4cdce2c585 is described below

commit 4cdce2c5855b1c872372d9e5ffae02e50452cb23
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Thu Sep 12 14:40:47 2024 +0800

    [INLONG-11044][SDK] Transform SQL supports string judgment functions(IS_ALPHA、IS_DECIMAL、IS_DIGIT) (#11052)

    Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../process/function/IsAlphaFunction.java | 60
 .../process/function/IsDecimalFunction.java | 54 +++
 .../process/function/IsDigitFunction.java | 59
 .../function/string/TestIsAlphaFunction.java | 74
 .../function/string/TestIsDecimalFunction.java | 80 ++
 .../function/string/TestIsDigitFunction.java | 80 ++
 6 files changed, 407 insertions(+)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsAlphaFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsAlphaFunction.java
new file mode 100644
index 00..54f54aee9c
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsAlphaFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+/**
+ * IsAlphaFunction
+ * description: is_alpha(string)
+ * - return true if all characters in string are letter
+ * - return false otherwise (Including cases where string is null and '').
+ */
+@TransformFunction(names = {"is_alpha"})
+public class IsAlphaFunction implements ValueParser {
+
+    private final ValueParser stringParser;
+
+    public IsAlphaFunction(Function expr) {
+        stringParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object stringObject = stringParser.parse(sourceData, rowIndex, context);
+        if (stringObject == null) {
+            return false;
+        }
+        String string = OperatorTools.parseString(stringObject);
+        if (string.isEmpty()) {
+            return false;
+        }
+        for (char chr : string.toCharArray()) {
+            if ((chr >= 'a' && chr <= 'z') || (chr >= 'A' && chr <= 'Z')) {
+                continue;
+            }
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsDecimalFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsDecimalFunction.java
new file mode 100644
index 00..d22f3ddaf6
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsDecimalFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the Lic
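For readers who want to try the `is_alpha` contract from the commit above outside the SDK, a standalone sketch might look like this. `IsAlphaSketch` is a hypothetical name and the `ValueParser`/`OperatorTools` plumbing is omitted; the character check mirrors the one in the diff, which accepts only the ASCII ranges a-z and A-Z (so it is stricter than `Character.isLetter`).

```java
// Hypothetical standalone sketch of the is_alpha semantics; not part of the commit.
final class IsAlphaSketch {

    /** Returns true only for non-empty strings made up entirely of ASCII letters. */
    static boolean isAlpha(String s) {
        // Null and the empty string are both treated as "not alphabetic".
        if (s == null || s.isEmpty()) {
            return false;
        }
        for (char c : s.toCharArray()) {
            boolean asciiLetter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            if (!asciiLetter) {
                return false;
            }
        }
        return true;
    }
}
```

Under this check, accented letters such as 'é' are rejected, which may or may not be what a caller expects from a function named `is_alpha`.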
Re: [PR] [INLONG-11064][SDK] Transform SQL supports NULLIF function [inlong]
dockerzhang merged PR #11078:
URL: https://github.com/apache/inlong/pull/11078
(inlong) branch master updated: [INLONG-11064][SDK] Transform SQL supports NULLIF function (#11078)
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 2af7480220 [INLONG-11064][SDK] Transform SQL supports NULLIF function (#11078)
2af7480220 is described below

commit 2af74802209b36197cb2f4938122d4c52132d1bf
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Thu Sep 12 14:44:12 2024 +0800

    [INLONG-11064][SDK] Transform SQL supports NULLIF function (#11078)

    Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../transform/process/function/NullIfFunction.java | 63
 .../function/flowcontrol/TestNullIfFunction.java | 69 ++
 2 files changed, 132 insertions(+)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java
new file mode 100644
index 00..f64f8faba5
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NullIfFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * NullIfFunction
+ * description: NULLIF(expr1,expr2)
+ * - return NULL if expr1 = expr2 is true
+ * - returns expr1 otherwise
+ */
+@Slf4j
+@TransformFunction(names = {"nullif", "null_if"})
+public class NullIfFunction implements ValueParser {
+
+    private final ValueParser firstExprParser;
+    private final ValueParser secondExprParser;
+
+    public NullIfFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        firstExprParser = OperatorTools.buildParser(expressions.get(0));
+        secondExprParser = OperatorTools.buildParser(expressions.get(1));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object firstExprObj = firstExprParser.parse(sourceData, rowIndex, context);
+        if (firstExprObj == null) {
+            return null;
+        }
+        Object secondExprObj = secondExprParser.parse(sourceData, rowIndex, context);
+        if (secondExprObj == null) {
+            return firstExprObj;
+        }
+        int cmp = OperatorTools.compareValue((Comparable) firstExprObj, (Comparable) secondExprObj);
+        return cmp == 0 ? null : firstExprObj;
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java
new file mode 100644
index 00..868e922cc9
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the spec
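The NULLIF rule in the commit above (NULL when both expressions compare equal, otherwise the first expression, with the null cases short-circuited first) can be sketched in isolation as below. `NullIfSketch` is a hypothetical name, and the sketch assumes both arguments share one `Comparable` type; the commit itself delegates to `OperatorTools.compareValue`, whose handling of mixed types is not shown in this message and is not reproduced here.

```java
// Hypothetical standalone sketch of the NULLIF(expr1, expr2) semantics; not part of the commit.
final class NullIfSketch {

    /** Returns null when first and second compare equal, otherwise returns first. */
    static <T extends Comparable<T>> T nullIf(T first, T second) {
        // A null first operand yields null, as in the commit's parse method.
        if (first == null) {
            return null;
        }
        // A null second operand can never match, so the first operand is returned.
        if (second == null) {
            return first;
        }
        return first.compareTo(second) == 0 ? null : first;
    }
}
```

Using `compareTo` rather than `equals` follows the compare-based approach in the diff; for types such as `BigDecimal` the two can disagree (for example on scale), so this choice affects what counts as "equal".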
Re: [PR] [INLONG-11000][SDK] Add XML formatted data source for Transform [inlong]
dockerzhang commented on PR #11001:
URL: https://github.com/apache/inlong/pull/11001#issuecomment-2345413366

@Zkplo please fix the failed workflows and UTs.
Re: [PR] [INLONG-10882][SDK] Transform SQL support ASCII function [inlong]
dockerzhang merged PR #10994:
URL: https://github.com/apache/inlong/pull/10994
(inlong) branch master updated: [INLONG-10882][SDK] Transform SQL support ASCII function (#10994)
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new e33a5e8816 [INLONG-10882][SDK] Transform SQL support ASCII function (#10994)
e33a5e8816 is described below

commit e33a5e88166f7c2633400fb40e57015e9bb0d398
Author: MOONSakura0614 <151456101+moonsakura0...@users.noreply.github.com>
AuthorDate: Thu Sep 12 14:55:05 2024 +0800

    [INLONG-10882][SDK] Transform SQL support ASCII function (#10994)
---
 .../transform/process/function/AsciiFunction.java | 66 ++
 .../process/function/string/TestAsciiFunction.java | 62
 2 files changed, 128 insertions(+)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsciiFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsciiFunction.java
new file mode 100644
index 00..dac7cb3bab
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsciiFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * ASCIIFunction
+ * description: ASCII(string) -- Returns the numeric value of the first character of string. Returns NULL if string is NULL.
+ */
+@TransformFunction(names = {"ascii"})
+public class AsciiFunction implements ValueParser {
+
+    private final ValueParser stringParser;
+
+    /**
+     * Constructor
+     * @param expr
+     */
+    public AsciiFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        stringParser = OperatorTools.buildParser(expressions.get(0));
+    }
+
+    /**
+     * parse
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object stringObj = stringParser.parse(sourceData, rowIndex, context);
+        if (stringObj == null) {
+            return null;
+        }
+        String str = OperatorTools.parseString(stringObj);
+        if (str == null || str.isEmpty()) {
+            return null;
+        }
+        return (int) str.charAt(0);
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java
new file mode 100644
index 00..10d0c4032b
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.