[GitHub] [inlong] dockerzhang commented on issue #5643: [Bug][CVE] There is a vulnerability in Apache Flume 1.9.0
dockerzhang commented on issue #5643:
URL: https://github.com/apache/inlong/issues/5643#issuecomment-1304435058

Updating the Flume version will affect the DataProxy module a lot, so we could update it later.
[GitHub] [inlong] dockerzhang commented on issue #6105: [Umbrella] [Manager] Manager support delete/update by unique key
dockerzhang commented on issue #6105:
URL: https://github.com/apache/inlong/issues/6105#issuecomment-1304435627

All sub-tasks are finished, so closing this issue.
[GitHub] [inlong] dockerzhang merged pull request #6407: [INLONG-6401][Sort] Schema update stuck in dead loop causes stack overflow in multiple sink scenes
dockerzhang merged PR #6407:
URL: https://github.com/apache/inlong/pull/6407
[inlong] branch master updated: [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new a3d9d597b [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)

a3d9d597b is described below

commit a3d9d597b3787a1d42ac79091cc3ae2007b751c9
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800

    [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)

    Co-authored-by: thesumery <158971...@qq.com>
---
 .../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++---
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator
             List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-            if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-                SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
-                LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+            if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+                // If can not handle this schema update, should not push data into next operator
+                return;
             }
+            SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
         }
         transaction.commitTransaction();
         handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator
     private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
         boolean canHandle = true;
         for (TableChange tableChange : tableChanges) {
-            if (tableChange instanceof AddColumn) {
-                canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy());
-            } else {
-                if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy())) {
-                    LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-                            tableId, tableChange);
-                }
+            canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+                    multipleSinkOption.getSchemaUpdatePolicy());
+            if (!(tableChange instanceof AddColumn)) { // todo:currently iceberg can only handle addColumn, so always return false
+                LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+                        tableId, tableChange);
                 canHandle = false;
             }
+            if (!canHandle) {
+                blacklist.add(tableId);
+                break;
+            }
         }
-        if (!canHandle) {
-            blacklist.add(tableId);
-        }
+
         return canHandle;
     }
 }
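For readers skimming the patch, the essence of the fix is twofold: the operator now returns early instead of pushing data to the next operator when a schema update cannot be handled, and the offending table is blacklisted inside the loop so the same failing update is never retried. Below is a minimal, self-contained sketch of that guard logic; the types (TableChange, AddColumn, the String table id) are simplified stand-ins, not the real Iceberg/InLong classes, and the policy check is omitted.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SchemaUpdateGuardSketch {

    interface TableChange {}
    static class AddColumn implements TableChange {}
    static class DropColumn implements TableChange {}

    private final Set<String> blacklist = new HashSet<>();

    boolean canHandle(String tableId, List<TableChange> changes) {
        boolean canHandle = true;
        for (TableChange change : changes) {
            // currently only adding columns is supported; any other change
            // makes the whole batch unhandleable
            if (!(change instanceof AddColumn)) {
                canHandle = false;
            }
            if (!canHandle) {
                // blacklisting the table and breaking out is what stops the
                // operator from re-entering the same failing update forever
                blacklist.add(tableId);
                break;
            }
        }
        return canHandle;
    }

    public static void main(String[] args) {
        SchemaUpdateGuardSketch sketch = new SchemaUpdateGuardSketch();
        System.out.println(sketch.canHandle("db.t1", List.of(new AddColumn())));  // true
        System.out.println(sketch.canHandle("db.t2", List.of(new DropColumn()))); // false
        System.out.println(sketch.blacklist);                                     // [db.t2]
    }
}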
[GitHub] [inlong] dockerzhang commented on issue #5469: [Feature][Sort] Add metric for Hive with Flink metrics group and audit SDK
dockerzhang commented on issue #5469:
URL: https://github.com/apache/inlong/issues/5469#issuecomment-1304445376

This issue has been finished, so closing it.
[GitHub] [inlong] healchow commented on a diff in pull request #6411: [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved
healchow commented on code in PR #6411:
URL: https://github.com/apache/inlong/pull/6411#discussion_r1014638247

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java:

@@ -61,15 +63,25 @@
     public InlongTubeMQInfo getFromEntity(InlongGroupEntity entity) {
         InlongTubeMQInfo groupInfo = new InlongTubeMQInfo();
         CommonBeanUtils.copyProperties(entity, groupInfo);
-
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            InlongTubeMQDTO dto = InlongTubeMQDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, groupInfo);
+        }
         // TODO get the cluster
         // groupInfo.setTubeMaster();
         return groupInfo;
     }

     @Override
     protected void setTargetEntity(InlongGroupRequest request, InlongGroupEntity targetEntity) {
-        LOGGER.info("do nothing for inlong group with TubeMQ");
+        InlongTubeMQRequest tubeMQRequest = (InlongTubeMQRequest) request;
+        try {
+            InlongTubeMQDTO dto = InlongTubeMQDTO.getFromRequest(tubeMQRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+        LOGGER.info("success set entity for inlong group with TubeMQ");

Review Comment:
Not necessary to add this info log, just remove it.
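The change under review persists TubeMQ-specific fields into the entity's extParams JSON column and restores them on read. Below is a hedged sketch of that round trip, using Jackson directly and a made-up one-field DTO; the real InlongTubeMQDTO has more fields and static helper methods such as getFromJson/getFromRequest.

import com.fasterxml.jackson.databind.ObjectMapper;

public class ExtParamsRoundTripSketch {

    // simplified stand-in for InlongTubeMQDTO; the field is illustrative only
    static class InlongTubeMQDTO {
        public String tubeMaster;
    }

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        InlongTubeMQDTO dto = new InlongTubeMQDTO();
        dto.tubeMaster = "127.0.0.1:8715";

        // setTargetEntity direction: DTO -> JSON string stored in the ext_params column
        String extParams = OBJECT_MAPPER.writeValueAsString(dto);

        // getFromEntity direction: JSON string -> DTO, then copied onto the info bean
        InlongTubeMQDTO restored = OBJECT_MAPPER.readValue(extParams, InlongTubeMQDTO.class);
        System.out.println(restored.tubeMaster); // 127.0.0.1:8715
    }
}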
[GitHub] [inlong] healchow commented on a diff in pull request #6411: [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved
healchow commented on code in PR #6411:
URL: https://github.com/apache/inlong/pull/6411#discussion_r1014638768

## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java:

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * The base parameter class of InlongGroup, support user extend their own business params.
+ */
+@Data
+@AllArgsConstructor
+@ApiModel("Base info of inlong group")
+public class BaseInlongConsume {
+
+    // you can add extend parameters in this class
+
+}

Review Comment:
Suggest adding one blank line after the final `}`, then this will not appear:
https://user-images.githubusercontent.com/31994335/200123281-0c60a7de-6215-477d-be2e-29ca2509f801.png
[inlong] 01/03: [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7617fa00748b5e9d8fa1d6a7998885f461e91b60
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Fri Nov 4 16:34:50 2022 +0800

    [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)
---
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  3 +-
 .../sink/multiple/IcebergMultipleStreamWriter.java | 59 +-
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index bb7498650..25f9e963b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -528,7 +528,8 @@ public class FlinkSink {
                 .setParallelism(parallelism);

         IcebergProcessOperator streamWriter =
-                new IcebergProcessOperator(new IcebergMultipleStreamWriter(appendMode, catalogLoader));
+                new IcebergProcessOperator(new IcebergMultipleStreamWriter(
+                        appendMode, catalogLoader, inlongMetric, auditHostAndPorts));
         SingleOutputStreamOperator writerStream = routeStream
                 .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
                         TypeInformation.of(IcebergProcessOperator.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 4c3fb0045..617eb6d69 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -18,6 +18,10 @@
 package org.apache.inlong.sort.iceberg.sink.multiple;

+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -34,10 +38,16 @@
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.HashMap;
 import java.util.List;
@@ -52,6 +62,9 @@
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;

 /**
  * Iceberg writer that can distinguish different sink tables and route and distribute data into different
@@ -70,9 +83,23 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction
     private transient Map<TableIdentifier, Schema> multipleSchemas;
     private transient FunctionInitializationContext functionInitializationContext;

-    public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+    // metric
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    @Nullable
+    private transient SinkMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
+    public IcebergMultipleStreamWriter(
+            boolean appendMode,
+            CatalogLoader catalogLoader,
+            String inlongMetric,
+            String auditHostAndPorts) {
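The hunk above (truncated at the new constructor) adds metric-state fields alongside the new constructor arguments. The usual pattern in InLong Sort connectors is to snapshot the sink counters into operator state on checkpoint and re-apply them on restore; the sketch below shows that shape with Flink's union list state. MetricState is reduced to a two-field POJO here, and the state name and method wiring are illustrative, not the exact code of IcebergMultipleStreamWriter.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;

public class MetricStateSketch {

    // simplified stand-in for org.apache.inlong.sort.base.metric.MetricState
    public static class MetricState implements java.io.Serializable {
        public long numRecordsOut;
        public long numBytesOut;
    }

    private transient ListState<MetricState> metricStateListState;
    private MetricState metricState = new MetricState();

    // called from initializeState(...) of the writer function
    public void restoreMetricState(FunctionInitializationContext context) throws Exception {
        metricStateListState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("inlong-metric-state",
                        TypeInformation.of(new TypeHint<MetricState>() {
                        })));
        if (context.isRestored()) {
            for (MetricState restored : metricStateListState.get()) {
                metricState = restored; // re-apply counters saved at the last checkpoint
            }
        }
    }

    // called from snapshotState(...) on every checkpoint
    public void snapshotMetricState(FunctionSnapshotContext context) throws Exception {
        metricStateListState.clear();
        metricStateListState.add(metricState);
    }
}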
[inlong] branch master updated (a3d9d597b -> d6874a8b8)
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

 omit a3d9d597b [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
 omit d4d4dd5b4 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
 omit 9228a22d6 [INLONG-6379][Sort] Bugfix:iceberg miss metric data in multiple sink (#6381)
  new 7617fa007 [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)
  new 1c790a399 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
  new d6874a8b8 [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs when a user
--force pushes a change and generates a repository containing
something like this:

 * -- * -- B -- O -- O -- O   (a3d9d597b)
            \
             N -- N -- N   refs/heads/master (d6874a8b8)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still refer
to them. Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
[inlong] 03/03: [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d6874a8b8c363a71d44182db626c62322ad02e41
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800

    [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)
---
 .../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++---
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator
             List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-            if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-                SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
-                LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+            if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+                // If can not handle this schema update, should not push data into next operator
+                return;
             }
+            SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
         }
         transaction.commitTransaction();
         handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator
     private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
         boolean canHandle = true;
         for (TableChange tableChange : tableChanges) {
-            if (tableChange instanceof AddColumn) {
-                canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy());
-            } else {
-                if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy())) {
-                    LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-                            tableId, tableChange);
-                }
+            canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+                    multipleSinkOption.getSchemaUpdatePolicy());
+            if (!(tableChange instanceof AddColumn)) { // todo:currently iceberg can only handle addColumn, so always return false
+                LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+                        tableId, tableChange);
                 canHandle = false;
             }
+            if (!canHandle) {
+                blacklist.add(tableId);
+                break;
+            }
         }
-        if (!canHandle) {
-            blacklist.add(tableId);
-        }
+
        return canHandle;
    }
 }
[inlong] 02/03: [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1c790a399ea7bb9e964d8215e7d65584aba53705
Author: ganfengtan
AuthorDate: Fri Nov 4 21:18:58 2022 +0800

    [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)

    Co-authored-by: healchow
---
 .../agent/core/task/TaskPositionManager.java       |   2 +
 .../sources/reader/file/MonitorTextFile.java       | 101 -
 2 files changed, 60 insertions(+), 43 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index c3bfa0abe..c5b944210 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -139,6 +139,8 @@ public class TaskPositionManager extends AbstractDaemon {
         ConcurrentHashMap positionTemp = new ConcurrentHashMap<>();
         ConcurrentHashMap position = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
         if (position == null) {
+            JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+            positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0));
             position = positionTemp;
         }
         Long beforePosition = position.getOrDefault(sourcePath, 0L);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index d8160bd8b..1f5958cdf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -21,10 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Objects;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -35,26 +35,28 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXP
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL;

 /**
- * monitor files
+ * Monitor for text files
  */
 public final class MonitorTextFile {

     private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class);
-    private static volatile MonitorTextFile monitorTextFile = null;

     // monitor thread pool
-    private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS,
             new SynchronousQueue<>(),
             new AgentThreadFactory("monitor-file"));

+    private static volatile MonitorTextFile monitorTextFile = null;
+
     private MonitorTextFile() {
     }

     /**
-     * Mode of singleton
-     * @return MonitorTextFile instance
+     * Get a singleton instance of MonitorTextFile.
+     *
+     * @return monitor text file instance
      */
     public static MonitorTextFile getInstance() {
         if (monitorTextFile == null) {
@@ -68,37 +70,35 @@ public final class MonitorTextFile {
     }

     public void monitor(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
-        MonitorEventRunnable monitorEvent = new MonitorEventRunnable(fileReaderOperator, textFileReader);
-        runningPool.execute(monitorEvent);
+        EXECUTOR_SERVICE.execute(new MonitorEventRunnable(fileReaderOperator, textFileReader));
     }

     /**
-     * monitor file event
+     * Runnable for monitor the file event
      */
-    private class MonitorEventRunnable implements Runnable {
+    private static class MonitorEventRunnable implements Runnable {

         private static final int WAIT_TIME = 30;
         private final FileReaderOperator fileReaderOperator;
         private final TextFileReader textFileReader;
         private final Long interval;
         private final long startTime = System.currentTimeMillis();
+        private long lastFlushTime = System.currentTimeMillis();
         private String path;
-        /**
-         * the last modify time of the file
-         */
+
+        // the last modify time of the file
         private BasicFileAttributes attributesBefore;

-        public MonitorEventRunnable(File
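The TaskPositionManager hunk is the heart of the reboot fix: previously a freshly created position map defaulted every source path to 0, so a restarted agent re-read files from the beginning. Below is a minimal sketch of the corrected seeding logic, with a stand-in JobProfileStore interface in place of the agent's job configuration DB and simplified types throughout.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PositionRestoreSketch {

    // stand-in for the agent's persisted job configuration DB
    interface JobProfileStore {
        long getPersistedPosition(String jobId, String sourcePath);
    }

    private final Map<String, ConcurrentHashMap<String, Long>> jobTaskPositionMap =
            new ConcurrentHashMap<>();

    long currentPosition(JobProfileStore store, String jobId, String sourcePath) {
        ConcurrentHashMap<String, Long> temp = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, Long> position = jobTaskPositionMap.putIfAbsent(jobId, temp);
        if (position == null) {
            // first access after a (re)boot: seed from the persisted profile
            // instead of silently starting from 0
            temp.put(sourcePath, store.getPersistedPosition(jobId, sourcePath));
            position = temp;
        }
        return position.getOrDefault(sourcePath, 0L);
    }
}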
[GitHub] [inlong] gosonzhang opened a new pull request, #6413: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started
gosonzhang opened a new pull request, #6413:
URL: https://github.com/apache/inlong/pull/6413

1. Added an error code SINK_SERVICE_UNREADY: if the MQ cluster is not configured, or it is configured but initialization fails, DataProxy will not serve externally; it only serves after initialization succeeds.
2. Adjusted the error codes of HTTP access and incorporated them into the DataProxyErrCode enumeration.

- Fixes #6406
[GitHub] [inlong] github-code-scanning[bot] commented on a diff in pull request #6413: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started
github-code-scanning[bot] commented on code in PR #6413:
URL: https://github.com/apache/inlong/pull/6413#discussion_r1014650437

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java:

@@ -49,81 +51,98 @@
     }

     @Override
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
+    public void doFilter(ServletRequest request,
+            ServletResponse response,
+            FilterChain chain) throws IOException {
         HttpServletRequest req = (HttpServletRequest) request;
         HttpServletResponse resp = (HttpServletResponse) response;
-        int code = StatusCode.SUCCESS;
-        String message = "success";
-
         String pathInfo = req.getPathInfo();
         if (pathInfo.startsWith("/")) {
             pathInfo = pathInfo.substring(1);
         }
         if ("heartbeat".equals(pathInfo)) {
-            resp.setCharacterEncoding(req.getCharacterEncoding());
-            resp.setStatus(HttpServletResponse.SC_OK);
-            resp.flushBuffer();
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
             return;
         }
-
-        String invalidKey = null;
+        // check sink service status
+        if (!ConfigManager.getInstance().isMqClusterReady()) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCode(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrMsg());
+            return;
+        }
+        // get and check groupId
         String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+        if (StringUtils.isEmpty(groupId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check streamId
         String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+        if (StringUtils.isEmpty(streamId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check dt
         String dt = req.getParameter(AttributeConstants.DATA_TIME);
+        if (StringUtils.isEmpty(dt)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check body
         String body = req.getParameter(AttrConstants.BODY);
-
-        if (StringUtils.isEmpty(groupId)) {
-            invalidKey = "groupId";
-        } else if (StringUtils.isEmpty(streamId)) {
-            invalidKey = "streamId";
-        } else if (StringUtils.isEmpty(dt)) {
-            invalidKey = "dt";
-        } else if (StringUtils.isEmpty(body)) {
-            invalidKey = "body";
+        if (StringUtils.isEmpty(body)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrMsg());
+            return;
+        }
+        // check body length
+        if (body.length() > maxMsgLength) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
+                    "Bad request, body length exceeds the limit:" + maxMsgLength);
+            return;
         }
-
         try {
-            if (invalidKey != null) {
-                LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
-                code = StatusCode.ILLEGAL_ARGUMENT;
-                message = "Bad request from client. " + invalidKey + " must not be empty.";
-            } else if (body.length() > maxMsgLength) {
-                LOG.warn("Received bad request from client. Body length is " + body.length());
-                code = StatusCode.EXCEED_LEN;
-                message = "Bad request from client. Body length is exceeding the limit:" + maxMsgLength;
-            } else {
-                chain.doFilter(request, response);
-            }
+            chain.doFilter(request, response);
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
         } catch (Throwable t) {
-            code
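Every branch in the rewritten filter funnels through returnRspPackage(...), whose body is not part of the visible hunk. As a rough guess at its shape (an assumption, not the actual DataProxy code), it would write the error code and message back to the client and flush the response:

import java.io.IOException;
import javax.servlet.http.HttpServletResponse;

public class RspSketch {

    // Hypothetical shape of returnRspPackage: both the payload format and the
    // status handling here are guesses, since the real body is not shown above.
    static void returnRspPackage(HttpServletResponse resp, String encoding,
            int errCode, String errMsg) throws IOException {
        resp.setCharacterEncoding(encoding);
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.getWriter().write("{\"code\":" + errCode + ",\"msg\":\"" + errMsg + "\"}");
        resp.flushBuffer();
    }
}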
[GitHub] [inlong] EMsnap merged pull request #6408: [INLONG-6370][Sort] The op type in debezium format should be u or update
EMsnap merged PR #6408:
URL: https://github.com/apache/inlong/pull/6408
[inlong] branch master updated: [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 1fbf04550 [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)

1fbf04550 is described below

commit 1fbf0455046f8d066337893225862245b5c1adfe
Author: Schnapps
AuthorDate: Sun Nov 6 08:41:25 2022 +0800

    [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
---
 .../sort/cdc/mysql/table/MySqlReadableMetadata.java | 21 +
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index fcfb636ac..bff2cc284 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -166,7 +166,7 @@ public enum MySqlReadableMetadata {
                 .mysqlType(getMysqlType(tableSchema))
                 .build();
         DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
-                .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getOpType(record))
+                .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
                 .tableChange(tableSchema).build();

         try {
@@ -237,7 +237,7 @@ public enum MySqlReadableMetadata {

         @Override
         public Object read(SourceRecord record) {
-            return StringData.fromString(getOpType(record));
+            return StringData.fromString(getCanalOpType(record));
         }
     }),
@@ -435,7 +435,7 @@ public enum MySqlReadableMetadata {
                 .data(dataList).database(databaseName)
                 .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
                 .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
-                .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+                .type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build();

         try {
             return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
@@ -455,7 +455,7 @@ public enum MySqlReadableMetadata {
         this.converter = converter;
     }

-    private static String getOpType(SourceRecord record) {
+    private static String getCanalOpType(SourceRecord record) {
         String opType;
         final Envelope.Operation op = Envelope.operationFor(record);
         if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
@@ -468,6 +468,19 @@ public enum MySqlReadableMetadata {
         return opType;
     }

+    private static String getDebeziumOpType(SourceRecord record) {
+        String opType;
+        final Envelope.Operation op = Envelope.operationFor(record);
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            opType = "c";
+        } else if (op == Envelope.Operation.DELETE) {
+            opType = "d";
+        } else {
+            opType = "u";
+        }
+        return opType;
+    }
+
     private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
         if (tableSchema == null) {
             return null;
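The rename makes the two wire formats explicit: Canal JSON spells operations out as INSERT/UPDATE/DELETE, while Debezium JSON uses the single-letter c/u/d codes, and before this patch the Debezium path reused the Canal spelling. A compact side-by-side sketch of the two mappings follows; Envelope.Operation is Debezium's enum, and the real methods take a SourceRecord and call Envelope.operationFor, as the diff shows.

import io.debezium.data.Envelope;

public class OpTypeSketch {

    // Canal JSON spells operations out in full
    static String canalOp(Envelope.Operation op) {
        switch (op) {
            case CREATE:
            case READ:
                return "INSERT";
            case DELETE:
                return "DELETE";
            default:
                return "UPDATE";
        }
    }

    // Debezium JSON uses single-letter codes
    static String debeziumOp(Envelope.Operation op) {
        switch (op) {
            case CREATE:
            case READ:
                return "c";
            case DELETE:
                return "d";
            default:
                return "u";
        }
    }

    public static void main(String[] args) {
        // before the fix, the Debezium path produced "UPDATE" here instead of "u"
        System.out.println(canalOp(Envelope.Operation.UPDATE));    // UPDATE
        System.out.println(debeziumOp(Envelope.Operation.UPDATE)); // u
    }
}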
[GitHub] [inlong] healchow opened a new pull request, #6414: [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink
healchow opened a new pull request, #6414:
URL: https://github.com/apache/inlong/pull/6414

### Prepare a Pull Request

- Fixes #6412

### Motivation

Add a parameter to control whether to initiate the process for StreamSink.

### Modifications

1. Add a `startProcess` parameter in `SinkRequest`, which is used by the save and update APIs;
2. Add `startProcess` to the query parameters of the delete API.

### Verifying this change

- [ ] This change is already covered by existing tests, such as: `XxxSinkServiceTest`

### Documentation

- Does this pull request introduce a new feature? (no)
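For API users, the effect of the flag might look like the sketch below, with a stand-in request class; only the `startProcess` field and its save/update/delete semantics come from this PR description, and the real `SinkRequest` in manager-pojo has many more fields.

public class StartProcessSketch {

    // stand-in for org.apache.inlong.manager.pojo.sink.SinkRequest
    static class SinkRequestStandIn {
        // whether saving/updating this sink should also initiate the workflow process;
        // the behaviour when unset is decided by the Manager
        private Boolean startProcess;

        public Boolean getStartProcess() {
            return startProcess;
        }

        public void setStartProcess(Boolean startProcess) {
            this.startProcess = startProcess;
        }
    }

    public static void main(String[] args) {
        SinkRequestStandIn request = new SinkRequestStandIn();
        // save the StreamSink without kicking off the workflow
        request.setStartProcess(Boolean.FALSE);
        System.out.println("startProcess = " + request.getStartProcess());
    }
}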
[GitHub] [inlong-website] e-mhui opened a new pull request, #584: [INLONG-579][Sort] Add doc for oracle connector for all migrate
e-mhui opened a new pull request, #584:
URL: https://github.com/apache/inlong-website/pull/584

### Add doc for oracle connector for all migrate

- Fixes #579

### Motivation

Add doc for the oracle connector for all migrate.
[GitHub] [inlong] ciscozhou commented on pull request #6414: [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink
ciscozhou commented on PR #6414:
URL: https://github.com/apache/inlong/pull/6414#issuecomment-1304710810

LGTM
[GitHub] [inlong] gong commented on a diff in pull request #6411: [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved
gong commented on code in PR #6411:
URL: https://github.com/apache/inlong/pull/6411#discussion_r1014770780

## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java:

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * The base parameter class of InlongGroup, support user extend their own business params.
+ */
+@Data
+@AllArgsConstructor
+@ApiModel("Base info of inlong group")
+public class BaseInlongConsume {
+
+    // you can add extend parameters in this class
+
+}

Review Comment:
Maybe the description `The base parameter class of InlongGroup` is wrong: `InlongGroup` should be changed to `InlongConsume`.