Re: [PR] [INLONG-10944][SDK] Support Inlong Transform parser annotation [inlong]

2024-08-29 Thread via GitHub


emptyOVO commented on code in PR #10945:
URL: https://github.com/apache/inlong/pull/10945#discussion_r1735672712


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParserTools.java:
##
@@ -0,0 +1,64 @@
+package org.apache.inlong.sdk.transform.process.parser;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.schema.Column;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class ParserTools {
+private static final String PARSER_PATH = "org.apache.inlong.sdk.transform.process.parser";
+private final static Map<Class<? extends Expression>, Class<?>> parserMap = Maps.newConcurrentMap();
+
+static {
+init();
+}
+private static void init() {
+Reflections reflections = new Reflections(PARSER_PATH, Scanners.TypesAnnotated);
+Set<Class<?>> clazzSet = reflections.getTypesAnnotatedWith(TransformParser.class);
+for (Class<?> clazz : clazzSet) {
+if (ValueParser.class.isAssignableFrom(clazz)) {
+TransformParser annotation = clazz.getAnnotation(TransformParser.class);
+if (annotation == null) {
+continue;
+}
+Class<? extends Expression>[] values = annotation.value();
+for (Class<? extends Expression> value : values) {
+if (value == null) {
+continue;
+}
+continue;
+}
+parserMap.compute(value, (name, former) -> {

Review Comment:
   fix
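
The registration loop quoted above stops at `parserMap.compute(...)`, so what happens when two parsers claim the same expression type is not visible in this thread. Below is a minimal, self-contained sketch of the same annotation-driven registry pattern, under these assumptions: the candidate classes are passed in as a list instead of being found by the Reflections classpath scan, and `Expr`, `Parser`, `ParsesExpr`, `LongExpr`, `StringExpr`, `LongParser` and `StringParser` are hypothetical stand-ins for jsqlparser's `Expression`, InLong's `ValueParser` and `TransformParser`. The conflict handling (log it, last registration wins) is also an assumption, not the PR's behavior.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ParserRegistrySketch {

    // Hypothetical stand-ins for jsqlparser's Expression hierarchy.
    interface Expr {}
    static class LongExpr implements Expr {}
    static class StringExpr implements Expr {}

    // Hypothetical stand-in for InLong's ValueParser.
    interface Parser {}

    // RUNTIME retention is required, otherwise getAnnotation() returns null.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ParsesExpr {
        Class<? extends Expr>[] values();
    }

    @ParsesExpr(values = {LongExpr.class})
    static class LongParser implements Parser {}

    @ParsesExpr(values = {StringExpr.class})
    static class StringParser implements Parser {}

    // Maps an expression type to the parser class that handles it.
    private static final Map<Class<? extends Expr>, Class<?>> PARSERS = new ConcurrentHashMap<>();

    // Registers every annotated Parser implementation in `candidates`.
    // In ParserTools, a classpath scanner (Reflections) supplies this set.
    static void register(List<Class<?>> candidates) {
        for (Class<?> clazz : candidates) {
            if (!Parser.class.isAssignableFrom(clazz)) {
                continue; // annotated but not a parser: skip it
            }
            ParsesExpr annotation = clazz.getAnnotation(ParsesExpr.class);
            if (annotation == null) {
                continue;
            }
            for (Class<? extends Expr> exprType : annotation.values()) {
                PARSERS.compute(exprType, (key, former) -> {
                    if (former != null && former != clazz) {
                        System.err.printf("parser conflict for %s: %s replaced by %s%n",
                                key.getSimpleName(), former.getSimpleName(), clazz.getSimpleName());
                    }
                    return clazz; // last registration wins in this sketch
                });
            }
        }
    }

    static Class<?> parserFor(Class<? extends Expr> exprType) {
        return PARSERS.get(exprType);
    }

    public static void main(String[] args) {
        register(List.of(LongParser.class, StringParser.class));
        System.out.println(parserFor(LongExpr.class).getSimpleName()); // LongParser
    }
}
```

Using `compute` keeps the conflict check and the replacement for one key in a single atomic step on a `ConcurrentHashMap`; inside a `static` initializer that runs once, its main benefit is having one place where conflicts are detected.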



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-10944][SDK] Support Inlong Transform parser annotation [inlong]

2024-08-29 Thread via GitHub


vernedeng commented on code in PR #10945:
URL: https://github.com/apache/inlong/pull/10945#discussion_r1735695332


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/TransformParser.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import net.sf.jsqlparser.expression.Expression;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target(TYPE)
+public @interface TransformParser {
+
+Class<? extends Expression>[] value();

Review Comment:
   values
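
The suggested rename has a visible effect at the use site: Java lets callers omit the element name only when the element is called `value`, so after renaming, every parser has to write `values = {...}`. The `RUNTIME` retention declared on `TransformParser` is also what lets `clazz.getAnnotation(...)` in `ParserTools` see it. A small sketch with hypothetical annotation names (`WithValue`, `WithValues`, `NotRetained`) showing both points:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

public class AnnotationElementSketch {

    // Element named `value`: callers may omit the element name.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface WithValue {
        Class<?>[] value();
    }

    // Element named `values`: callers must name it explicitly.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface WithValues {
        Class<?>[] values();
    }

    // Default (CLASS) retention: not visible through reflection at runtime.
    @Target(ElementType.TYPE)
    @interface NotRetained {
        Class<?>[] values();
    }

    @WithValue(String.class)                          // shorthand is allowed
    @WithValues(values = {String.class, Long.class})  // `@WithValues(String.class)` would not compile
    @NotRetained(values = Integer.class)
    static class Annotated {}

    public static void main(String[] args) {
        WithValues withValues = Annotated.class.getAnnotation(WithValues.class);
        System.out.println(Arrays.toString(withValues.values()));
        // Prints null: without RUNTIME retention the annotation is not available here.
        System.out.println(Annotated.class.getAnnotation(NotRetained.class));
    }
}
```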






[PR] [INLONG-10947][Dashboard] Modify field description missing for English locale [inlong]

2024-08-29 Thread via GitHub


wohainilaodou opened a new pull request, #10948:
URL: https://github.com/apache/inlong/pull/10948

   
   
   Fixes #10947 
   
   ### Motivation
   
   Add the missing field description for the English locale.
   
   ### Modifications
   
   Add the missing `pages.Clusters.Node.LastModifier` entry ("Last modifier") to `inlong-dashboard/src/ui/locales/en.json`.
   
   ### Verifying this change
   





Re: [PR] [INLONG-10931][Manager] Data preview supports data containing escape characters [inlong]

2024-08-29 Thread via GitHub


aloyszhang merged PR #10932:
URL: https://github.com/apache/inlong/pull/10932





(inlong) branch master updated (4cf7ccba60 -> 6ba319369e)

2024-08-29 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 4cf7ccba60 [INLONG-10847][Dashboard] Agent type cluster node management adds restart, reinstall, install log, heartbeat detection and unLoad operations (#10926)
 add 6ba319369e [INLONG-10931][Manager] Data preview supports data containing escape characters (#10932)

No new revisions were added by this update.

Summary of changes:
 inlong-manager/manager-service/pom.xml   |  5 +
 .../manager/service/datatype/CsvDataTypeOperator.java| 16 
 2 files changed, 17 insertions(+), 4 deletions(-)
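
The actual `CsvDataTypeOperator` change is not included in this notification. Purely to illustrate what supporting data with escape characters involves, here is a hand-written sketch that splits one delimited line while honoring an escape character; the class and method names, the backslash escape in the example, and keeping a trailing lone escape as-is are all assumptions, not the merged implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class EscapedCsvSplitSketch {

    /**
     * Splits one line on `delimiter`. An `escape` character makes the next
     * character literal (the escape itself is dropped); a trailing lone
     * escape is kept as an ordinary character.
     */
    static List<String> split(String line, char delimiter, char escape) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == escape && i + 1 < line.length()) {
                current.append(line.charAt(++i)); // keep the escaped char literally
            } else if (c == delimiter) {
                fields.add(current.toString());   // field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());           // last field
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(split("a\\,b,c", ',', '\\')); // [a,b, c]
    }
}
```

A naive `line.split(",")` would instead cut `a\,b` into two fields, which is the kind of preview breakage the PR title describes.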



Re: [PR] [INLONG-10944][SDK] Support Inlong Transform parser annotation [inlong]

2024-08-29 Thread via GitHub


emptyOVO commented on code in PR #10945:
URL: https://github.com/apache/inlong/pull/10945#discussion_r1735712254


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/TransformParser.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import net.sf.jsqlparser.expression.Expression;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target(TYPE)
+public @interface TransformParser {
+
+Class[] value();

Review Comment:
   done






[PR] [INLONG-10946][SDK] Transform SQL supports SPACE function [inlong]

2024-08-29 Thread via GitHub


Zkplo opened a new pull request, #10949:
URL: https://github.com/apache/inlong/pull/10949

   
   
   
   
   Fixes #10946 
   
   ### Motivation
   
   Add one function class: SpaceFunction. Also add the corresponding unit tests.
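
For reference, a minimal sketch of a SPACE(n) body, assuming MySQL-style semantics (n spaces, an empty string for n <= 0, null in and null out). The PR's `SpaceFunction` is not shown in this thread, and argument parsing through `ValueParser` is omitted; `SpaceFunctionSketch` is a hypothetical name.

```java
public class SpaceFunctionSketch {

    /** SPACE(n): a string of n spaces; "" when n <= 0; null when n is null. */
    static String space(Integer n) {
        if (n == null) {
            return null;
        }
        return n <= 0 ? "" : " ".repeat(n); // String.repeat needs Java 11+
    }

    public static void main(String[] args) {
        System.out.println("[" + space(3) + "]");  // [   ]
        System.out.println("[" + space(-2) + "]"); // []
    }
}
```

A production version would probably also cap n, since a huge argument allocates a correspondingly huge string.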
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
   
   





[PR] [INLONG-10940][SDK] Transform SQL support arithmetic functions(Including cot, tanh, cosh, asin, atan and atan2) [inlong]

2024-08-29 Thread via GitHub


MOONSakura0614 opened a new pull request, #10950:
URL: https://github.com/apache/inlong/pull/10950

   
   
   
   
   Fixes #10940 
   
   ### Motivation
   Add six arithmetic function classes: CotFunction, TanhFunction, CoshFunction, AsinFunction, AtanFunction and Atan2Function. Also add the corresponding unit tests.
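
All six functions map onto `java.lang.Math` on `double` values in radians, with cot taken as the reciprocal of tan. A sketch of that mapping as a name-to-function lookup; the table and its names are illustrative only (the quoted `CotFunction` registers itself with `@TransformFunction(names = {"cot"})` instead).

```java
import java.util.Map;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleUnaryOperator;

public class ArithmeticFunctionsSketch {

    // Single-argument functions; angles are in radians.
    static final Map<String, DoubleUnaryOperator> UNARY = Map.of(
            "cot", x -> 1.0 / Math.tan(x), // +/-Infinity at x = 0 in double arithmetic
            "tanh", Math::tanh,
            "cosh", Math::cosh,
            "asin", Math::asin,            // NaN when |x| > 1
            "atan", Math::atan);

    // atan2(y, x): the angle of the point (x, y), in the range [-pi, pi].
    static final Map<String, DoubleBinaryOperator> BINARY = Map.of("atan2", Math::atan2);

    public static void main(String[] args) {
        System.out.println(UNARY.get("cosh").applyAsDouble(0.0));
        System.out.println(BINARY.get("atan2").applyAsDouble(1.0, 1.0));
    }
}
```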
   
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]

2024-08-29 Thread via GitHub


emptyOVO commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1735782184


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##
@@ -17,54 +17,169 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class);
+public InstanceProfile profile;
+private String port;
+private Jedis jedis;
+private String hostName;
+private boolean ssl;
+private String authUser;
+private String authPassword;
+private String readTimeout;
+private String replId;
+private String snapShot;
+private String dbNumber;
+private String redisCommand;
+
+private String fieldOrMember;
+private boolean destroyed;
+
+private Set<String> keys;
+private Replicator redisReplicator;
+private BlockingQueue redisQueue;
+private ExecutorService executor;
 
 public RedisSource() {
 
 }
 
 @Override
 protected String getThreadName() {
-return null;
+return "redis-source-" + taskId + "-" + instanceId;
 }
 
 @Override
 protected void initSource(InstanceProfile profile) {
+LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, "");
+this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, "");
+this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, "get");
+this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+this.fieldOrMember = profile.get(TaskConstants.TASK_REDIS_FIELD_OR_NUMBER, null);
+this.instanceId = profile.getInstanceId();
+this.redisQueue = new LinkedBlockingQueue<>(profile.getInt(TaskConstants.TASK_REDIS_QUEUE_SIZE, 1));
+String uri = getRedisUri();
+this.jedis = new Jedis(uri);
+try {
+redisReplicator = new RedisReplicator(uri);
+startJedisSynchronize();
+initReplicator();
+executor = Executors.newSingleThreadExecutor();
+executor.execute(startRedisReplicator());
+} catch (URISyntaxException | IOException e) {
+sourceMetric.pluginReadFailCount.addAndGet(1);
+LOGGER.error("Connect to redis {}:{} failed.", hostName, port, e);
+}
+}
 
+private void startJedisSynchronize() {
+for (String key : keys) {

Review Comment:
   fix
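
`initSource` calls `getRedisUri()`, whose body is not part of the quoted hunk. One plausible way to assemble such a URI from the profile fields read above (`hostName`, `port`, `ssl`, `authUser`, `authPassword`, `dbNumber`) is sketched below. The `redis://[user:password@]host:port/db` layout and the `rediss` scheme for TLS are assumptions about what the client libraries accept, so check them against the Jedis and redis-replicator versions in use; `buildRedisUri` is a hypothetical helper that takes `port` and `db` as `int`, whereas the source keeps them as `String`.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class RedisUriSketch {

    /**
     * Builds "redis://[user:password@]host:port/db", or "rediss://..." when ssl is true.
     * User info is included only when a password is set; an empty user yields ":password".
     */
    static String buildRedisUri(String host, int port, boolean ssl,
            String user, String password, int db) {
        String userInfo = (password == null || password.isEmpty())
                ? null
                : (user == null ? "" : user) + ":" + password;
        try {
            // The multi-argument constructor quotes characters that are illegal in each component.
            return new URI(ssl ? "rediss" : "redis", userInfo, host, port, "/" + db, null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid redis address " + host + ":" + port, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildRedisUri("localhost", 6379, false, "", "", 0)); // redis://localhost:6379/0
    }
}
```

Going through `java.net.URI` rather than string concatenation means a password containing characters such as `@` or `/` is percent-encoded instead of silently corrupting the authority part.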






[PR] [INLONG-10934][SDK] Transform support LocalDate function [inlong]

2024-08-29 Thread via GitHub


ying-hua opened a new pull request, #10951:
URL: https://github.com/apache/inlong/pull/10951

   
   
   
   
   Fixes #10934
   
   ### Motivation
   
   
   - Add a function class: LocalDateFunction and its unit tests.
   - LocalDate([string1])
   Returns the local date in time zone string1 (defaults to the system default time zone if string1 is not specified).

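
A minimal sketch of the described behavior with `java.time`, assuming string1 is an IANA zone ID such as `Asia/Shanghai` (an unknown ID makes `ZoneId.of` throw `DateTimeException`). The `Clock` parameter and the `LocalDateSketch` name are hypothetical additions for testability and are not part of the PR description.

```java
import java.time.Clock;
import java.time.LocalDate;
import java.time.ZoneId;

public class LocalDateSketch {

    /**
     * LocalDate([zone]): the current date in the given zone, or in the
     * system default zone when no zone (null or blank) is given.
     */
    static LocalDate localDate(String zone, Clock clock) {
        ZoneId zoneId = (zone == null || zone.isBlank()) ? ZoneId.systemDefault() : ZoneId.of(zone);
        return LocalDate.now(clock.withZone(zoneId));
    }

    public static void main(String[] args) {
        System.out.println(localDate("Asia/Shanghai", Clock.systemUTC()));
        System.out.println(localDate(null, Clock.systemUTC())); // system default zone
    }
}
```

For example, at 2024-08-29T20:00:00Z the function returns 2024-08-30 for `Asia/Shanghai` (UTC+8) but 2024-08-29 for `UTC`, which is why the zone argument matters.
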
   ### Modifications
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





Re: [PR] [INLONG-10947][Dashboard] Modify field description missing for English locale [inlong]

2024-08-29 Thread via GitHub


aloyszhang merged PR #10948:
URL: https://github.com/apache/inlong/pull/10948





(inlong) branch master updated: [INLONG-10947][Dashboard] Modify field description missing for English locale (#10948)

2024-08-29 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 1b62fdd49e [INLONG-10947][Dashboard] Modify field description missing for English locale (#10948)
1b62fdd49e is described below

commit 1b62fdd49edd3577732a8f8b7fec0bdbedb09b2e
Author: kamianlaida <165994047+wohainilao...@users.noreply.github.com>
AuthorDate: Thu Aug 29 16:29:45 2024 +0800

[INLONG-10947][Dashboard] Modify field description missing for English locale (#10948)
---
 inlong-dashboard/src/ui/locales/en.json | 1 +
 1 file changed, 1 insertion(+)

diff --git a/inlong-dashboard/src/ui/locales/en.json b/inlong-dashboard/src/ui/locales/en.json
index c281478532..ebf2bbfcb1 100644
--- a/inlong-dashboard/src/ui/locales/en.json
+++ b/inlong-dashboard/src/ui/locales/en.json
@@ -838,6 +838,7 @@
   "pages.Clusters.Node.Status": "Status",
   "pages.Clusters.Node.Status.Normal": "Normal",
   "pages.Clusters.Node.Status.Timeout": "Timeout",
+  "pages.Clusters.Node.LastModifier": "Last modifier",
   "pages.Clusters.Node.Creator": "Creator",
   "pages.Clusters.Node.Create": "Create",
   "pages.Clusters.Node.IpRule": "Please enter the IP address correctly",



Re: [PR] [INLONG-10883][SDK] Transform SQL support InitCap function [inlong]

2024-08-29 Thread via GitHub


dockerzhang commented on PR #10892:
URL: https://github.com/apache/inlong/pull/10892#issuecomment-2317051155

   @MOONSakura0614 please fix the conflicts and failed workflows.





Re: [PR] [INLONG-10893][SDK] Transform SQL support FromBase64 function [inlong]

2024-08-29 Thread via GitHub


dockerzhang commented on PR #10896:
URL: https://github.com/apache/inlong/pull/10896#issuecomment-2317050159

   @MOONSakura0614 please fix the conflicts and failed workflows.





Re: [PR] [INLONG-10940][SDK] Transform SQL support arithmetic functions(Including cot, tanh, cosh, asin, atan and atan2) [inlong]

2024-08-29 Thread via GitHub


aloyszhang commented on code in PR #10950:
URL: https://github.com/apache/inlong/pull/10950#discussion_r1735884683


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+
+/**
+ * CotFunction
+ * description: cot(numeric) -- returns the cotangent of the numeric (in radians)
+ */
+@TransformFunction(names = {"cot"})
+public class CotFunction implements ValueParser {
+
+private final ValueParser valueParser;
+
+public CotFunction(Function expr) {
+// Use OperatorTools to build a parser that handles the argument of the expression

Review Comment:
   please replace with english



##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+
+/**
+ * CotFunction
+ * description: cot(numeric) -- returns the cotangent of the numeric (in radians)
+ */
+@TransformFunction(names = {"cot"})
+public class CotFunction implements ValueParser {
+
+private final ValueParser valueParser;
+
+public CotFunction(Function expr) {
+// Use OperatorTools to build a parser that handles the argument of the expression
+this.valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+}
+
+@Override
+public Object parse(SourceData sourceData, int rowIndex, Context context) {
+// Parse the input argument
+Object valueObj = valueParser.parse(sourceData, rowIndex, context);
+
+// Convert the parsed result to BigDecimal for the numeric computation
+BigDecimal value = OperatorTools.parseBigDecimal(valueObj);
+
+// Compute tan(x) and take its reciprocal to obtain cot(x)

Review Comment:
   ditto
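
The quoted hunk stops after the comment about computing tan(x) and taking its reciprocal, so the return type and the handling of undefined points are not visible. A sketch of that last step, assuming the result is returned as a `Double` and that `null` is returned when tan(x) is exactly zero; `CotSketch` is a hypothetical name. Floating point rarely produces an exact zero near nonzero multiples of pi, so cot(pi) comes out as a very large-magnitude number rather than `null`.

```java
import java.math.BigDecimal;

public class CotSketch {

    /**
     * cot(x) = 1 / tan(x), with x in radians.
     * Returns null for a null input, or when tan(x) is exactly 0.0 (cot undefined).
     */
    static Double cot(BigDecimal x) {
        if (x == null) {
            return null;
        }
        double tan = Math.tan(x.doubleValue());
        if (tan == 0.0) {
            return null; // e.g. x = 0: the cotangent is undefined
        }
        return 1.0 / tan;
    }

    public static void main(String[] args) {
        System.out.println(cot(new BigDecimal("0.5")));
        System.out.println(cot(BigDecimal.ZERO)); // null
    }
}
```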






[I] [Bug][Doc] Wrong referring to StarRocks [inlong-website]

2024-08-29 Thread via GitHub


aloyszhang opened a new issue, #970:
URL: https://github.com/apache/inlong-website/issues/970

   ### What happened
   
   The mysql_starrocks_example page incorrectly refers to StarRocks as Apache StarRocks.
   
   
   
   ### What you expected to happen
   
StarRocks instead of Apache StarRocks
   
   ### How to reproduce
   
   See https://inlong.apache.org/docs/quick_start/data_sync/mysql_starrocks_example/
   
   ### Environment
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   





[PR] [INLONG-970][Doc] fix the wrong referring to StarRocks [inlong-website]

2024-08-29 Thread via GitHub


aloyszhang opened a new pull request, #971:
URL: https://github.com/apache/inlong-website/pull/971

   
   Fixes #970 
   
   ### Motivation
   
   
   
   ### Modifications
   
   Change `Apache StarRocks` to `StarRocks`.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)
   





Re: [I] [Improve][Doc] Add excalidraw raw file directory [inlong-website]

2024-08-29 Thread via GitHub


aloyszhang closed issue #968: [Improve][Doc] Add excalidraw raw file directory
URL: https://github.com/apache/inlong-website/issues/968





(inlong-website) branch master updated: [INLONG-968][Doc] Add excalidraw raw file directories (#969)

2024-08-29 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new 07fdec66a2 [INLONG-968][Doc] Add excalidraw raw file directories (#969)
07fdec66a2 is described below

commit 07fdec66a2cbb5db9bf4887491db142a3b58b138
Author: AloysZhang 
AuthorDate: Thu Aug 29 18:41:23 2024 +0800

[INLONG-968][Doc] Add excalidraw raw file directories (#969)
---
 image_raw/administration/.gitkeep|   0
 image_raw/components/.gitkeep|   0
 image_raw/data_nodes/.gitkeep|   0
 image_raw/deployment/.gitkeep|   0
 image_raw/design_and_concept/.gitkeep|   0
 image_raw/development/.gitkeep   |   0
 image_raw/introduction/architecture.pptx | Bin 0 -> 815539 bytes
 image_raw/quick_start/.gitkeep   |   0
 image_raw/sdk/.gitkeep   |   0
 image_raw/user_guide/.gitkeep|   0
 10 files changed, 0 insertions(+), 0 deletions(-)

diff --git a/image_raw/administration/.gitkeep b/image_raw/administration/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/components/.gitkeep b/image_raw/components/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/data_nodes/.gitkeep b/image_raw/data_nodes/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/deployment/.gitkeep b/image_raw/deployment/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/design_and_concept/.gitkeep b/image_raw/design_and_concept/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/development/.gitkeep b/image_raw/development/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/introduction/architecture.pptx b/image_raw/introduction/architecture.pptx
new file mode 100644
index 00..f8c6e54ef6
Binary files /dev/null and b/image_raw/introduction/architecture.pptx differ
diff --git a/image_raw/quick_start/.gitkeep b/image_raw/quick_start/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/sdk/.gitkeep b/image_raw/sdk/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/user_guide/.gitkeep b/image_raw/user_guide/.gitkeep
new file mode 100644
index 00..e69de29bb2



Re: [PR] [INLONG-968][Doc] Add excalidraw raw file directories [inlong-website]

2024-08-29 Thread via GitHub


aloyszhang merged PR #969:
URL: https://github.com/apache/inlong-website/pull/969





Re: [PR] [INLONG-970][Doc] fix the wrong referring to StarRocks [inlong-website]

2024-08-29 Thread via GitHub


dockerzhang merged PR #971:
URL: https://github.com/apache/inlong-website/pull/971





Re: [I] [Bug][Doc] Wrong referring to StarRocks [inlong-website]

2024-08-29 Thread via GitHub


dockerzhang closed issue #970: [Bug][Doc] Wrong referring to StarRocks 
URL: https://github.com/apache/inlong-website/issues/970





(inlong-website) branch master updated: [INLONG-970][Doc] fix the wrong referring to StarRocks (#971)

2024-08-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new 387b07f85f [INLONG-970][Doc] fix the wrong referring to StarRocks (#971)
387b07f85f is described below

commit 387b07f85f399564585184c8931209a011811cd0
Author: AloysZhang 
AuthorDate: Thu Aug 29 18:46:26 2024 +0800

[INLONG-970][Doc] fix the wrong referring to StarRocks (#971)
---
 docs/quick_start/data_sync/mysql_starrocks_example.md   | 2 +-
 .../current/quick_start/data_sync/mysql_starrocks_example.md| 2 +-
 .../version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 .../version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 .../version-1.12.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 .../version-1.13.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 .../version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 .../version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 .../version-1.12.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 .../version-1.13.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +-
 10 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/docs/quick_start/data_sync/mysql_starrocks_example.md b/docs/quick_start/data_sync/mysql_starrocks_example.md
index d6b4c9f5c7..405e8b77f0 100644
--- a/docs/quick_start/data_sync/mysql_starrocks_example.md
+++ b/docs/quick_start/data_sync/mysql_starrocks_example.md
@@ -17,7 +17,7 @@ Before we begin, we need to install InLong. Here we provide two ways:
 Download the [connectors](https://inlong.apache.org/downloads/) corresponding to Flink version, and after decompression, place `sort-connector-starrocks-[version]-SNAPSHOT.jar` in `/inlong-sort/connectors/` directory.
 
 ### Install StarRocks
-Please refer to the [Installation Tutorial](https://docs.starrocks.io/docs/quick_start/) on the Apache StarRocks official website
+Please refer to the [Installation Tutorial](https://docs.starrocks.io/docs/quick_start/) on the StarRocks official website
 
 ## Cluster Initialize
 When all containers are successfully started, you can access the InLong dashboard address http://localhost, and use the following default account to log in.
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md
index 571bc65325..266872a45c 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md
@@ -17,7 +17,7 @@ sidebar_position: 2
 下载与 Flink 版本对应的 [connectors](https://inlong.apache.org/zh-CN/downloads),解压后将 `sort-connector-starrocks-[version]-SNAPSHOT.jar` 放在 `/inlong-sort/connectors/` 目录下。
 
 ### 安装 StarRocks
-请参考 Apache StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。
+请参考 StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。
 
 ## 集群初始化
 容器启动成功后,访问 InLong Dashboard 地址 http://localhost,并使用以下默认账号登录:
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md
index 64b31a95bc..964c97f09a 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md
@@ -17,7 +17,7 @@ sidebar_position: 2
 下载 Flink 1.13 对应版本的 [connectors](https://inlong.apache.org/zh-CN/downloads),解压后将 `sort-connector-starrocks-[version]-SNAPSHOT.jar` 放在 `/inlong-sort/connectors/` 目录下。
 
 ### 安装 StarRocks
-请参考 Apache StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。
+请参考 StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。
 
 ## 集群初始化
 容器启动成功后,访问 InLong Dashboard 地址 http://localhost,并使用以下默认账号登录:
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md
index 571bc65325..266872a45c 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md
@@ -17,7 +17,7 @@ sidebar_position: 2
 下载与 Flink 版本对应的 [connectors](https://inlong.apache.org/zh-CN/downloads),解压后将 `sort-connector-starrocks-[version]-SNAPSHOT.jar` 放在 `/inlong-sort/conne

Re: [PR] [INLONG-10897][SDK] Transform support DATEDIFF function [inlong]

2024-08-29 Thread via GitHub


aloyszhang commented on code in PR #10925:
URL: https://github.com/apache/inlong/pull/10925#discussion_r1735987352


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+
+/**
+ * DateDiffFunction
+ * description: DATEDIFF(d1, d2)
+ * - return null if one of the two parameters is null or ""
+ * - return null if one of the two parameters has an incorrect date format
+ * - return the number of days between the dates d1->d2.
+ */
+@TransformFunction(names = {"datediff", "date_diff"})
+public class DateDiffFunction implements ValueParser {
+
+    private final ValueParser leftDateParser;
+    private final ValueParser rightDateParser;
+    private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    public DateDiffFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        leftDateParser = OperatorTools.buildParser(expressions.get(0));
+        rightDateParser = OperatorTools.buildParser(expressions.get(1));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftDateObj = leftDateParser.parse(sourceData, rowIndex, context);
+        Object rightDateObj = rightDateParser.parse(sourceData, rowIndex, context);
+        if (leftDateObj == null || rightDateObj == null) {
+            return null;
+        }
+        String leftDate = OperatorTools.parseString(leftDateObj);
+        String rightDate = OperatorTools.parseString(rightDateObj);
+        if (leftDate.isEmpty() || rightDate.isEmpty()) {
+            return null;
+        }
+        try {
+            LocalDate left = getLocalDate(leftDate);
+            LocalDate right = getLocalDate(rightDate);
+            return ChronoUnit.DAYS.between(right, left);
+        } catch (Exception e) {

Review Comment:
   It would be better to print some logs here for troubleshooting.
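As a sketch of what the reviewer is asking for, here is a self-contained version of the DATEDIFF contract from the Javadoc above (null for null, empty or unparseable input; otherwise `d1` minus `d2` in days, matching `ChronoUnit.DAYS.between(right, left)`) that logs the offending inputs before returning null. It is not the PR's code: `toLocalDate` is a hypothetical stand-in for the `getLocalDate` helper that the diff calls but does not show, and `java.util.logging` is used only to stay dependency-free; the real class would use the project's own logger.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.logging.Logger;

public class DateDiffSketch {

    private static final Logger LOG = Logger.getLogger(DateDiffSketch.class.getName());
    private static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Hypothetical helper: accepts "yyyy-MM-dd" or "yyyy-MM-dd HH:mm:ss".
    static LocalDate toLocalDate(String s) {
        if (s.length() > 10) {
            return LocalDateTime.parse(s, DATE_TIME).toLocalDate();
        }
        return LocalDate.parse(s, DATE);
    }

    // Returns d1 - d2 in days, or null for null, empty or unparseable input.
    static Long dateDiff(String d1, String d2) {
        if (d1 == null || d2 == null || d1.isEmpty() || d2.isEmpty()) {
            return null;
        }
        try {
            return ChronoUnit.DAYS.between(toLocalDate(d2), toLocalDate(d1));
        } catch (DateTimeParseException e) {
            // Log the raw inputs so the bad record can be traced later.
            LOG.warning("DATEDIFF cannot parse '" + d1 + "' or '" + d2 + "': " + e.getMessage());
            return null;
        }
    }
}
```

Catching `DateTimeParseException` instead of `Exception` keeps unrelated bugs from being silently converted into `null`.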






Re: [PR] [INLONG-10942][SDK] Add official function names for all Transform functions [inlong]

2024-08-29 Thread via GitHub


aloyszhang commented on PR #10943:
URL: https://github.com/apache/inlong/pull/10943#issuecomment-2317316437

   Where is the source of truth for the "official function names", and is there a specification for them?
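Whatever the canonical source turns out to be, alias lists such as `{"datediff", "date_diff"}` or `{"localdate", "currentdate", "current_date", "curdate"}` can at least be checked for collisions by building one alias-to-class map. The sketch below uses a hypothetical `@FunctionNames` annotation and stub classes in place of `@TransformFunction` and the real function classes, and scans an explicit list rather than the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class FunctionRegistrySketch {

    // Hypothetical stand-in for @TransformFunction(names = {...}).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface FunctionNames {
        String[] names();
    }

    @FunctionNames(names = {"datediff", "date_diff"})
    static class DateDiffStub {
    }

    @FunctionNames(names = {"localdate", "currentdate", "current_date", "curdate"})
    static class LocalDateStub {
    }

    // Builds a case-insensitive alias -> class map and fails on a duplicate alias.
    static Map<String, Class<?>> build(List<Class<?>> classes) {
        Map<String, Class<?>> registry = new HashMap<>();
        for (Class<?> clazz : classes) {
            FunctionNames ann = clazz.getAnnotation(FunctionNames.class);
            if (ann == null) {
                continue;
            }
            for (String name : ann.names()) {
                Class<?> previous = registry.putIfAbsent(name.toLowerCase(Locale.ROOT), clazz);
                if (previous != null && previous != clazz) {
                    throw new IllegalStateException("duplicate function name: " + name);
                }
            }
        }
        return registry;
    }
}
```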





Re: [PR] [INLONG-10902][SDK] Transform support HEX(numeric or string) function [inlong]

2024-08-29 Thread via GitHub


vernedeng commented on code in PR #10904:
URL: https://github.com/apache/inlong/pull/10904#discussion_r1736010161


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.Optional;
+
+/**
+ * HexFunction
+ * description: If the input argument is a numeric value (such as an integer), the HEX function converts the value to the corresponding hexadecimal string.
+ *  If the input argument is a string, the HEX function converts each character in the string to its corresponding hexadecimal ASCII encoding and returns the hexadecimal representation of the entire string.
+ */
+public class HexFunction implements ValueParser {
+
+    private ValueParser valueParser;
+
+    public HexFunction(Function expr) {
+        valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object valueObj = valueParser.parse(sourceData, rowIndex, context);
+        try {
+            return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase();
+        } catch (NumberFormatException e) {
+            return hex(OperatorTools.parseString(valueObj));

Review Comment:
   Maybe you could add a new method called `isBigDecimal` to check the type of the input `valueObj`: if it returns true, parse it as a BigDecimal; otherwise, parse it as a String.
   
   ```
   if (isBigDecimal(valueObj)) {
       return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase();
   } 
   return hex(OperatorTools.parseString(valueObj)).toUpperCase();
   ```
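A minimal sketch of the suggested flow, assuming `isBigDecimal` simply tries `new BigDecimal(...)` on the value's string form. The `hex` body is hypothetical as well, since the diff does not show the PR's `hex` helpers: half-up rounding of fractional numbers and UTF-8 byte encoding of strings are assumptions, and negative numbers come out with a minus sign rather than MySQL's 64-bit two's-complement form.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;

public class HexSketch {

    // Hypothetical helper: true if the value's string form parses as a BigDecimal.
    static boolean isBigDecimal(Object value) {
        if (value == null) {
            return false;
        }
        try {
            new BigDecimal(value.toString().trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static String hex(Object value) {
        if (value == null) {
            return null;
        }
        if (isBigDecimal(value)) {
            // Assumption: fractional values are rounded half-up before conversion.
            BigDecimal number = new BigDecimal(value.toString().trim());
            return number.setScale(0, RoundingMode.HALF_UP).toBigInteger().toString(16).toUpperCase();
        }
        // Non-numeric input: hex-encode each UTF-8 byte as two uppercase digits.
        StringBuilder sb = new StringBuilder();
        for (byte b : value.toString().getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }
}
```

Checking the type once up front, as suggested, avoids using `NumberFormatException` for ordinary control flow on every non-numeric row.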






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


aloyszhang commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1736049185


##
inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties:
##
@@ -35,3 +35,7 @@ flink.savepoint.directory=file:///data/inlong-sort/savepoints
 flink.parallelism=1
 # flink stop request drain
 flink.drain=false
+# max data count per core per second
+flink.max.data.percore=1000

Review Comment:
   ```suggestion
   # max msg rate per core
   flink.max.msg.rate.percore=1000
   ```
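The optimizer that consumes this key is not shown in this thread. As a rough sketch, assuming the limit is used to size parallelism by ceiling division of an observed message rate and the result is clamped to a maximum, the suggested key could be read and applied as below; the fallback of 1000 when the key is missing and the clamping rule are assumptions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ParallelismSketch {

    static final String MAX_RATE_KEY = "flink.max.msg.rate.percore";

    // Reads the per-core limit; falls back to 1000 (the value set in the diff) if absent.
    static long readMaxRatePerCore(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Long.parseLong(props.getProperty(MAX_RATE_KEY, "1000").trim());
    }

    // Hypothetical rule: enough cores to keep each at or below the limit,
    // clamped to the range [1, maxParallelism].
    static int parallelism(long msgRatePerSecond, long maxRatePerCore, int maxParallelism) {
        if (maxRatePerCore <= 0) {
            throw new IllegalArgumentException("max msg rate per core must be positive");
        }
        long needed = (msgRatePerSecond + maxRatePerCore - 1) / maxRatePerCore; // ceiling division
        return (int) Math.max(1, Math.min(needed, maxParallelism));
    }
}
```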






Re: [PR] [INLONG-10902][SDK] Transform support HEX(numeric or string) function [inlong]

2024-08-29 Thread via GitHub


emptyOVO commented on code in PR #10904:
URL: https://github.com/apache/inlong/pull/10904#discussion_r1736065655


##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.Optional;
+
+/**
+ * HexFunction
+ * description: If the input argument is a numeric value (such as an integer), the HEX function converts the value to the corresponding hexadecimal string.
+ *  If the input argument is a string, the HEX function converts each character in the string to its corresponding hexadecimal ASCII encoding and returns the hexadecimal representation of the entire string.
+ */
+public class HexFunction implements ValueParser {
+
+    private ValueParser valueParser;
+
+    public HexFunction(Function expr) {
+        valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object valueObj = valueParser.parse(sourceData, rowIndex, context);
+        try {
+            return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase();
+        } catch (NumberFormatException e) {
+            return hex(OperatorTools.parseString(valueObj));

Review Comment:
   > Maybe you can make a new method called `isBigDecimal` to check the type of 
input valueObj, if true, parse it in BigDecimal way; otherwise, parse it as 
String.
   > 
   > ```
   > if (isBigDecimal(valueObj)) {
   >  return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase();
   > } 
   > return hex(OperatorTools.parseString(valueObj)).toUpperCase();
   > ```
   
   I fixed it by using a regex to check the data.
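The regex itself is not quoted in the thread. A sketch of a regex-based numeric check, assuming a plain decimal form (optional sign, digits, optional fraction); the pattern is hypothetical.

```java
import java.util.regex.Pattern;

public class NumericCheckSketch {

    // Hypothetical pattern: optional sign, one or more digits, optional fractional part.
    private static final Pattern NUMERIC = Pattern.compile("[-+]?\\d+(\\.\\d+)?");

    static boolean isNumeric(Object value) {
        return value != null && NUMERIC.matcher(value.toString().trim()).matches();
    }
}
```

Compared with the try/catch approach, this avoids exception-driven control flow, but it also rejects forms that `BigDecimal` accepts, such as `1e3`, so the pattern decides which inputs take the numeric path.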



##
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.Optional;
+
+/**
+ * HexFunction
+ * description: If the input argument is a numeric value (such as an integer), the HEX function converts the value to the corresponding hexadecimal string.
+ *  If the input argument is a string, the HEX function converts each character in the string to its corresponding hexadecimal ASCII encoding and returns the hexadecimal representation of the entire string.
+ */
+public class HexFunction implements ValueParser {
+
+    private ValueParser valueParser;
+
+    public HexFunction(Function expr) {
+        valueParser = OperatorTool

Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


aloyszhang commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1736084213


##
inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java:
##
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.consts;

Review Comment:
   That makes sense, thanks for your explanation.






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


aloyszhang commented on PR #10916:
URL: https://github.com/apache/inlong/pull/10916#issuecomment-2317478140

   @fuweng11 There are some modifications in the manager module, PTAL





Re: [PR] [INLONG-10946][SDK] Transform SQL supports SPACE function [inlong]

2024-08-29 Thread via GitHub


aloyszhang merged PR #10949:
URL: https://github.com/apache/inlong/pull/10949





Re: [PR] [INLONG-10934][SDK] Transform support LocalDate function [inlong]

2024-08-29 Thread via GitHub


aloyszhang merged PR #10951:
URL: https://github.com/apache/inlong/pull/10951





(inlong) branch master updated: [INLONG-10934][SDK] Transform support LocalDate function (#10951)

2024-08-29 Thread aloyszhang

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new f35ab6f619 [INLONG-10934][SDK] Transform support LocalDate function (#10951)
f35ab6f619 is described below

commit f35ab6f619004fac58a06f79b98e681e778622f0
Author: Xincheng Huang <60057611+ying-...@users.noreply.github.com>
AuthorDate: Thu Aug 29 20:19:44 2024 +0800

[INLONG-10934][SDK] Transform support LocalDate function (#10951)
---
 .../process/function/LocalDateFunction.java| 56 ++
 .../TestTransformTemporalFunctionsProcessor.java   | 42 
 2 files changed, 98 insertions(+)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocalDateFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocalDateFunction.java
new file mode 100644
index 00..ba2d909d95
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocalDateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+
+/**
+ *  LocalDateFunction
+ *  description:
+ *  localDate([string1]) returns the current date in the specified time zone.
+ *  (by default: the current date in the system time zone)
+ */
+@TransformFunction(names = {"localdate", "currentdate", "current_date", "curdate"})
+public class LocalDateFunction implements ValueParser {
+
+    private ValueParser stringParser;
+
+    public LocalDateFunction(Function expr) {
+        if (expr.getParameters() != null) {
+            stringParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+        }
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (stringParser != null) {
+            String zoneString = OperatorTools.parseString(stringParser.parse(sourceData, rowIndex, context));
+            return LocalDate.now(ZoneId.of(zoneString));
+        } else {
+            return LocalDate.now(ZoneId.systemDefault());
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
index e0830dd72e..354b8b7f18 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
@@ -29,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.time.Duration;
+import java.time.LocalDate;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
@@ -421,6 +422,47 @@ public class TestTransformTemporalFunctionsProcessor {
 Assert.assertTrue(duration3.getSeconds() < 1);
 }
 
+    @Test
+    public void testLocalDateFunction() throws Exception {
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+        // case1: localDate() - default system time zone
+        String transformSql1 = "select localdate() from source";
+        TransformConfig config1 = new TransformConfig(transformSql1);
+        TransformProcessor processor1 = TransformProcessor
+                .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+List outp
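`LocalDateFunction` resolves the zone with `ZoneId.of(zoneString)`, which throws a `DateTimeException` for an unknown zone id, so a bad argument currently propagates out of `parse`. A sketch of a variant that returns null instead, in line with how DATEDIFF in this thread treats malformed input; this is a hypothetical design choice, not the committed behavior.

```java
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.ZoneId;

public class LocalDateSketch {

    // No zone argument: use the system default zone, as the committed code does.
    // Unknown zone id: return null instead of throwing (hypothetical variant).
    static LocalDate currentDate(String zone) {
        if (zone == null) {
            return LocalDate.now(ZoneId.systemDefault());
        }
        try {
            return LocalDate.now(ZoneId.of(zone));
        } catch (DateTimeException e) {
            return null;
        }
    }
}
```

`ZoneId.of` also accepts offset ids such as `+08:00`, so region names and fixed offsets both work as the argument.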

(inlong) branch master updated (1b62fdd49e -> a9719038f1)

2024-08-29 Thread aloyszhang

aloyszhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


 from 1b62fdd49e [INLONG-10947][Dashboard] Modify field description missing for English locale (#10948)
 add a9719038f1 [INLONG-10946][SDK] Transform SQL supports SPACE function (#10949)

No new revisions were added by this update.

Summary of changes:
 .../{Md5Function.java => SpaceFunction.java}   | 35 +++
 .../TestTransformStringFunctionsProcessor.java | 39 ++
 2 files changed, 60 insertions(+), 14 deletions(-)
 copy inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/{Md5Function.java => SpaceFunction.java} (62%)
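Only the diffstat of `SpaceFunction` appears here. Assuming it follows MySQL's `SPACE(N)` (a string of N spaces), a minimal sketch follows; returning an empty string for N <= 0 and null for a null argument are assumptions about the edge cases. It uses `String.repeat`, so it needs Java 11 or later.

```java
public class SpaceSketch {

    // Assumed MySQL-like semantics: N spaces, "" for N <= 0, null for null input.
    static String space(Integer n) {
        if (n == null) {
            return null;
        }
        return n <= 0 ? "" : " ".repeat(n);
    }
}
```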



Re: [PR] [INLONG-10939][SDK] Transform SQL supports STRCMP function [inlong]

2024-08-29 Thread via GitHub


dockerzhang merged PR #10941:
URL: https://github.com/apache/inlong/pull/10941





(inlong) branch master updated: [INLONG-10939][SDK] Transform SQL supports STRCMP function (#10941)

2024-08-29 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 7cfbf8a74b [INLONG-10939][SDK] Transform SQL supports STRCMP function (#10941)
7cfbf8a74b is described below

commit 7cfbf8a74bb9570daa5171ed9300420fb37e1489
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Fri Aug 30 09:53:02 2024 +0800

[INLONG-10939][SDK] Transform SQL supports STRCMP function (#10941)

Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../transform/process/function/StrcmpFunction.java | 67 ++
 .../TestTransformStringFunctionsProcessor.java | 42 ++
 2 files changed, 109 insertions(+)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StrcmpFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StrcmpFunction.java
new file mode 100644
index 00..ebbcc94ae9
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StrcmpFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * StrcmpFunction
+ * description:  strcmp(s1,s2)
+ * return NULL if either argument is NULL
+ * return 0 if the strings are the same
+ * return -1 if the first argument is smaller than the second according to the current sort order
+ * return 1 otherwise
+ */
+@TransformFunction(names = {"strcmp"})
+public class StrcmpFunction implements ValueParser {
+
+    private final ValueParser leftStringParser;
+    private final ValueParser rightStringParser;
+
+    public StrcmpFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        leftStringParser = OperatorTools.buildParser(expressions.get(0));
+        rightStringParser = OperatorTools.buildParser(expressions.get(1));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftStringObj = leftStringParser.parse(sourceData, rowIndex, context);
+        Object rightStringObj = rightStringParser.parse(sourceData, rowIndex, context);
+        if (leftStringObj == null || rightStringObj == null) {
+            return null;
+        }
+        String leftString = OperatorTools.parseString(leftStringObj);
+        String rightString = OperatorTools.parseString(rightStringObj);
+        int cmp = OperatorTools.compareValue(leftString, rightString);
+        if (cmp > 0) {
+            return 1;
+        } else if (cmp < 0) {
+            return -1;
+        }
+        return 0;
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
index d8576d12fc..10cfa740c9 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
@@ -386,6 +386,48 @@ public class TestTransformStringFunctionsProcessor {
 Assert.assertEquals(output7.get(0), "result=da");
 }
 
+@Test
+public void testStrcmpFunction() throws Exception {
+String transformSql = null, data = null;
+TransformConfig config = null;
+TransformProcessor processor = null;
+List<String> output = null;
+
+transformSql = "select strcmp(string1,string2) fro

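A minimal, self-contained sketch of the `strcmp` contract described in the Javadoc above (null in, null out; otherwise -1, 0 or 1). It assumes plain lexicographic `String.compareTo` stands in for `OperatorTools.compareValue`, whose exact ordering is not shown in this thread; `StrcmpSketch` is a hypothetical name. `Integer.signum` is a compact equivalent of the if/else chain in the diff.

```java
public final class StrcmpSketch {

    private StrcmpSketch() {
    }

    /**
     * Documented strcmp contract: null if either argument is null,
     * 0 if equal, -1 if left sorts before right, 1 otherwise.
     * Assumption: lexicographic String.compareTo is the sort order.
     */
    public static Integer strcmp(String left, String right) {
        if (left == null || right == null) {
            return null;
        }
        // compareTo can return any magnitude (e.g. -25 for "a" vs "z");
        // signum normalizes it to -1, 0 or 1
        return Integer.signum(left.compareTo(right));
    }

    public static void main(String[] args) {
        System.out.println(strcmp("apple", "banana")); // -1
        System.out.println(strcmp("banana", "apple")); // 1
        System.out.println(strcmp("same", "same"));    // 0
        System.out.println(strcmp(null, "x"));         // null
    }
}
```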
Re: [PR] [INLONG-10935][SDK] Transform support DAYNAME function [inlong]

2024-08-29 Thread via GitHub


aloyszhang commented on PR #10937:
URL: https://github.com/apache/inlong/pull/10937#issuecomment-2319692262

   @ying-hua please resolve the conflict. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737670802


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")
+public String auditQueryUrl;
+
+private static final int MAX_PARALLELISM = 2048;
+private long maximumMessagePerSecondPerCore = 1000L;
+private static final long DEFAULT_ERROR_DATA_VOLUME = 0L;
+private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT;
+private static final String DEFAULT_AUDIT_TYPE = "DataProxy";
+private static final String AUDIT_CYCLE_REALTIME = "1";
+// maximum data scale counting range in hours
+private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1;
+// sample time format: 2024-08-23T22:47:38.866
+private static final String AUDIT_QUERY_DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+
+private static final String LOGTS_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+private static final String TIMEZONE_REGEX = "([+-])(\\d):";
+
+/**
+ * Calculate recommended parallelism based on maximum message per second per core
+ *
+ * @return Recommended parallelism
+ */
+public int calculateRecommendedParallelism(List<InlongStreamInfo> streamInfos) {
+long averageDataVolume;
+InlongStreamInfo streamInfo = streamInfos.get(0);
+try {
+averageDataVolume = getAverageDataVolume(streamInfo);
+log.info("Retrieved data volume: {}", averageDataVolume);
+} catch (Exception e) {
+log.error("Error retrieving data volume: {}", e.getMessage(), e);
+log.warn("Using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME);
+averageDataVolume = DEFAULT_ERROR_DATA_VOLUME;
+}
+int newParallelism = (int) (averageDataVolume / maximumMessagePerSecondPerCore);
+// Ensure parallelism is at most MAX_PARALLELISM
+newParallelism = Math.min(newParallelism, MAX_PARALLELISM);
+log.info("Calculated parallelism: {} for data volume: {}", newParallelism, averageDataVolume);
+return newParallelism;
+}
+
+/**
+ * Initialize maximum message per second per core based on configuration
+ *
+ * @param maximumMessagePerSecondPerCore The maximum messages per second per core
+ */
+public void setMaximumMessagePerSecondPerCore(Integer maximumMessagePerSecondPerCore) {
+if (maximumMessagePerSecondPerCore == null || maximumMessagePerSecondPerCore <= 0) {
+log.error("Illegal flink.maxpercore property, mus

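The `calculateRecommendedParallelism` method quoted above floors `averageDataVolume / maximumMessagePerSecondPerCore`, so any stream below the per-core rate, including the `DEFAULT_ERROR_DATA_VOLUME = 0` fallback, yields a parallelism of 0. A minimal sketch of one alternative follows. It assumes the volume is already expressed in messages per second, uses ceiling division in place of the original floor, and clamps the result into `[1, MAX_PARALLELISM]`. `ParallelismSketch` and `DEFAULT_PARALLELISM` are hypothetical names, and an empty `OptionalLong` stands for "no audit data available".

```java
import java.util.OptionalLong;

public final class ParallelismSketch {

    // Mirrors MAX_PARALLELISM in FlinkParallelismOptimizer
    static final int MAX_PARALLELISM = 2048;
    // Hypothetical fallback when no audit data is available
    static final int DEFAULT_PARALLELISM = 1;

    private ParallelismSketch() {
    }

    /**
     * @param messagesPerSecond average volume, assumed to be in messages per second;
     *                          empty when the audit query failed or returned nothing
     * @param maxPerCore        maximum messages per second one core can handle
     */
    public static int recommend(OptionalLong messagesPerSecond, long maxPerCore) {
        if (maxPerCore <= 0) {
            throw new IllegalArgumentException("maxPerCore must be positive");
        }
        if (!messagesPerSecond.isPresent()) {
            return DEFAULT_PARALLELISM;
        }
        long volume = Math.max(0L, messagesPerSecond.getAsLong());
        // Ceiling division without overflow: 1500 msg/s at 1000 msg/s/core needs 2 cores
        long needed = volume / maxPerCore + (volume % maxPerCore == 0 ? 0 : 1);
        // Clamp into [1, MAX_PARALLELISM] so a quiet stream never gets parallelism 0
        return (int) Math.min(Math.max(needed, 1L), MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        System.out.println(recommend(OptionalLong.of(0L), 1000L));          // 1
        System.out.println(recommend(OptionalLong.of(1500L), 1000L));       // 2
        System.out.println(recommend(OptionalLong.of(10_000_000L), 1000L)); // 2048
        System.out.println(recommend(OptionalLong.empty(), 1000L));         // 1
    }
}
```

Returning a default instead of 0 also covers groups for which no audit data can be found at all.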
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737673199


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")

Review Comment:
   Are you sure this annotation takes effect in plugins?






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737676926


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")
+public String auditQueryUrl;
+
+private static final int MAX_PARALLELISM = 2048;
+private long maximumMessagePerSecondPerCore = 1000L;
+private static final long DEFAULT_ERROR_DATA_VOLUME = 0L;
+private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT;
+private static final String DEFAULT_AUDIT_TYPE = "DataProxy";
+private static final String AUDIT_CYCLE_REALTIME = "1";
+// maximum data scale counting range in hours
+private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1;
+// sample time format: 2024-08-23T22:47:38.866
+private static final String AUDIT_QUERY_DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+
+private static final String LOGTS_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+private static final String TIMEZONE_REGEX = "([+-])(\\d):";
+
+/**
+ * Calculate recommended parallelism based on maximum message per second per core
+ *
+ * @return Recommended parallelism
+ */
+public int calculateRecommendedParallelism(List<InlongStreamInfo> streamInfos) {
+long averageDataVolume;
+InlongStreamInfo streamInfo = streamInfos.get(0);
+try {
+averageDataVolume = getAverageDataVolume(streamInfo);
+log.info("Retrieved data volume: {}", averageDataVolume);
+} catch (Exception e) {
+log.error("Error retrieving data volume: {}", e.getMessage(), e);
+log.warn("Using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME);

Review Comment:
   Please delete one of the warn and error logs.




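The constants quoted above define a one-hour counting window (`DATA_SCALE_COUNTING_RANGE_IN_HOURS`) and an audit query timestamp pattern whose sample is `2024-08-23T22:47:38.866`. The following minimal sketch formats such a window with `java.time`. It assumes the audit query expects start and end timestamps in that pattern. The actual request parameter names are not shown in this thread, so the sketch builds only the two strings. `AuditWindowSketch` is a hypothetical name.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public final class AuditWindowSketch {

    // Pattern matching the sample 2024-08-23T22:47:38.866
    static final DateTimeFormatter AUDIT_QUERY_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
    // Mirrors DATA_SCALE_COUNTING_RANGE_IN_HOURS
    static final int RANGE_IN_HOURS = 1;

    private AuditWindowSketch() {
    }

    /** Returns {start, end} of the counting window that ends at {@code end}. */
    public static String[] window(ZonedDateTime end) {
        ZonedDateTime start = end.minusHours(RANGE_IN_HOURS);
        return new String[]{start.format(AUDIT_QUERY_FORMAT), end.format(AUDIT_QUERY_FORMAT)};
    }

    public static void main(String[] args) {
        ZonedDateTime end = ZonedDateTime.of(2024, 8, 23, 22, 47, 38, 866_000_000, ZoneId.of("UTC"));
        String[] w = window(end);
        // prints 2024-08-23T21:47:38.866 -> 2024-08-23T22:47:38.866
        System.out.println(w[0] + " -> " + w[1]);
    }
}
```

In real use, `end` would typically be `ZonedDateTime.now(zone)` for whatever zone the audit service stores its timestamps in. The pattern carries no offset, so the zone has to be agreed on separately.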


Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


PeterZh6 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737677041


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")

Review Comment:
   https://github.com/user-attachments/assets/fd795b0f-40b3-4a91-b033-b8e6e1f0f989
   Yes. I use `ApplicationContextProvider` to get this Spring bean in `FlinkService`, even though `FlinkService` itself is not Spring-managed. The attached screenshot is from the `manager` log.






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737678355


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")

Review Comment:
   This annotation in the plugins directory will not be scanned during loading.






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


PeterZh6 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737681783


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")

Review Comment:
   Could you explain further? I don't quite follow.






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


PeterZh6 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737681783


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")

Review Comment:
   Could you explain further? I don't quite follow, since it did work.






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on PR #10916:
URL: https://github.com/apache/inlong/pull/10916#issuecomment-2319745536

   When a group is a real-time synchronization task, no DataProxy is involved. How should this case be handled?





Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


PeterZh6 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737708162


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")

Review Comment:
   Alternatively, could you suggest another way to read the property? When the Spring profile changes, working out the name of the configuration file is clumsy, so the `@Value` annotation is the only option I could think of.




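One way to read `audit.query.url` without depending on `@Value` injection in a class that may not be component-scanned is to resolve it explicitly. The minimal plain-Java sketch below is an assumption, not InLong's documented behavior. It checks a JVM system property first, then a hypothetical `AUDIT_QUERY_URL` environment variable, then a caller-supplied `Properties`, and finally the same default as the `@Value` expression. Inside a Spring context, injecting `Environment` and calling `getProperty(key, defaultValue)` would also resolve profile-specific files without needing the file name. `AuditUrlResolverSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.Properties;

public final class AuditUrlResolverSketch {

    static final String KEY = "audit.query.url";
    // Same default as the @Value expression in FlinkParallelismOptimizer
    static final String DEFAULT_URL = "http://127.0.0.1:10080";
    // Hypothetical environment variable name
    static final String ENV_KEY = "AUDIT_QUERY_URL";

    private AuditUrlResolverSketch() {
    }

    /**
     * Resolves the audit URL in an assumed precedence order:
     * system property, then environment variable, then file properties, then default.
     */
    public static String resolve(Properties sysProps, Map<String, String> env, Properties fileProps) {
        String value = sysProps.getProperty(KEY);
        if (isBlank(value)) {
            value = env.get(ENV_KEY);
        }
        if (isBlank(value)) {
            value = fileProps.getProperty(KEY);
        }
        return isBlank(value) ? DEFAULT_URL : value.trim();
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        // Output depends on the current JVM properties and environment
        System.out.println(resolve(System.getProperties(), System.getenv(), new Properties()));
    }
}
```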


Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737719535


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737738348


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.inlong.audit.consts.OpenApiConstants.*;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * This class is used to calculate the recommended parallelism based on the maximum message per second per core.
+ * The data volume is calculated based on the average data count per hour.
+ * The data count is retrieved from the inlong audit API.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+@Value("${audit.query.url:http://127.0.0.1:10080}")
+public String auditQueryUrl;
+
+private static final int MAX_PARALLELISM = 2048;
+private long maximumMessagePerSecondPerCore = 1000L;
+private static final long DEFAULT_ERROR_DATA_VOLUME = 0L;
+private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT;
+private static final String DEFAULT_AUDIT_TYPE = "DataProxy";
+private static final String AUDIT_CYCLE_REALTIME = "1";
+// maximum data scale counting range in hours
+private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1;
+// sample time format: 2024-08-23T22:47:38.866
+private static final String AUDIT_QUERY_DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+
+private static final String LOGTS_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+private static final String TIMEZONE_REGEX = "([+-])(\\d):";
+
+/**
+ * Calculate recommended parallelism based on maximum message per second per core
+ *
+ * @return Recommended parallelism
+ */
+public int calculateRecommendedParallelism(List<InlongStreamInfo> streamInfos) {
+long averageDataVolume;
+InlongStreamInfo streamInfo = streamInfos.get(0);
+try {
+averageDataVolume = getAverageDataVolume(streamInfo);
+log.info("Retrieved data volume: {}", averageDataVolume);
+} catch (Exception e) {
+log.error("Error retrieving data volume: {}", e.getMessage(), e);
+log.warn("Using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME);
+averageDataVolume = DEFAULT_ERROR_DATA_VOLUME;
+}
+int newParallelism = (int) (averageDataVolume / maximumMessagePerSecondPerCore);
+// Ensure parallelism is at most MAX_PARALLELISM
+newParallelism = Math.min(newParallelism, MAX_PARALLELISM);
+log.info("Calculated parallelism: {} for data volume: {}", newParallelism, averageDataVolume);
+return newParallelism;
+}
+
+/**
+ * Initialize maximum message per second per core based on configuration
+ *
+ * @param maximumMessagePerSecondPerCore The maximum messages per second per core
+ */
+public void setMaximumMessagePerSecondPerCore(Integer maximumMessagePerSecondPerCore) {
+if (maximumMessagePerSecondPerCore == null || maximumMessagePerSecondPerCore <= 0) {
+log.error("Illegal flink.maxpercore property, mus
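Going by the sample `2024-08-23T22:47:38.866` in the code comment above, audit query timestamps follow the `DateTimeFormatter` pattern `yyyy-MM-dd'T'HH:mm:ss.SSS`, and `DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1` points to a one-hour counting window. The sketch below shows one way such a window could be formatted, assuming the window ends at the query time; the class and method names are hypothetical, and how the real `getAverageDataVolume` builds its query is not visible in this excerpt.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class AuditQueryWindowSketch {

    // Pattern matching the sample "2024-08-23T22:47:38.866" from the diff comment
    static final DateTimeFormatter AUDIT_QUERY_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    // Mirrors DATA_SCALE_COUNTING_RANGE_IN_HOURS in the diff
    static final int COUNTING_RANGE_IN_HOURS = 1;

    // Hypothetical helper: returns {start, end} of a window that ends at `end`
    static String[] queryWindow(ZonedDateTime end) {
        ZonedDateTime start = end.minusHours(COUNTING_RANGE_IN_HOURS);
        return new String[] {start.format(AUDIT_QUERY_FORMAT), end.format(AUDIT_QUERY_FORMAT)};
    }

    public static void main(String[] args) {
        ZonedDateTime end = ZonedDateTime.of(2024, 8, 23, 22, 47, 38, 866_000_000, ZoneId.of("UTC"));
        String[] window = queryWindow(end);
        // prints 2024-08-23T21:47:38.866 -> 2024-08-23T22:47:38.866
        System.out.println(window[0] + " -> " + window[1]);
    }
}
```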

Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


PeterZh6 commented on PR #10916:
URL: https://github.com/apache/inlong/pull/10916#issuecomment-2319790130

   > When a group is a real-time synchronization task, there is no DataProxy present. How should this situation be handled? Is there a default value when DataProxy does not exist?
   
   When no DataProxy audit data is present, the `entity` should be `null`, causing `parseResponseAndCalculateAverageDataVolume` to return `DEFAULT_ERROR_DATA_VOLUME`, which is `0L`.
   
![image](https://github.com/user-attachments/assets/f0af0358-daab-4c21-bcda-1b01122e29a3)
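A minimal sketch of the calculation described in this reply, assuming `averageDataVolume` is an average message rate per second (the excerpt does not show how `getAverageDataVolume` derives it from the hourly audit counts). A missing DataProxy audit entity is modelled as `null` and falls back to `DEFAULT_ERROR_DATA_VOLUME`, so the recommendation becomes 0. The class and helper names are hypothetical; only the constant values come from the diff.

```java
class ParallelismSketch {

    // Values copied from the constants in the FlinkParallelismOptimizer diff
    static final int MAX_PARALLELISM = 2048;
    static final long DEFAULT_ERROR_DATA_VOLUME = 0L;

    // Hypothetical stand-in for the audit response parsing: a null value models
    // a missing DataProxy audit entity and falls back to the default volume.
    static long averageDataVolumeOrDefault(Long parsedVolume) {
        return parsedVolume == null ? DEFAULT_ERROR_DATA_VOLUME : parsedVolume;
    }

    // Divides the volume by the per-core rate and caps the result at MAX_PARALLELISM.
    // Assumes a positive rate; the diff's setter checks for values <= 0.
    static int recommendedParallelism(long averageDataVolume, long maxMsgPerSecondPerCore) {
        long cores = averageDataVolume / maxMsgPerSecondPerCore;
        return (int) Math.min(cores, MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        long perCore = 1000L;
        System.out.println(recommendedParallelism(averageDataVolumeOrDefault(null), perCore));        // 0
        System.out.println(recommendedParallelism(averageDataVolumeOrDefault(5_500L), perCore));      // 5
        System.out.println(recommendedParallelism(averageDataVolumeOrDefault(10_000_000L), perCore)); // 2048
    }
}
```

Unlike the diff, which casts to `int` before calling `Math.min`, this sketch takes the minimum in `long` arithmetic, so a very large volume cannot wrap around during the cast.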
   





Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


PeterZh6 commented on PR #10916:
URL: https://github.com/apache/inlong/pull/10916#issuecomment-2319808998

   
![9e8f6992-bb82-40f3-99b1-1445cea9f45b](file:///D:/%E5%9B%BE%E7%89%87/Typedown/9e8f6992-bb82-40f3-99b1-1445cea9f45b.png)
   The effect of adjusting the parallelism is shown in the picture. (The failure is caused by the resource limits of my computer.)





Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737778216


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java:
##
@@ -213,6 +218,22 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting
 }).filter(Objects::nonNull).collect(Collectors.toList());
 
 Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
+log.debug("flink info: {}", flinkInfo);
+if (flinkConfig.getDynamicParallelismEnable()) {
+flinkParallelismOptimizer.setMaximumMessagePerSecondPerCore(flinkConfig.getMaxMsgRatePerCore());
+// get stream info list for auditing
+int recommendedParallelism =
+flinkParallelismOptimizer.calculateRecommendedParallelism(flinkInfo.getInlongStreamInfoList());
+// Ensure parallelism is at least the default value
+if (recommendedParallelism < parallelism) {
+recommendedParallelism = parallelism;
+}
+if (recommendedParallelism != parallelism) {
+log.info("switched to recommended parallelism: {}", recommendedParallelism);
+parallelism = recommendedParallelism;
+}
+}
+log.info("current parallelism: {}", parallelism);

Review Comment:
   Is it reasonable that when audit data cannot be obtained, the `recommendedParallelism` will be 0?






Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


PeterZh6 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737802253


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java:
##
@@ -213,6 +218,22 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting
 }).filter(Objects::nonNull).collect(Collectors.toList());
 
 Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
+log.debug("flink info: {}", flinkInfo);
+if (flinkConfig.getDynamicParallelismEnable()) {
+flinkParallelismOptimizer.setMaximumMessagePerSecondPerCore(flinkConfig.getMaxMsgRatePerCore());
+// get stream info list for auditing
+int recommendedParallelism =
+flinkParallelismOptimizer.calculateRecommendedParallelism(flinkInfo.getInlongStreamInfoList());
+// Ensure parallelism is at least the default value
+if (recommendedParallelism < parallelism) {
+recommendedParallelism = parallelism;
+}
+if (recommendedParallelism != parallelism) {
+log.info("switched to recommended parallelism: {}", recommendedParallelism);
+parallelism = recommendedParallelism;
+}
+}
+log.info("current parallelism: {}", parallelism);

Review Comment:
   Thanks a lot for your extremely rigorous review. However, I believe that, with the comparison in the red rectangle, the minimum parallelism is ensured, unless the parallelism value in `flink-sort-plugin.properties` is itself illegal.
   
   
![image](https://github.com/user-attachments/assets/ad39ce48-3d53-4767-98e5-4ecd4a911c48)
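The two `if` statements in the `FlinkService` hunk amount to taking the maximum of the recommended and the configured parallelism, so a recommendation of 0 (no audit data) falls back to the configured value. A small sketch of that equivalence; the class name and the configured value of 1 are hypothetical.

```java
class ParallelismClampSketch {

    // Equivalent to the two if-statements in the FlinkService diff: a recommendation
    // below the configured parallelism is replaced by it, otherwise it is adopted.
    static int effectiveParallelism(int recommendedParallelism, int configuredParallelism) {
        return Math.max(recommendedParallelism, configuredParallelism);
    }

    public static void main(String[] args) {
        int configured = 1; // hypothetical value from flink-sort-plugin.properties
        System.out.println(effectiveParallelism(0, configured)); // 1: no audit data, configured value kept
        System.out.println(effectiveParallelism(5, configured)); // 5: recommendation adopted
    }
}
```

This only guarantees a usable parallelism when the configured value is itself positive, which matches the caveat about an illegal value in `flink-sort-plugin.properties`.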







Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]

2024-08-29 Thread via GitHub


fuweng11 commented on code in PR #10916:
URL: https://github.com/apache/inlong/pull/10916#discussion_r1737830177


##
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java:
##
@@ -213,6 +218,22 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting
 }).filter(Objects::nonNull).collect(Collectors.toList());
 
 Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
+log.debug("flink info: {}", flinkInfo);
+if (flinkConfig.getDynamicParallelismEnable()) {
+flinkParallelismOptimizer.setMaximumMessagePerSecondPerCore(flinkConfig.getMaxMsgRatePerCore());
+// get stream info list for auditing
+int recommendedParallelism =
+flinkParallelismOptimizer.calculateRecommendedParallelism(flinkInfo.getInlongStreamInfoList());
+// Ensure parallelism is at least the default value
+if (recommendedParallelism < parallelism) {
+recommendedParallelism = parallelism;
+}
+if (recommendedParallelism != parallelism) {
+log.info("switched to recommended parallelism: {}", recommendedParallelism);
+parallelism = recommendedParallelism;
+}
+}
+log.info("current parallelism: {}", parallelism);

Review Comment:
   OK






Re: [PR] [INLONG-10935][SDK] Transform support DAYNAME function [inlong]

2024-08-29 Thread via GitHub


ying-hua commented on PR #10937:
URL: https://github.com/apache/inlong/pull/10937#issuecomment-2320046925

   > @ying-hua please resolve the conflict.
   
   done.

