Re: [PR] [INLONG-10944][SDK] Support Inlong Transform parser annotation [inlong]
emptyOVO commented on code in PR #10945:
URL: https://github.com/apache/inlong/pull/10945#discussion_r1735672712

## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParserTools.java: ##
@@ -0,0 +1,64 @@
+package org.apache.inlong.sdk.transform.process.parser;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.schema.Column;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class ParserTools {
+    private static final String PARSER_PATH = "org.apache.inlong.sdk.transform.process.parser";
+    private final static Map, Class> parserMap = Maps.newConcurrentMap();
+
+    static{
+        init();
+    }
+    private static void init() {
+        Reflections reflections = new Reflections(PARSER_PATH, Scanners.TypesAnnotated);
+        Set> clazzSet = reflections.getTypesAnnotatedWith(TransformParser.class);
+        for (Class clazz : clazzSet) {
+            if (ValueParser.class.isAssignableFrom(clazz)) {
+                TransformParser annotation = clazz.getAnnotation(TransformParser.class);
+                if (annotation == null) {
+                    continue;
+                }
+                Class[] values = annotation.value();
+                for (Class value : values) {
+                    if (value==null) {
+                        continue;
+                    }
+                    parserMap.compute(value, (name, former) -> {

Review Comment:
fix

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
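`ParserTools` above finds classes annotated with `@TransformParser` via the Reflections library and registers each listed expression type in `parserMap` through `compute(...)`, whose lambda body is cut off in this excerpt. The following is a minimal, dependency-free sketch of the same annotation-driven registry idea. `HandlesExpr`, `Expr`, `Parser` and the fixed candidate list are hypothetical stand-ins (the list replaces classpath scanning), and `putIfAbsent` (first registration wins, conflicts are reported) is a swapped-in conflict policy, not necessarily what the PR does.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ParserRegistrySketch {

    // Hypothetical stand-ins for JSqlParser expression types and InLong's ValueParser.
    interface Expr {}
    static class ColumnExpr implements Expr {}
    static class StringExpr implements Expr {}
    interface Parser {}

    // Plays the role of @TransformParser: lists the expression types a parser handles.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface HandlesExpr {
        Class<? extends Expr>[] value();
    }

    @HandlesExpr(ColumnExpr.class)
    static class ColumnParser implements Parser {}

    @HandlesExpr({StringExpr.class, ColumnExpr.class})
    static class StringParser implements Parser {}

    static final Map<Class<? extends Expr>, Class<? extends Parser>> REGISTRY = new ConcurrentHashMap<>();

    static {
        init();
    }

    // Hypothetical: a fixed candidate list instead of scanning a package.
    private static void init() {
        List<Class<?>> candidates = Arrays.asList(ColumnParser.class, StringParser.class, String.class);
        for (Class<?> clazz : candidates) {
            if (!Parser.class.isAssignableFrom(clazz)) {
                continue; // not a parser type, skip it
            }
            HandlesExpr annotation = clazz.getAnnotation(HandlesExpr.class);
            if (annotation == null) {
                continue; // parser without the annotation, skip it
            }
            @SuppressWarnings("unchecked")
            Class<? extends Parser> parserClass = (Class<? extends Parser>) clazz;
            for (Class<? extends Expr> exprType : annotation.value()) {
                // First registration wins; a conflicting later one is reported instead of overwriting.
                Class<? extends Parser> former = REGISTRY.putIfAbsent(exprType, parserClass);
                if (former != null && former != parserClass) {
                    System.err.println("Conflict for " + exprType.getSimpleName()
                            + ": keeping " + former.getSimpleName());
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(REGISTRY.get(ColumnExpr.class).getSimpleName());
        System.out.println(REGISTRY.get(StringExpr.class).getSimpleName());
    }
}
```

Making the conflict policy explicit, whether with `putIfAbsent` or inside a `compute` lambda, keeps registration order from silently deciding which parser handles a shared expression type.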
Re: [PR] [INLONG-10944][SDK] Support Inlong Transform parser annotation [inlong]
vernedeng commented on code in PR #10945:
URL: https://github.com/apache/inlong/pull/10945#discussion_r1735695332

## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/TransformParser.java: ##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import net.sf.jsqlparser.expression.Expression;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target(TYPE)
+public @interface TransformParser {
+
+    Class[] value();

Review Comment:
values
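The reviewer suggests renaming the annotation element to `values`. One consequence, per the Java Language Specification: the single-element shorthand `@TransformParser(X.class)` only works when the element is named `value`; once renamed, every use site must write `values = ...`. A small sketch with hypothetical annotations `WithValue` and `WithValues` shows both forms:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

public class AnnotationNamingSketch {

    // Element named "value": use sites may omit the element name.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface WithValue {
        Class<?>[] value();
    }

    // Element named "values": use sites must write "values = ...".
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface WithValues {
        Class<?>[] values();
    }

    // Shorthand form; a single class literal is accepted for an array-typed element.
    @WithValue(String.class)
    static class A {}

    // @WithValues(String.class) would not compile; the element name is required here.
    @WithValues(values = {String.class, Integer.class})
    static class B {}

    public static void main(String[] args) {
        System.out.println(Arrays.toString(A.class.getAnnotation(WithValue.class).value()));
        System.out.println(Arrays.toString(B.class.getAnnotation(WithValues.class).values()));
    }
}
```

Either name works at runtime; the choice only affects how concise the annotation is at each parser class.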
[PR] [INLONG-10947][Dashboard] Modify field description missing for English locale [inlong]
wohainilaodou opened a new pull request, #10948:
URL: https://github.com/apache/inlong/pull/10948

Fixes #10947

### Motivation

A field description is missing from the English locale.

### Modifications

Add the missing field description to the English locale.

### Verifying this change
Re: [PR] [INLONG-10931][Manager] Data preview supports data containing escape characters [inlong]
aloyszhang merged PR #10932: URL: https://github.com/apache/inlong/pull/10932
(inlong) branch master updated (4cf7ccba60 -> 6ba319369e)
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 4cf7ccba60 [INLONG-10847][Dashboard] Agent type cluster node management adds restart、reinstall, install log 、heartbeat detection and unLoad operations (#10926)
     add 6ba319369e [INLONG-10931][Manager] Data preview supports data containing escape characters (#10932)

No new revisions were added by this update.

Summary of changes:
 inlong-manager/manager-service/pom.xml                | 5 +
 .../manager/service/datatype/CsvDataTypeOperator.java | 16
 2 files changed, 17 insertions(+), 4 deletions(-)
Re: [PR] [INLONG-10944][SDK] Support Inlong Transform parser annotation [inlong]
emptyOVO commented on code in PR #10945:
URL: https://github.com/apache/inlong/pull/10945#discussion_r1735712254

## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/TransformParser.java: ##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import net.sf.jsqlparser.expression.Expression;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target(TYPE)
+public @interface TransformParser {
+
+    Class[] value();

Review Comment:
done
[PR] [INLONG-10946][SDK] Transform SQL supports SPACE function [inlong]
Zkplo opened a new pull request, #10949:
URL: https://github.com/apache/inlong/pull/10949

Fixes #10946

### Motivation

Add one function class, SpaceFunction, along with the corresponding unit tests.

### Modifications

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*
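SPACE(n) conventionally returns a string of n space characters. Below is a minimal sketch of that semantics. `SpaceSketch.space` is hypothetical, and the edge cases (null in, null out; an empty string for n <= 0) follow MySQL's behavior as an assumption; the PR's `SpaceFunction` may treat them differently.

```java
public class SpaceSketch {

    // Hypothetical helper mirroring SQL SPACE(n): a string of n space characters.
    // Assumed edge cases (MySQL-like): null -> null, n <= 0 -> "".
    static String space(Integer n) {
        if (n == null) {
            return null;
        }
        if (n <= 0) {
            return "";
        }
        StringBuilder sb = new StringBuilder(n);
        for (int i = 0; i < n; i++) {
            sb.append(' ');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Brackets make the spaces visible in the output.
        System.out.println("[" + space(3) + "]");
    }
}
```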
[PR] [INLONG-10940][SDK] Transform SQL support arithmetic functions(Including cot, tanh, cosh, asin, atan and atan2) [inlong]
MOONSakura0614 opened a new pull request, #10950:
URL: https://github.com/apache/inlong/pull/10950

Fixes #10940

### Motivation

Add six arithmetic function classes: CotFunction, TanhFunction, CoshFunction, AsinFunction, AtanFunction and Atan2Function, along with the corresponding unit tests.

### Modifications

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
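Five of the six functions map directly onto `java.lang.Math`. `cot` has no `Math` counterpart and is usually derived as `1 / tan(x)`, which is also the approach described in the CotFunction review below. The class and method names in this sketch are hypothetical. It follows `Math.atan2`'s (y, x) argument order; that the SQL `atan2` uses the same order is an assumption.

```java
public class ArithmeticSketch {

    // java.lang.Math has no cot, so it is derived as the reciprocal of tan.
    // cot(0.0) evaluates to Infinity because 1.0 / 0.0 is Infinity in IEEE 754.
    static double cot(double radians) {
        return 1.0 / Math.tan(radians);
    }

    static double tanh(double x) {
        return Math.tanh(x);
    }

    static double cosh(double x) {
        return Math.cosh(x);
    }

    // Defined for x in [-1, 1]; Math.asin returns NaN outside that range.
    static double asin(double x) {
        return Math.asin(x);
    }

    static double atan(double x) {
        return Math.atan(x);
    }

    // Argument order is (y, x), matching java.lang.Math.atan2.
    static double atan2(double y, double x) {
        return Math.atan2(y, x);
    }

    public static void main(String[] args) {
        System.out.println(cot(Math.PI / 4));
        System.out.println(atan2(1.0, -1.0));
    }
}
```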
Re: [PR] [INLONG-10287][Agent] Update the Redis Source [inlong]
emptyOVO commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1735782184

## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ##
@@ -17,54 +17,169 @@
 package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
     private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class);
+    public InstanceProfile profile;
+    private String port;
+    private Jedis jedis;
+    private String hostName;
+    private boolean ssl;
+    private String authUser;
+    private String authPassword;
+    private String readTimeout;
+    private String replId;
+    private String snapShot;
+    private String dbNumber;
+    private String redisCommand;
+
+    private String fieldOrMember;
+    private boolean destroyed;
+
+    private Set keys;
+    private Replicator redisReplicator;
+    private BlockingQueue redisQueue;
+    private ExecutorService executor;
     public RedisSource() {
     }
     @Override
     protected String getThreadName() {
-        return null;
+        return "redis-source-" + taskId + "-" + instanceId;
     }
     @Override
     protected void initSource(InstanceProfile profile) {
+        LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+        this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+        this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+        this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+        this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+        this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, "");
+        this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, "");
+        this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+        this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+        this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+        this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, "get");
+        this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+        this.fieldOrMember = profile.get(TaskConstants.TASK_REDIS_FIELD_OR_NUMBER, null);
+        this.instanceId = profile.getInstanceId();
+        this.redisQueue = new LinkedBlockingQueue<>(profile.getInt(TaskConstants.TASK_REDIS_QUEUE_SIZE, 1));
+        String uri = getRedisUri();
+        this.jedis = new Jedis(uri);
+        try {
+            redisReplicator = new RedisReplicator(uri);
+            startJedisSynchronize();
+            initReplicator();
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(startRedisReplicator());
+        } catch (URISyntaxException | IOException e) {
+            sourceMetric.pluginReadFailCount.addAndGet(1);
+            LOGGER.error("Connect to redis {}:{} failed.", hostName, port);
+        }
+    }
+    private void startJedisSynchronize() {
+        for (String key : keys) {

Review Comment:
fix
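In `initSource`, the key list is built as `new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")))`. The sketch below isolates that one step to show what it yields: duplicate keys collapse, keys are kept in natural (sorted) order, and `split(",")` keeps any whitespace around each key. `RedisKeysSketch` and its trimming variant are hypothetical illustrations, not part of the PR.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;

public class RedisKeysSketch {

    // Same construction as the PR: duplicates collapse and keys are kept in natural (sorted) order.
    static Set<String> parseKeysAsIs(String keysProp) {
        return new ConcurrentSkipListSet<>(Arrays.asList(keysProp.split(",")));
    }

    // Hypothetical variant: trim each key and drop empty entries before building the set.
    static Set<String> parseKeysTrimmed(String keysProp) {
        return Arrays.stream(keysProp.split(","))
                .map(String::trim)
                .filter(key -> !key.isEmpty())
                .collect(Collectors.toCollection(ConcurrentSkipListSet::new));
    }

    public static void main(String[] args) {
        // " user:1" (with a leading space) survives as a separate key in the as-is version.
        System.out.println(parseKeysAsIs("user:2,user:1, user:1"));
        System.out.println(parseKeysTrimmed("user:2,user:1, user:1"));
    }
}
```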
[PR] [INLONG-10934][SDK] Transform support LocalDate function [inlong]
ying-hua opened a new pull request, #10951:
URL: https://github.com/apache/inlong/pull/10951

Fixes #10934

### Motivation

- Add a function class, LocalDateFunction, and its unit tests.
- LocalDate([string1]) returns the local date in the time zone string1; if string1 is not specified, the system default time zone is used.

### Modifications

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
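The LocalDate([string1]) behavior described above maps naturally onto `java.time`. This minimal sketch uses a hypothetical helper, `LocalDateSketch.localDate`, and treating an empty string like a missing argument is an assumption. `ZoneId.of` throws a `DateTimeException` for an unknown zone id, so a real function would need to decide how to report that case.

```java
import java.time.LocalDate;
import java.time.ZoneId;

public class LocalDateSketch {

    // Hypothetical helper for LOCALDATE([string1]).
    // A null or empty zone falls back to the JVM default zone (empty-string handling is an assumption).
    // ZoneId.of throws java.time.DateTimeException for an invalid or unknown zone id.
    static LocalDate localDate(String zone) {
        ZoneId zoneId = (zone == null || zone.isEmpty()) ? ZoneId.systemDefault() : ZoneId.of(zone);
        return LocalDate.now(zoneId);
    }

    public static void main(String[] args) {
        System.out.println(localDate(null));
        System.out.println(localDate("UTC"));
        System.out.println(localDate("Asia/Shanghai"));
    }
}
```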
Re: [PR] [INLONG-10947][Dashboard] Modify field description missing for English locale [inlong]
aloyszhang merged PR #10948: URL: https://github.com/apache/inlong/pull/10948
(inlong) branch master updated: [INLONG-10947][Dashboard] Modify field description missing for English locale (#10948)
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 1b62fdd49e [INLONG-10947][Dashboard] Modify field description missing for English locale (#10948)
1b62fdd49e is described below

commit 1b62fdd49edd3577732a8f8b7fec0bdbedb09b2e
Author: kamianlaida <165994047+wohainilao...@users.noreply.github.com>
AuthorDate: Thu Aug 29 16:29:45 2024 +0800

    [INLONG-10947][Dashboard] Modify field description missing for English locale (#10948)
---
 inlong-dashboard/src/ui/locales/en.json | 1 +
 1 file changed, 1 insertion(+)

diff --git a/inlong-dashboard/src/ui/locales/en.json b/inlong-dashboard/src/ui/locales/en.json
index c281478532..ebf2bbfcb1 100644
--- a/inlong-dashboard/src/ui/locales/en.json
+++ b/inlong-dashboard/src/ui/locales/en.json
@@ -838,6 +838,7 @@
   "pages.Clusters.Node.Status": "Status",
   "pages.Clusters.Node.Status.Normal": "Normal",
   "pages.Clusters.Node.Status.Timeout": "Timeout",
+  "pages.Clusters.Node.LastModifier": "Last modifier",
   "pages.Clusters.Node.Creator": "Creator",
   "pages.Clusters.Node.Create": "Create",
   "pages.Clusters.Node.IpRule": "Please enter the IP address correctly",
Re: [PR] [INLONG-10883][SDK] Transform SQL support InitCap function [inlong]
dockerzhang commented on PR #10892: URL: https://github.com/apache/inlong/pull/10892#issuecomment-2317051155 @MOONSakura0614 please fix the conflicts and failed workflows.
Re: [PR] [INLONG-10893][SDK] Transform SQL support FromBase64 function [inlong]
dockerzhang commented on PR #10896: URL: https://github.com/apache/inlong/pull/10896#issuecomment-2317050159 @MOONSakura0614 please fix the conflicts and failed workflows.
Re: [PR] [INLONG-10940][SDK] Transform SQL support arithmetic functions(Including cot, tanh, cosh, asin, atan and atan2) [inlong]
aloyszhang commented on code in PR #10950:
URL: https://github.com/apache/inlong/pull/10950#discussion_r1735884683

## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java: ##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+
+/**
+ * CotFunction
+ * description: cot(numeric) -- returns the cotangent of the numeric (in radians)
+ */
+@TransformFunction(names = {"cot"})
+public class CotFunction implements ValueParser {
+
+    private final ValueParser valueParser;
+
+    public CotFunction(Function expr) {
+        // 使用OperatorTools构建解析器来处理表达式中的参数 [translation: use OperatorTools to build a parser for the argument in the expression]

Review Comment:
please replace with english

## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java: ##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+
+/**
+ * CotFunction
+ * description: cot(numeric) -- returns the cotangent of the numeric (in radians)
+ */
+@TransformFunction(names = {"cot"})
+public class CotFunction implements ValueParser {
+
+    private final ValueParser valueParser;
+
+    public CotFunction(Function expr) {
+        // 使用OperatorTools构建解析器来处理表达式中的参数 [translation: use OperatorTools to build a parser for the argument in the expression]
+        this.valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        // 解析输入的参数 [translation: parse the input argument]
+        Object valueObj = valueParser.parse(sourceData, rowIndex, context);
+
+        // 将解析结果转换为BigDecimal以处理数值计算 [translation: convert the parsed result to BigDecimal for numeric computation]
+        BigDecimal value = OperatorTools.parseBigDecimal(valueObj);
+
+        // 计算tan(x)并取倒数以求得cot(x) [translation: compute tan(x) and take its reciprocal to get cot(x)]

Review Comment:
ditto
[I] [Bug][Doc] Wrong referring to StarRocks [inlong-website]
aloyszhang opened a new issue, #970:
URL: https://github.com/apache/inlong-website/issues/970

### What happened

The mysql_starrocks_example page incorrectly refers to StarRocks as "Apache StarRocks".

### What you expected to happen

The page should refer to StarRocks, not Apache StarRocks.

### How to reproduce

See https://inlong.apache.org/docs/quick_start/data_sync/mysql_starrocks_example/

### Environment

_No response_

### Are you willing to submit PR?

- [X] Yes, I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[PR] [INLONG-970][Doc] fix the wrong referring to StarRocks [inlong-website]
aloyszhang opened a new pull request, #971:
URL: https://github.com/apache/inlong-website/pull/971

Fixes #970

### Motivation

### Modifications

Correct `Apache StarRocks` to `StarRocks`.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.

### Documentation

- Does this pull request introduce a new feature? (no)
Re: [I] [Improve][Doc] Add excalidraw raw file directory [inlong-website]
aloyszhang closed issue #968: [Improve][Doc] Add excalidraw raw file directory URL: https://github.com/apache/inlong-website/issues/968
(inlong-website) branch master updated: [INLONG-968][Doc] Add excalidraw raw file directories (#969)
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git

The following commit(s) were added to refs/heads/master by this push:
     new 07fdec66a2 [INLONG-968][Doc] Add excalidraw raw file directories (#969)
07fdec66a2 is described below

commit 07fdec66a2cbb5db9bf4887491db142a3b58b138
Author: AloysZhang
AuthorDate: Thu Aug 29 18:41:23 2024 +0800

    [INLONG-968][Doc] Add excalidraw raw file directories (#969)
---
 image_raw/administration/.gitkeep        |   0
 image_raw/components/.gitkeep            |   0
 image_raw/data_nodes/.gitkeep            |   0
 image_raw/deployment/.gitkeep            |   0
 image_raw/design_and_concept/.gitkeep    |   0
 image_raw/development/.gitkeep           |   0
 image_raw/introduction/architecture.pptx | Bin 0 -> 815539 bytes
 image_raw/quick_start/.gitkeep           |   0
 image_raw/sdk/.gitkeep                   |   0
 image_raw/user_guide/.gitkeep            |   0
 10 files changed, 0 insertions(+), 0 deletions(-)

diff --git a/image_raw/administration/.gitkeep b/image_raw/administration/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/components/.gitkeep b/image_raw/components/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/data_nodes/.gitkeep b/image_raw/data_nodes/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/deployment/.gitkeep b/image_raw/deployment/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/design_and_concept/.gitkeep b/image_raw/design_and_concept/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/development/.gitkeep b/image_raw/development/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/introduction/architecture.pptx b/image_raw/introduction/architecture.pptx
new file mode 100644
index 00..f8c6e54ef6
Binary files /dev/null and b/image_raw/introduction/architecture.pptx differ
diff --git a/image_raw/quick_start/.gitkeep b/image_raw/quick_start/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/sdk/.gitkeep b/image_raw/sdk/.gitkeep
new file mode 100644
index 00..e69de29bb2
diff --git a/image_raw/user_guide/.gitkeep b/image_raw/user_guide/.gitkeep
new file mode 100644
index 00..e69de29bb2
Re: [PR] [INLONG-968][Doc] Add excalidraw raw file directories [inlong-website]
aloyszhang merged PR #969: URL: https://github.com/apache/inlong-website/pull/969
Re: [PR] [INLONG-970][Doc] fix the wrong referring to StarRocks [inlong-website]
dockerzhang merged PR #971: URL: https://github.com/apache/inlong-website/pull/971
Re: [I] [Bug][Doc] Wrong referring to StarRocks [inlong-website]
dockerzhang closed issue #970: [Bug][Doc] Wrong referring to StarRocks URL: https://github.com/apache/inlong-website/issues/970
(inlong-website) branch master updated: [INLONG-970][Doc] fix the wrong referring to StarRocks (#971)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong-website.git The following commit(s) were added to refs/heads/master by this push: new 387b07f85f [INLONG-970][Doc] fix the wrong referring to StarRocks (#971) 387b07f85f is described below commit 387b07f85f399564585184c8931209a011811cd0 Author: AloysZhang AuthorDate: Thu Aug 29 18:46:26 2024 +0800 [INLONG-970][Doc] fix the wrong referring to StarRocks (#971) --- docs/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../current/quick_start/data_sync/mysql_starrocks_example.md| 2 +- .../version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../version-1.12.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../version-1.13.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../version-1.12.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- .../version-1.13.0/quick_start/data_sync/mysql_starrocks_example.md | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/quick_start/data_sync/mysql_starrocks_example.md b/docs/quick_start/data_sync/mysql_starrocks_example.md index d6b4c9f5c7..405e8b77f0 100644 --- a/docs/quick_start/data_sync/mysql_starrocks_example.md +++ b/docs/quick_start/data_sync/mysql_starrocks_example.md @@ -17,7 +17,7 @@ Before we begin, we need to install InLong. Here we provide two ways: Download the [connectors](https://inlong.apache.org/downloads/) corresponding to Flink version, and after decompression, place `sort-connector-starrocks-[version]-SNAPSHOT.jar` in `/inlong-sort/connectors/` directory. 
### Install StarRocks -Please refer to the [Installation Tutorial](https://docs.starrocks.io/docs/quick_start/) on the Apache StarRocks official website +Please refer to the [Installation Tutorial](https://docs.starrocks.io/docs/quick_start/) on the StarRocks official website ## Cluster Initialize When all containers are successfully started, you can access the InLong dashboard address http://localhost, and use the following default account to log in. diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md index 571bc65325..266872a45c 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/quick_start/data_sync/mysql_starrocks_example.md @@ -17,7 +17,7 @@ sidebar_position: 2 下载与 Flink 版本对应的 [connectors](https://inlong.apache.org/zh-CN/downloads),解压后将 `sort-connector-starrocks-[version]-SNAPSHOT.jar` 放在 `/inlong-sort/connectors/` 目录下。 ### 安装 StarRocks -请参考 Apache StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。 +请参考 StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。 ## 集群初始化 容器启动成功后,访问 InLong Dashboard 地址 http://localhost,并使用以下默认账号登录: diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md index 64b31a95bc..964c97f09a 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.10.0/quick_start/data_sync/mysql_starrocks_example.md @@ -17,7 +17,7 @@ sidebar_position: 2 下载 Flink 1.13 对应版本的 [connectors](https://inlong.apache.org/zh-CN/downloads),解压后将 
`sort-connector-starrocks-[version]-SNAPSHOT.jar` 放在 `/inlong-sort/connectors/` 目录下。 ### 安装 StarRocks -请参考 Apache StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。 +请参考 StarRocks 官网的[安装教程](https://docs.starrocks.io/docs/quick_start/)。 ## 集群初始化 容器启动成功后,访问 InLong Dashboard 地址 http://localhost,并使用以下默认账号登录: diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md index 571bc65325..266872a45c 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-1.11.0/quick_start/data_sync/mysql_starrocks_example.md @@ -17,7 +17,7 @@ sidebar_position: 2 下载与 Flink 版本对应的 [connectors](https://inlong.apache.org/zh-CN/downloads),解压后将 `sort-connector-starrocks-[version]-SNAPSHOT.jar` 放在 `/inlong-sort/conne
Re: [PR] [INLONG-10897][SDK] Transform support DATEDIFF function [inlong]
aloyszhang commented on code in PR #10925: URL: https://github.com/apache/inlong/pull/10925#discussion_r1735987352 ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.List; + +/** + * DateDiffFunction + * description: DATEDIFF(d1, d2) + * - return null if one of the two parameters is null or "" + * - return null if one of the two parameters has an incorrect date format + * - return the number of days between the dates d1->d2. 
+ */ +@TransformFunction(names = {"datediff", "date_diff"}) +public class DateDiffFunction implements ValueParser { + +private final ValueParser leftDateParser; +private final ValueParser rightDateParser; +private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME = +DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); +private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + +public DateDiffFunction(Function expr) { +List<Expression> expressions = expr.getParameters().getExpressions(); +leftDateParser = OperatorTools.buildParser(expressions.get(0)); +rightDateParser = OperatorTools.buildParser(expressions.get(1)); +} + +@Override +public Object parse(SourceData sourceData, int rowIndex, Context context) { +Object leftDateObj = leftDateParser.parse(sourceData, rowIndex, context); +Object rightDateObj = rightDateParser.parse(sourceData, rowIndex, context); +if (leftDateObj == null || rightDateObj == null) { +return null; +} +String leftDate = OperatorTools.parseString(leftDateObj); +String rightDate = OperatorTools.parseString(rightDateObj); +if (leftDate.isEmpty() || rightDate.isEmpty()) { +return null; +} +try { +LocalDate left = getLocalDate(leftDate); +LocalDate right = getLocalDate(rightDate); +return ChronoUnit.DAYS.between(right, left); +} catch (Exception e) { Review Comment: Better to print some logs here for troubleshooting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
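A minimal, self-contained sketch of what the reviewer's logging suggestion could look like, using only the JDK. It assumes `java.util.logging` in place of the SDK's real logger, and `DateDiffSketch`, `toLocalDate` and `dateDiff` are hypothetical names. The two accepted patterns mirror the `DEFAULT_FORMAT_DATE_TIME` / `DEFAULT_FORMAT_DATE` constants quoted above, and the result is d1 minus d2 in days, matching `ChronoUnit.DAYS.between(right, left)`.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DateDiffSketch {

    private static final Logger LOG = Logger.getLogger(DateDiffSketch.class.getName());
    private static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Hypothetical helper: longer inputs are treated as "yyyy-MM-dd HH:mm:ss", others as "yyyy-MM-dd".
    static LocalDate toLocalDate(String value) {
        if (value.length() > 10) {
            return LocalDateTime.parse(value, DATE_TIME).toLocalDate();
        }
        return LocalDate.parse(value, DATE);
    }

    // Returns (d1 - d2) in days, or null for null, empty or unparsable input.
    static Long dateDiff(String d1, String d2) {
        if (d1 == null || d2 == null || d1.isEmpty() || d2.isEmpty()) {
            return null;
        }
        try {
            return ChronoUnit.DAYS.between(toLocalDate(d2), toLocalDate(d1));
        } catch (DateTimeParseException e) {
            // Record both inputs so a malformed record can be traced instead of silently becoming null.
            LOG.log(Level.WARNING, "DATEDIFF could not parse d1=" + d1 + ", d2=" + d2, e);
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(dateDiff("2024-08-29", "2024-08-01"));
        System.out.println(dateDiff("2024-08-29 10:00:00", "2024-08-30"));
        System.out.println(dateDiff("2024/08/29", "2024-08-01"));
    }
}
```

Catching `DateTimeParseException` rather than `Exception` keeps unexpected failures visible, which is one reasonable reading of the review; the real function may choose differently.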
Re: [PR] [INLONG-10942][SDK] Add official function names for all Transform functions [inlong]
aloyszhang commented on PR #10943: URL: https://github.com/apache/inlong/pull/10943#issuecomment-2317316437 Where is the source of truth for "official function names", and are there any specifications for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
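Function aliases in this SDK are declared per class, for example `@TransformFunction(names = {"localdate", "currentdate", "current_date", "curdate"})`. The sketch below shows one way such per-class aliases can be collected into a single lookup table that fails fast when two classes claim the same name, so a conflicting "official" name is caught at startup. It is an illustration only. `FunctionNames` is a hypothetical stand-in for `@TransformFunction`, the classes are passed in explicitly rather than found by classpath scanning, and lower-casing the names is an assumption.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class FunctionNameRegistrySketch {

    // Hypothetical stand-in for @TransformFunction(names = {...}); RUNTIME retention so getAnnotation sees it.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface FunctionNames {
        String[] names();
    }

    @FunctionNames(names = {"localdate", "currentdate", "current_date", "curdate"})
    static class LocalDateFn {
    }

    @FunctionNames(names = {"datediff", "date_diff"})
    static class DateDiffFn {
    }

    // Deliberately claims an alias that LocalDateFn already owns.
    @FunctionNames(names = {"curdate"})
    static class ConflictingFn {
    }

    // Builds a name -> class table and throws if two different classes register the same name.
    static Map<String, Class<?>> buildRegistry(List<Class<?>> classes) {
        Map<String, Class<?>> registry = new HashMap<>();
        for (Class<?> clazz : classes) {
            FunctionNames annotation = clazz.getAnnotation(FunctionNames.class);
            if (annotation == null) {
                continue;
            }
            for (String name : annotation.names()) {
                String key = name.toLowerCase(Locale.ROOT);
                Class<?> previous = registry.putIfAbsent(key, clazz);
                if (previous != null && previous != clazz) {
                    throw new IllegalStateException("Function name '" + key + "' is claimed by both "
                            + previous.getSimpleName() + " and " + clazz.getSimpleName());
                }
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        List<Class<?>> classes = List.of(LocalDateFn.class, DateDiffFn.class);
        Map<String, Class<?>> registry = buildRegistry(classes);
        System.out.println(registry.get("curdate").getSimpleName());
    }
}
```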
Re: [PR] [INLONG-10902][SDK] Transform support HEX(numeric or string) function [inlong]
vernedeng commented on code in PR #10904: URL: https://github.com/apache/inlong/pull/10904#discussion_r1736010161 ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Optional; + +/** + * HexFunction + * description: If the input argument is a numeric value (such as an integer), the HEX function converts the value to the corresponding hexadecimal string. + * If the input argument is a string, the HEX function converts each character in the string to its corresponding hexadecimal ASCII encoding and returns the hexadecimal representation of the entire string. 
+ */ +public class HexFunction implements ValueParser { + +private ValueParser valueParser; + +public HexFunction(Function expr) { +valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); +} + +@Override +public Object parse(SourceData sourceData, int rowIndex, Context context) { +Object valueObj = valueParser.parse(sourceData, rowIndex, context); +try { +return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase(); +} catch (NumberFormatException e) { +return hex(OperatorTools.parseString(valueObj)); Review Comment: Maybe you can make a new method called `isBigDecimal` to check the type of input valueObj, if true, parse it in BigDecimal way; otherwise, parse it as String. ``` if (isBigDecimal(valueObj)) { return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase(); } return hex(OperatorTools.parseString(valueObj)).toUpperCase(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
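A self-contained sketch of the check-then-dispatch shape the reviewer proposes, replacing the try/catch on `NumberFormatException`. The author's follow-up in this thread says they switched to a regex check, so `isBigDecimal` here uses a regex. The regex itself, the HALF_UP rounding, and printing the 64-bit two's-complement form of negative numbers (so `-1` becomes `FFFFFFFFFFFFFFFF`) are assumptions, not the PR's code. Like the quoted code, a numeric-looking string such as `"255"` is treated as a number.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

public class HexSketch {

    // Assumed numeric shape: optional sign, digits, optional fractional part.
    private static final Pattern NUMERIC = Pattern.compile("[+-]?\\d+(\\.\\d+)?");

    static boolean isBigDecimal(Object value) {
        return value != null && NUMERIC.matcher(value.toString()).matches();
    }

    static String hex(Object value) {
        if (value == null) {
            return null;
        }
        if (isBigDecimal(value)) {
            // Round to a whole number, then print it as an unsigned 64-bit hex string.
            long asLong = new BigDecimal(value.toString()).setScale(0, RoundingMode.HALF_UP).longValue();
            return Long.toHexString(asLong).toUpperCase();
        }
        // Non-numeric input: two uppercase hex digits per UTF-8 byte.
        StringBuilder sb = new StringBuilder();
        for (byte b : value.toString().getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(255));
        System.out.println(hex("abc"));
        System.out.println(hex("-1"));
    }
}
```

Values outside the `long` range wrap when converted, so a production version would need an explicit rule for them.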
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
aloyszhang commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1736049185 ## inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties: ## @@ -35,3 +35,7 @@ flink.savepoint.directory=file:///data/inlong-sort/savepoints flink.parallelism=1 # flink stop request drain flink.drain=false +# max data count per core per second +flink.max.data.percore=1000 Review Comment: ```suggestion # max msg rate per core flink.max.msg.rate.percore=1000 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
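A small sketch of reading the property under the name the reviewer suggests, using `java.util.Properties`. The fallback to 1000 for a missing, non-numeric, or non-positive value is an assumption. It echoes the `setMaximumMessagePerSecondPerCore` guard quoted later in this PR, which rejects null or non-positive values. `MsgRateConfigSketch` and `readMaxMsgRatePerCore` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class MsgRateConfigSketch {

    static final String KEY = "flink.max.msg.rate.percore";
    static final long DEFAULT_RATE = 1000L;

    // Returns the configured per-core message rate, or DEFAULT_RATE when it is absent or invalid.
    static long readMaxMsgRatePerCore(Properties props) {
        String raw = props.getProperty(KEY);
        if (raw == null) {
            return DEFAULT_RATE;
        }
        try {
            long rate = Long.parseLong(raw.trim());
            return rate > 0 ? rate : DEFAULT_RATE;
        } catch (NumberFormatException e) {
            return DEFAULT_RATE;
        }
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader("# max msg rate per core\nflink.max.msg.rate.percore=1000\n"));
        System.out.println(readMaxMsgRatePerCore(props));
    }
}
```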
Re: [PR] [INLONG-10902][SDK] Transform support HEX(numeric or string) function [inlong]
emptyOVO commented on code in PR #10904: URL: https://github.com/apache/inlong/pull/10904#discussion_r1736065655 ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Optional; + +/** + * HexFunction + * description: If the input argument is a numeric value (such as an integer), the HEX function converts the value to the corresponding hexadecimal string. + * If the input argument is a string, the HEX function converts each character in the string to its corresponding hexadecimal ASCII encoding and returns the hexadecimal representation of the entire string. 
+ */ +public class HexFunction implements ValueParser { + +private ValueParser valueParser; + +public HexFunction(Function expr) { +valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); +} + +@Override +public Object parse(SourceData sourceData, int rowIndex, Context context) { +Object valueObj = valueParser.parse(sourceData, rowIndex, context); +try { +return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase(); +} catch (NumberFormatException e) { +return hex(OperatorTools.parseString(valueObj)); Review Comment: > Maybe you can make a new method called `isBigDecimal` to check the type of input valueObj, if true, parse it in BigDecimal way; otherwise, parse it as String. > > ``` > if (isBigDecimal(valueObj)) { > return hex(OperatorTools.parseBigDecimal(valueObj)).toUpperCase(); > } > return hex(OperatorTools.parseString(valueObj)).toUpperCase(); > ``` I fixed it by using a regex to check the data. ## inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Optional; + +/** + * HexFunction + * description: If the input argument is a numeric value (such as an integer), the HEX function converts the value to the corresponding hexadecimal string. + * If the input argument is a string, the HEX function converts each character in the string to its corresponding hexadecimal ASCII encoding and returns the hexadecimal representation of the entire string. + */ +public class HexFunction implements ValueParser { + +private ValueParser valueParser; + +public HexFunction(Function expr) { +valueParser = OperatorTool
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
aloyszhang commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1736084213 ## inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java: ## @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.consts; Review Comment: That makes sense, thanks for your explanation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
aloyszhang commented on PR #10916: URL: https://github.com/apache/inlong/pull/10916#issuecomment-2317478140 @fuweng11 There are some modifications in the manager module, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10946][SDK] Transform SQL supports SPACE function [inlong]
aloyszhang merged PR #10949: URL: https://github.com/apache/inlong/pull/10949 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-10934][SDK] Transform support LocalDate function [inlong]
aloyszhang merged PR #10951: URL: https://github.com/apache/inlong/pull/10951 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-10934][SDK] Transform support LocalDate function (#10951)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new f35ab6f619 [INLONG-10934][SDK] Transform support LocalDate function (#10951) f35ab6f619 is described below commit f35ab6f619004fac58a06f79b98e681e778622f0 Author: Xincheng Huang <60057611+ying-...@users.noreply.github.com> AuthorDate: Thu Aug 29 20:19:44 2024 +0800 [INLONG-10934][SDK] Transform support LocalDate function (#10951) --- .../process/function/LocalDateFunction.java| 56 ++ .../TestTransformTemporalFunctionsProcessor.java | 42 2 files changed, 98 insertions(+) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocalDateFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocalDateFunction.java new file mode 100644 index 00..ba2d909d95 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocalDateFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.time.LocalDate; +import java.time.ZoneId; + +/** + * LocalDateFunction + * description: + * localDate([string1]) returns the current date in the specified time zone. + * (by default: the current date in the system time zone) + */ +@TransformFunction(names = {"localdate", "currentdate", "current_date", "curdate"}) +public class LocalDateFunction implements ValueParser { + +private ValueParser stringParser; + +public LocalDateFunction(Function expr) { +if (expr.getParameters() != null) { +stringParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); +} +} + +@Override +public Object parse(SourceData sourceData, int rowIndex, Context context) { +if (stringParser != null) { +String zoneString = OperatorTools.parseString(stringParser.parse(sourceData, rowIndex, context)); +return LocalDate.now(ZoneId.of(zoneString)); +} else { +return LocalDate.now(ZoneId.systemDefault()); +} +} +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java index e0830dd72e..354b8b7f18 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java @@ -29,6 +29,7 @@ import org.junit.Before; import org.junit.Test; import java.time.Duration; +import 
java.time.LocalDate; import java.time.LocalTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -421,6 +422,47 @@ public class TestTransformTemporalFunctionsProcessor { Assert.assertTrue(duration3.getSeconds() < 1); } +@Test +public void testLocalDateFunction() throws Exception { +DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + +// case1: localDate() - default system time zone +String transformSql1 = "select localdate() from source"; +TransformConfig config1 = new TransformConfig(transformSql1); +TransformProcessor processor1 = TransformProcessor +.create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), +SinkEncoderFactory.createKvEncoder(kvSink)); +List outp
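The `LocalDateFunction` description (`localDate([string1])` returns the current date in the given time zone, else in the system time zone) reduces to a few `java.time` calls. The sketch below restates that behavior outside the SDK, with no `SourceData` or `Context` plumbing. Treating an empty zone string like a missing one is an assumption, and `LocalDateSketch` is a hypothetical name. As in the quoted code, an unknown zone ID makes `ZoneId.of` throw a `DateTimeException`.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class LocalDateSketch {

    // Current date in the requested zone; falls back to the JVM default zone when none is given.
    static LocalDate localDate(String zone) {
        ZoneId zoneId = (zone == null || zone.isEmpty()) ? ZoneId.systemDefault() : ZoneId.of(zone);
        return LocalDate.now(zoneId);
    }

    public static void main(String[] args) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        System.out.println(localDate(null).format(formatter));
        System.out.println(localDate("Asia/Shanghai").format(formatter));
    }
}
```

Because the result depends on the wall clock, tests like the one in this commit compare against `LocalDate.now(...)` for the same zone rather than a fixed date.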
(inlong) branch master updated (1b62fdd49e -> a9719038f1)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 1b62fdd49e [INLONG-10947][Dashboard] Modify field description missing for English locale (#10948) add a9719038f1 [INLONG-10946][SDK] Transform SQL supports SPACE function (#10949) No new revisions were added by this update. Summary of changes: .../{Md5Function.java => SpaceFunction.java} | 35 +++ .../TestTransformStringFunctionsProcessor.java | 39 ++ 2 files changed, 60 insertions(+), 14 deletions(-) copy inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/{Md5Function.java => SpaceFunction.java} (62%)
Re: [PR] [INLONG-10939][SDK] Transform SQL supports STRCMP function [inlong]
dockerzhang merged PR #10941: URL: https://github.com/apache/inlong/pull/10941 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-10939][SDK] Transform SQL supports STRCMP function (#10941)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 7cfbf8a74b [INLONG-10939][SDK] Transform SQL supports STRCMP function (#10941) 7cfbf8a74b is described below commit 7cfbf8a74bb9570daa5171ed9300420fb37e1489 Author: Zkplo <87751516+zk...@users.noreply.github.com> AuthorDate: Fri Aug 30 09:53:02 2024 +0800 [INLONG-10939][SDK] Transform SQL supports STRCMP function (#10941) Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com> --- .../transform/process/function/StrcmpFunction.java | 67 ++ .../TestTransformStringFunctionsProcessor.java | 42 ++ 2 files changed, 109 insertions(+) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StrcmpFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StrcmpFunction.java new file mode 100644 index 00..ebbcc94ae9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StrcmpFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * StrcmpFunction + * description: strcmp(s1,s2) + * return NULL if either argument is NULL + * return 0 if the strings are the same + * return -1 if the first argument is smaller than the second according to the current sort order + * return 1 otherwise + */ +@TransformFunction(names = {"strcmp"}) +public class StrcmpFunction implements ValueParser { + +private final ValueParser leftStringParser; +private final ValueParser rightStringParser; + +public StrcmpFunction(Function expr) { +List<Expression> expressions = expr.getParameters().getExpressions(); +leftStringParser = OperatorTools.buildParser(expressions.get(0)); +rightStringParser = OperatorTools.buildParser(expressions.get(1)); +} + +@Override +public Object parse(SourceData sourceData, int rowIndex, Context context) { +Object leftStringObj = leftStringParser.parse(sourceData, rowIndex, context); +Object rightStringObj = rightStringParser.parse(sourceData, rowIndex, context); +if (leftStringObj == null || rightStringObj == null) { +return null; +} +String leftString = OperatorTools.parseString(leftStringObj); +String rightString = OperatorTools.parseString(rightStringObj); +int cmp = OperatorTools.compareValue(leftString, rightString); +if (cmp > 0) { +return 1; +} else if (cmp < 0) { +return -1; +} +return 0; +} +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java index d8576d12fc..10cfa740c9 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java @@ -386,6 +386,48 @@ public class TestTransformStringFunctionsProcessor { Assert.assertEquals(output7.get(0), "result=da"); } +@Test +public void testStrcmpFunction() throws Exception { +String transformSql = null, data = null; +TransformConfig config = null; +TransformProcessor processor = null; +List output = null; + +transformSql = "select strcmp(string1,string2) fro
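The STRCMP contract in the javadoc above (NULL if either side is NULL, otherwise -1, 0 or 1) can be expressed by clamping any comparator result with `Integer.signum`, which is what the `if (cmp > 0) … else if (cmp < 0)` chain does. This sketch assumes plain `String.compareTo` (lexicographic by UTF-16 code unit, so `"B"` sorts before `"a"`) in place of `OperatorTools.compareValue`, whose exact ordering isn't shown here. `StrcmpSketch` is a hypothetical name.

```java
public class StrcmpSketch {

    // NULL-propagating compare that normalizes the result to -1, 0 or 1.
    static Integer strcmp(String s1, String s2) {
        if (s1 == null || s2 == null) {
            return null;
        }
        return Integer.signum(s1.compareTo(s2));
    }

    public static void main(String[] args) {
        System.out.println(strcmp("apple", "banana"));
        System.out.println(strcmp("banana", "apple"));
        System.out.println(strcmp("apple", "apple"));
        System.out.println(strcmp(null, "apple"));
    }
}
```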
Re: [PR] [INLONG-10935][SDK] Transform support DAYNAME function [inlong]
aloyszhang commented on PR #10937: URL: https://github.com/apache/inlong/pull/10937#issuecomment-2319692262 @ying-hua please resolve the conflict. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737670802 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. 
+ */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") +public String auditQueryUrl; + +private static final int MAX_PARALLELISM = 2048; +private long maximumMessagePerSecondPerCore = 1000L; +private static final long DEFAULT_ERROR_DATA_VOLUME = 0L; +private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT; +private static final String DEFAULT_AUDIT_TYPE = "DataProxy"; +private static final String AUDIT_CYCLE_REALTIME = "1"; +// maximum data scale counting range in hours +private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1; +// sample time format: 2024-08-23T22:47:38.866 +private static final String AUDIT_QUERY_DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; + +private static final String LOGTS_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; +private static final String TIMEZONE_REGEX = "([+-])(\\d):"; + +/** + * Calculate recommended parallelism based on maximum message per second per core + * + * @return Recommended parallelism + */ +public int calculateRecommendedParallelism(List<InlongStreamInfo> streamInfos) { +long averageDataVolume; +InlongStreamInfo streamInfo = streamInfos.get(0); +try { +averageDataVolume = getAverageDataVolume(streamInfo); +log.info("Retrieved data volume: {}", averageDataVolume); +} catch (Exception e) { +log.error("Error retrieving data volume: {}", e.getMessage(), e); +log.warn("Using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME); +averageDataVolume = DEFAULT_ERROR_DATA_VOLUME; +} +int newParallelism = (int) (averageDataVolume / maximumMessagePerSecondPerCore); +// Ensure parallelism is at most MAX_PARALLELISM +newParallelism = Math.min(newParallelism, MAX_PARALLELISM); +log.info("Calculated parallelism: {} for data volume: {}", newParallelism, averageDataVolume); +return newParallelism; +} + +/** + * Initialize maximum message per second per core based on configuration + * + * @param maximumMessagePerSecondPerCore The maximum messages per second per core + */ +public
void setMaximumMessagePerSecondPerCore(Integer maximumMessagePerSecondPerCore) { +if (maximumMessagePerSecondPerCore == null || maximumMessagePerSecondPerCore <= 0) { +log.error("Illegal flink.maxpercore property, mus
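The constants in the quoted class suggest how the audit query window could be built: a one-hour range (`DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1`) ending at the current time, formatted like the sample `2024-08-23T22:47:38.866`. The minimal sketch below assumes the full pattern is `yyyy-MM-dd'T'HH:mm:ss.SSS` (the year part appears to be missing in the archived quote) and uses a hypothetical `queryWindow` helper with a fixed UTC instant for reproducibility; it is not the PR's actual query code.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class AuditQueryWindowSketch {
    // Assumed full form of AUDIT_QUERY_DATE_TIME_FORMAT, matching the sample "2024-08-23T22:47:38.866".
    static final String AUDIT_QUERY_DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
    // Mirrors DATA_SCALE_COUNTING_RANGE_IN_HOURS in the quoted class.
    static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1;

    /** Hypothetical helper: returns {start, end} of the counting window that ends at `now`. */
    static String[] queryWindow(ZonedDateTime now) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(AUDIT_QUERY_DATE_TIME_FORMAT);
        ZonedDateTime start = now.minusHours(DATA_SCALE_COUNTING_RANGE_IN_HOURS);
        return new String[] {start.format(formatter), now.format(formatter)};
    }

    public static void main(String[] args) {
        // Fixed instant equal to the sample in the quoted comment, in UTC.
        ZonedDateTime now = ZonedDateTime.of(2024, 8, 23, 22, 47, 38, 866_000_000, ZoneId.of("UTC"));
        String[] window = queryWindow(now);
        System.out.println(window[0] + " -> " + window[1]);
    }
}
```

For the sample instant this prints `2024-08-23T21:47:38.866 -> 2024-08-23T22:47:38.866`. Real code would more likely use `ZoneId.systemDefault()` or the server's configured zone.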
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737673199 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. + */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") Review Comment: Is it certain that this annotation will take effect in plugins?
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737676926 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. 
+ */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") +public String auditQueryUrl; + +private static final int MAX_PARALLELISM = 2048; +private long maximumMessagePerSecondPerCore = 1000L; +private static final long DEFAULT_ERROR_DATA_VOLUME = 0L; +private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT; +private static final String DEFAULT_AUDIT_TYPE = "DataProxy"; +private static final String AUDIT_CYCLE_REALTIME = "1"; +// maxmimum data scale counting range in hours +private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1; +// sample time format: 2024-08-23T22:47:38.866 +private static final String AUDIT_QUERY_DATE_TIME_FORMAT = "-MM-dd'T'HH:mm:ss.SSS"; + +private static final String LOGTS_DATE_TIME_FORMAT = "-MM-dd HH:mm:ss"; +private static final String TIMEZONE_REGEX = "([+-])(\\d):"; + +/** + * Calculate recommended parallelism based on maximum message per second per core + * + * @return Recommended parallelism + */ +public int calculateRecommendedParallelism(List streamInfos) { +long averageDataVolume; +InlongStreamInfo streamInfo = streamInfos.get(0); +try { +averageDataVolume = getAverageDataVolume(streamInfo); +log.info("Retrieved data volume: {}", averageDataVolume); +} catch (Exception e) { +log.error("Error retrieving data volume: {}", e.getMessage(), e); +log.warn("Using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME); Review Comment: Please delete one of the warn and error logs.
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
PeterZh6 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737677041 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. + */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") Review Comment: (screenshot: https://github.com/user-attachments/assets/fd795b0f-40b3-4a91-b033-b8e6e1f0f989) Yes, because I used `ApplicationContextProvider` to instantiate this Spring bean in `FlinkService`, even though `FlinkService` itself is not Spring-managed. Attached is the `manager` log.
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737678355 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. + */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") Review Comment: This annotation in the plugins directory will not be scanned during loading.
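Whether an annotated plugin class is "scanned" comes down to who processes the annotation: `@Value` is only metadata, and a field is filled in only if a container processes that particular instance. The plain-Java analogue below illustrates this with a hypothetical `@ConfigValue` annotation and a tiny `inject` helper; it is a sketch of the general mechanism, not Spring's implementation. An instance created with `new` keeps a `null` field, while one passed through `inject` gets the configured value or the `key:default` fallback, which roughly mirrors obtaining the bean from the Spring context via `ApplicationContextProvider` versus constructing it directly.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

public class AnnotationInjectionSketch {

    /** Hypothetical stand-in for Spring's @Value, using a "key:default" spec. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface ConfigValue {
        String value();
    }

    /** Hypothetical component with one annotated field. */
    static class Optimizer {
        @ConfigValue("audit.query.url:http://127.0.0.1:10080")
        String auditQueryUrl;
    }

    /** A tiny "container": only objects passed through here get their annotated fields filled. */
    static <T> T inject(T target, Map<String, String> props) {
        for (Field field : target.getClass().getDeclaredFields()) {
            ConfigValue spec = field.getAnnotation(ConfigValue.class);
            if (spec == null) {
                continue; // not annotated, leave untouched
            }
            // Split at the first ':' so the colons inside the default URL are kept.
            String raw = spec.value();
            int sep = raw.indexOf(':');
            String key = sep < 0 ? raw : raw.substring(0, sep);
            String fallback = sep < 0 ? null : raw.substring(sep + 1);
            try {
                field.setAccessible(true);
                field.set(target, props.getOrDefault(key, fallback));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot inject " + field.getName(), e);
            }
        }
        return target;
    }

    public static void main(String[] args) {
        Optimizer plain = new Optimizer();                     // never processed
        Optimizer managed = inject(new Optimizer(), Map.of()); // processed, no property set
        System.out.println(plain.auditQueryUrl);   // null
        System.out.println(managed.auditQueryUrl); // http://127.0.0.1:10080
    }
}
```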
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
PeterZh6 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737681783 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. + */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") Review Comment: Could you explain further? I didn't quite get it.
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
PeterZh6 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737681783 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. + */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") Review Comment: Could you explain further? I didn't quite get it, since it did work.
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on PR #10916: URL: https://github.com/apache/inlong/pull/10916#issuecomment-2319745536 When a group is a real-time synchronization task, there is no DataProxy present. How should this situation be handled?
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
PeterZh6 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737708162 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. + */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") Review Comment: Alternatively, could you suggest another way to read the property? When the Spring profile changes, it is quite clumsy to determine the file name of the active configuration file, so using the `@Value` annotation is the only approach I can think of.
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737719535 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. + */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") Review Comment: OK
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737738348 ## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.plugin.flink; + +import org.apache.inlong.audit.AuditIdEnum; +import org.apache.inlong.audit.entity.FlowType; +import org.apache.inlong.manager.pojo.audit.AuditInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.StringJoiner; + +import static org.apache.inlong.audit.consts.OpenApiConstants.*; +import static org.apache.inlong.manager.common.consts.InlongConstants.*; + +/** + * This class is used to calculate the recommended parallelism based on the maximum message per second per core. + * The data volume is calculated based on the average data count per hour. + * The data count is retrieved from the inlong audit API. 
+ */ +@Slf4j +@Component +public class FlinkParallelismOptimizer { + +@Value("${audit.query.url:http://127.0.0.1:10080}") +public String auditQueryUrl; + +private static final int MAX_PARALLELISM = 2048; +private long maximumMessagePerSecondPerCore = 1000L; +private static final long DEFAULT_ERROR_DATA_VOLUME = 0L; +private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT; +private static final String DEFAULT_AUDIT_TYPE = "DataProxy"; +private static final String AUDIT_CYCLE_REALTIME = "1"; +// maxmimum data scale counting range in hours +private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1; +// sample time format: 2024-08-23T22:47:38.866 +private static final String AUDIT_QUERY_DATE_TIME_FORMAT = "-MM-dd'T'HH:mm:ss.SSS"; + +private static final String LOGTS_DATE_TIME_FORMAT = "-MM-dd HH:mm:ss"; +private static final String TIMEZONE_REGEX = "([+-])(\\d):"; + +/** + * Calculate recommended parallelism based on maximum message per second per core + * + * @return Recommended parallelism + */ +public int calculateRecommendedParallelism(List streamInfos) { +long averageDataVolume; +InlongStreamInfo streamInfo = streamInfos.get(0); +try { +averageDataVolume = getAverageDataVolume(streamInfo); +log.info("Retrieved data volume: {}", averageDataVolume); +} catch (Exception e) { +log.error("Error retrieving data volume: {}", e.getMessage(), e); +log.warn("Using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME); +averageDataVolume = DEFAULT_ERROR_DATA_VOLUME; +} +int newParallelism = (int) (averageDataVolume / maximumMessagePerSecondPerCore); +// Ensure parallelism is at most MAX_PARALLELISM +newParallelism = Math.min(newParallelism, MAX_PARALLELISM); +log.info("Calculated parallelism: {} for data volume: {}", newParallelism, averageDataVolume); +return newParallelism; +} + +/** + * Initialize maximum message per second per core based on configuration + * + * @param maximumMessagePerSecondPerCore The maximum messages per second per core + */ +public
void setMaximumMessagePerSecondPerCore(Integer maximumMessagePerSecondPerCore) { +if (maximumMessagePerSecondPerCore == null || maximumMessagePerSecondPerCore <= 0) { +log.error("Illegal flink.maxpercore property, mus
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
PeterZh6 commented on PR #10916: URL: https://github.com/apache/inlong/pull/10916#issuecomment-2319790130 > When a group is a real-time synchronization task, there is no dataproxy present. How to handle this situation. Is there a default value when dataproxy does not exist? When no DataProxy audit data is present, the `entity` should be `null`, causing `parseResponseAndCalculateAverageDataVolume` to return `DEFAULT_ERROR_DATA_VOLUME`, which is `0L`.
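The fallback described in this reply can be sketched as follows, assuming the audit response has already been reduced to a list of per-interval counts, with `null` standing in for a missing `entity`. `averageDataVolume` is a hypothetical stand-in for `parseResponseAndCalculateAverageDataVolume`, not its actual code. With the default `maximumMessagePerSecondPerCore` of 1000 in the quoted class, a volume of `0L` makes `calculateRecommendedParallelism` compute a parallelism of 0.

```java
import java.util.List;

public class AuditFallbackSketch {
    // Mirrors DEFAULT_ERROR_DATA_VOLUME in the quoted FlinkParallelismOptimizer.
    static final long DEFAULT_ERROR_DATA_VOLUME = 0L;

    /**
     * Hypothetical stand-in for parseResponseAndCalculateAverageDataVolume.
     * A null list models a response with no entity (e.g. no DataProxy audit data).
     */
    static long averageDataVolume(List<Long> counts) {
        if (counts == null || counts.isEmpty()) {
            return DEFAULT_ERROR_DATA_VOLUME;
        }
        long sum = 0L;
        for (long count : counts) {
            sum += count;
        }
        return sum / counts.size();
    }

    public static void main(String[] args) {
        System.out.println(averageDataVolume(null));                  // 0
        System.out.println(averageDataVolume(List.of(3000L, 5000L))); // 4000
    }
}
```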
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
PeterZh6 commented on PR #10916: URL: https://github.com/apache/inlong/pull/10916#issuecomment-2319808998

The effect of adjusting the parallelism is shown in the picture. (The failure is caused by the resource limits of my computer.)
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737778216

## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java: ##
@@ -213,6 +218,22 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting
         }).filter(Objects::nonNull).collect(Collectors.toList());
         Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
+        log.debug("flink info: {}", flinkInfo);
+        if (flinkConfig.getDynamicParallelismEnable()) {
+            flinkParallelismOptimizer.setMaximumMessagePerSecondPerCore(flinkConfig.getMaxMsgRatePerCore());
+            // get stream info list for auditing
+            int recommendedParallelism =
+                    flinkParallelismOptimizer.calculateRecommendedParallelism(flinkInfo.getInlongStreamInfoList());
+            // Ensure parallelism is at least the default value
+            if (recommendedParallelism < parallelism) {
+                recommendedParallelism = parallelism;
+            }
+            if (recommendedParallelism != parallelism) {
+                log.info("switched to recommended parallelism: {}", recommendedParallelism);
+                parallelism = recommendedParallelism;
+            }
+        }
+        log.info("current parallelism: {}", parallelism);

Review Comment: Is it reasonable that when audit data cannot be obtained, the `recommendedParallelism` will be 0?
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
PeterZh6 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737802253

## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java: ##
@@ -213,6 +218,22 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting

Review Comment: Thanks a lot for your extremely rigorous review. However, I believe the comparison in the red rectangle ensures the minimum parallelism, unless the parallelism value itself in `flink-sort-plugin.properties` is illegal.
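The reply's argument can be checked with a small sketch of the guard in `FlinkService`: the two `if` blocks together amount to taking the larger of the recommended and configured values, so a recommendation of 0 (no audit data) falls back to the configured parallelism. The class name and the sample configured values are hypothetical; the last call illustrates the caveat in the reply, where an illegal configured value of 0 passes straight through.

```java
// Hypothetical sketch of the FlinkService guard: the configured parallelism
// (e.g. from flink-sort-plugin.properties) acts as a floor.
class ParallelismFloor {

    // Equivalent to the two if blocks in the diff: raise recommended to the
    // configured value when lower, otherwise switch to the recommendation
    static int effectiveParallelism(int recommended, int configured) {
        return Math.max(recommended, configured);
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(0, 1)); // 1: no audit data, falls back to the configured value
        System.out.println(effectiveParallelism(8, 1)); // 8: audit data recommends more
        System.out.println(effectiveParallelism(0, 0)); // 0: an illegal configured value is not corrected
    }
}
```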
Re: [PR] [INLONG-7056][Sort] Adjust sort resources according to data scale [inlong]
fuweng11 commented on code in PR #10916: URL: https://github.com/apache/inlong/pull/10916#discussion_r1737830177

## inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java: ##
@@ -213,6 +218,22 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting

Review Comment: OK
Re: [PR] [INLONG-10935][SDK] Transform support DAYNAME function [inlong]
ying-hua commented on PR #10937: URL: https://github.com/apache/inlong/pull/10937#issuecomment-2320046925

> @ying-hua please resolve the conflict.

Done.