[GitHub] [inlong] dockerzhang commented on issue #5643: [Bug][CVE] There is a vulnerability in Apache Flume 1.9.0

2022-11-05 Thread GitBox


dockerzhang commented on issue #5643:
URL: https://github.com/apache/inlong/issues/5643#issuecomment-1304435058

   Updating the Flume version would have a large impact on the dataproxy module, so we can update it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang commented on issue #6105: [Umbrella] [Manager] Manager support delete/update by unique key

2022-11-05 Thread GitBox


dockerzhang commented on issue #6105:
URL: https://github.com/apache/inlong/issues/6105#issuecomment-1304435627

   All sub-tasks are finished, so closing it.





[GitHub] [inlong] dockerzhang merged pull request #6407: [INLONG-6401][Sort] Schema update stuck in dead loop cause stackoverflow in multiple sink scenes

2022-11-05 Thread GitBox


dockerzhang merged PR #6407:
URL: https://github.com/apache/inlong/pull/6407





[inlong] branch master updated: [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)

2022-11-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new a3d9d597b [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
a3d9d597b is described below

commit a3d9d597b3787a1d42ac79091cc3ae2007b751c9
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800

[INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++---
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
-LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+// If can not handle this schema update, should not push data into next operator
+return;
 }
+SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
 }
 transaction.commitTransaction();
 handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator tableChanges) {
 boolean canHandle = true;
 for (TableChange tableChange : tableChanges) {
-if (tableChange instanceof AddColumn) {
-canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-multipleSinkOption.getSchemaUpdatePolicy());
-} else {
-if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-multipleSinkOption.getSchemaUpdatePolicy())) {
-LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-tableId, tableChange);
-}
+canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+multipleSinkOption.getSchemaUpdatePolicy());
+if (!(tableChange instanceof AddColumn)) {
 // todo:currently iceberg can only handle addColumn, so always return false
+LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+tableId, tableChange);
 canHandle = false;
 }
+if (!canHandle) {
+blacklist.add(tableId);
+break;
+}
 }
-if (!canHandle) {
-blacklist.add(tableId);
-}
+
 return canHandle;
 }
 }
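
The control flow this commit introduces can be modelled in isolation. The Java sketch below is a simplified model, not the InLong code: `TableChange`, `AddColumn`, `DeleteColumn`, and the `policy` predicate are hypothetical stand-ins for the Iceberg change types and `MultipleSinkOption.canHandleWithSchemaUpdate`. It shows the two parts of the fix: the check stops at the first unsupported change and blacklists the table once, and the caller returns early instead of pushing data to the next operator.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;

// Simplified model of the fixed schema-change check; all types are hypothetical stand-ins.
final class SchemaChangeGate {

    interface TableChange {
    }

    static final class AddColumn implements TableChange {
    }

    static final class DeleteColumn implements TableChange {
    }

    // Tables whose schema change could not be handled.
    final Set<String> blacklist = new HashSet<>();

    // Plays the role of MultipleSinkOption.canHandleWithSchemaUpdate(...) with the configured policy.
    private final BiPredicate<String, TableChange> policy;

    SchemaChangeGate(BiPredicate<String, TableChange> policy) {
        this.policy = policy;
    }

    boolean canHandle(String tableId, List<TableChange> changes) {
        boolean canHandle = true;
        for (TableChange change : changes) {
            canHandle &= policy.test(tableId, change);
            if (!(change instanceof AddColumn)) {
                // Only column additions are treated as supported in this model.
                canHandle = false;
            }
            if (!canHandle) {
                // Blacklist once and stop at the first unsupported change.
                blacklist.add(tableId);
                break;
            }
        }
        return canHandle;
    }

    // Caller side: return early so the record is not pushed to the next operator.
    boolean applyIfPossible(String tableId, List<TableChange> changes) {
        if (!canHandle(tableId, changes)) {
            return false;
        }
        // The real operator would apply the changes and commit the transaction here.
        return true;
    }
}
```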



[GitHub] [inlong] dockerzhang commented on issue #5469: [Feature][Sort] Add metric for Hive with Flink metrics group and audit SDK

2022-11-05 Thread GitBox


dockerzhang commented on issue #5469:
URL: https://github.com/apache/inlong/issues/5469#issuecomment-1304445376

   This issue has been finished, so closing it.





[GitHub] [inlong] healchow commented on a diff in pull request #6411: [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved

2022-11-05 Thread GitBox


healchow commented on code in PR #6411:
URL: https://github.com/apache/inlong/pull/6411#discussion_r1014638247


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java:
##
@@ -61,15 +63,25 @@ public InlongTubeMQInfo getFromEntity(InlongGroupEntity entity) {
 
 InlongTubeMQInfo groupInfo = new InlongTubeMQInfo();
 CommonBeanUtils.copyProperties(entity, groupInfo);
-
+if (StringUtils.isNotBlank(entity.getExtParams())) {
+InlongTubeMQDTO dto = InlongTubeMQDTO.getFromJson(entity.getExtParams());
+CommonBeanUtils.copyProperties(dto, groupInfo);
+}
 // TODO get the cluster
 // groupInfo.setTubeMaster();
 return groupInfo;
 }
 
 @Override
 protected void setTargetEntity(InlongGroupRequest request, InlongGroupEntity targetEntity) {
-LOGGER.info("do nothing for inlong group with TubeMQ");
+InlongTubeMQRequest tubeMQRequest = (InlongTubeMQRequest) request;
+try {
+InlongTubeMQDTO dto = InlongTubeMQDTO.getFromRequest(tubeMQRequest);
+targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+} catch (Exception e) {
+throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+}
+LOGGER.info("success set entity for inlong group with TubeMQ");

Review Comment:
   This info log is not necessary; just remove it.






[GitHub] [inlong] healchow commented on a diff in pull request #6411: [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved

2022-11-05 Thread GitBox


healchow commented on code in PR #6411:
URL: https://github.com/apache/inlong/pull/6411#discussion_r1014638768


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * The base parameter class of InlongGroup, support user extend their own business params.
+ */
+@Data
+@AllArgsConstructor
+@ApiModel("Base info of inlong group")
+public class BaseInlongConsume {
+
+// you can add extend parameters in this class
+
+}

Review Comment:
   Suggest adding a line break after the final `}` at the end of the file, so that the following no longer appears:
   
   Screenshot: https://user-images.githubusercontent.com/31994335/200123281-0c60a7de-6215-477d-be2e-29ca2509f801.png
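
   The comment concerns a file whose last line is not terminated by a newline; git reports such files with `\ No newline at end of file` in diffs, and GitHub flags them in the file view (presumably what the screenshot shows). A minimal Java sketch of checking and fixing this, assuming UTF-8 content; `TrailingNewline` and its methods are hypothetical helpers, not part of InLong:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: make sure a source file ends with a line terminator.
final class TrailingNewline {

    static boolean endsWithNewline(String content) {
        // An empty file has no unterminated last line.
        return content.isEmpty() || content.endsWith("\n");
    }

    static String ensureTrailingNewline(String content) {
        return endsWithNewline(content) ? content : content + "\n";
    }

    static void fixFile(Path file) throws IOException {
        String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        if (!endsWithNewline(content)) {
            Files.write(file, ensureTrailingNewline(content).getBytes(StandardCharsets.UTF_8));
        }
    }
}
```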
   






[inlong] 01/03: [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)

2022-11-05 Thread healchow

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7617fa00748b5e9d8fa1d6a7998885f461e91b60
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Fri Nov 4 16:34:50 2022 +0800

[INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)
---
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  3 +-
 .../sink/multiple/IcebergMultipleStreamWriter.java | 59 +-
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index bb7498650..25f9e963b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -528,7 +528,8 @@ public class FlinkSink {
 .setParallelism(parallelism);
 
 IcebergProcessOperator streamWriter =
-new IcebergProcessOperator(new IcebergMultipleStreamWriter(appendMode, catalogLoader));
+new IcebergProcessOperator(new IcebergMultipleStreamWriter(
+appendMode, catalogLoader, inlongMetric, auditHostAndPorts));
 SingleOutputStreamOperator writerStream = routeStream
 .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
 TypeInformation.of(IcebergProcessOperator.class),
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 4c3fb0045..617eb6d69 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -34,10 +38,16 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.HashMap;
 import java.util.List;
@@ -52,6 +62,9 @@ import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Iceberg writer that can distinguish different sink tables and route and distribute data into different
@@ -70,9 +83,23 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction multipleSchemas;
 private transient FunctionInitializationContext functionInitializationContext;
 
-public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+// metric
+private final String inlongMetric;
+private final String auditHostAndPorts;
+@Nullable
+private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
+
+public IcebergMultipleStreamWriter(
+boolean appendMode,
+CatalogLoader catalogLoader,
+String inlongMetric,
+String auditHostAndPorts) {

[inlong] branch master updated (a3d9d597b -> d6874a8b8)

2022-11-05 Thread healchow

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


omit a3d9d597b [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
omit d4d4dd5b4 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
omit 9228a22d6 [INLONG-6379][Sort] Bugfix:iceberg miss metric data in multiple sink (#6381)
 new 7617fa007 [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)
 new 1c790a399 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
 new d6874a8b8 [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a3d9d597b)
            \
             N -- N -- N   refs/heads/master (d6874a8b8)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:



[inlong] 03/03: [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)

2022-11-05 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d6874a8b8c363a71d44182db626c62322ad02e41
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800

[INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)
---
 .../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++---
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
-LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+// If can not handle this schema update, should not push data into next operator
+return;
 }
+SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
 }
 transaction.commitTransaction();
 handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator tableChanges) {
 boolean canHandle = true;
 for (TableChange tableChange : tableChanges) {
-if (tableChange instanceof AddColumn) {
-canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-multipleSinkOption.getSchemaUpdatePolicy());
-} else {
-if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-multipleSinkOption.getSchemaUpdatePolicy())) {
-LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-tableId, tableChange);
-}
+canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+multipleSinkOption.getSchemaUpdatePolicy());
+if (!(tableChange instanceof AddColumn)) {
 // todo:currently iceberg can only handle addColumn, so always return false
+LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+tableId, tableChange);
 canHandle = false;
 }
+if (!canHandle) {
+blacklist.add(tableId);
+break;
+}
 }
-if (!canHandle) {
-blacklist.add(tableId);
-}
+
 return canHandle;
 }
 }



[inlong] 02/03: [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)

2022-11-05 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1c790a399ea7bb9e964d8215e7d65584aba53705
Author: ganfengtan 
AuthorDate: Fri Nov 4 21:18:58 2022 +0800

[INLONG-6332][Agent] Fix reboot will reset file position error (#6333)

Co-authored-by: healchow 
---
 .../agent/core/task/TaskPositionManager.java   |   2 +
 .../sources/reader/file/MonitorTextFile.java   | 101 -
 2 files changed, 60 insertions(+), 43 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index c3bfa0abe..c5b944210 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -139,6 +139,8 @@ public class TaskPositionManager extends AbstractDaemon {
 ConcurrentHashMap positionTemp = new ConcurrentHashMap<>();
 ConcurrentHashMap position = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
 if (position == null) {
+JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0));
 position = positionTemp;
 }
 Long beforePosition = position.getOrDefault(sourcePath, 0L);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index d8160bd8b..1f5958cdf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -21,10 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Objects;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -35,26 +35,28 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXP
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL;
 
 /**
- * monitor files
+ * Monitor for text files
  */
 public final class MonitorTextFile {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class);
-private static volatile MonitorTextFile monitorTextFile = null;
 // monitor thread pool
-private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
+private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
 0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<>(),
 new AgentThreadFactory("monitor-file"));
 
+private static volatile MonitorTextFile monitorTextFile = null;
+
 private MonitorTextFile() {
 
 }
 
 /**
- * Mode of singleton
- * @return MonitorTextFile instance
+ * Get a singleton instance of MonitorTextFile.
+ *
+ * @return monitor text file instance
  */
 public static MonitorTextFile getInstance() {
 if (monitorTextFile == null) {
@@ -68,37 +70,35 @@ public final class MonitorTextFile {
 }
 
 public void monitor(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
-MonitorEventRunnable monitorEvent = new MonitorEventRunnable(fileReaderOperator, textFileReader);
-runningPool.execute(monitorEvent);
+EXECUTOR_SERVICE.execute(new MonitorEventRunnable(fileReaderOperator, textFileReader));
 }
 
 /**
- * monitor file event
+ * Runnable for monitor the file event
  */
-private class MonitorEventRunnable implements Runnable {
+private static class MonitorEventRunnable implements Runnable {
 
 private static final int WAIT_TIME = 30;
 private final FileReaderOperator fileReaderOperator;
 private final TextFileReader textFileReader;
 private final Long interval;
 private final long startTime = System.currentTimeMillis();
+private long lastFlushTime = System.currentTimeMillis();
 private String path;
-/**
- * the last modify time of the file
- */
+
+// the last modify time of the file
 private BasicFileAttributes attributesBefore;
 
-public MonitorEventRunnable(File
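
The `TaskPositionManager` hunk fixes the reboot problem by seeding the in-memory position from the persisted job profile instead of implicitly starting at 0. The Java sketch below models the same idea under stated assumptions: the persisted store is a plain nested `Map` standing in for `jobConfDb`/`JobProfile`, and the `".position"` suffix is an illustrative value, not necessarily InLong's `POSITION_SUFFIX`. Unlike the commit, which seeds only the `sourcePath` that first creates a job's map, this sketch seeds every path lazily with `computeIfAbsent`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of seeding in-memory file offsets from persisted state after a restart.
final class PositionRegistry {

    // Assumed key suffix for persisted offsets (illustrative value).
    private static final String POSITION_SUFFIX = ".position";

    // jobInstanceId -> (sourcePath -> offset)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> jobTaskPositionMap =
            new ConcurrentHashMap<>();

    // Stand-in for jobConfDb: jobInstanceId -> persisted key/value pairs.
    private final Map<String, Map<String, Long>> persisted;

    PositionRegistry(Map<String, Map<String, Long>> persisted) {
        this.persisted = persisted;
    }

    long position(String jobInstanceId, String sourcePath) {
        Map<String, Long> positions =
                jobTaskPositionMap.computeIfAbsent(jobInstanceId, id -> new ConcurrentHashMap<>());
        // First access for this path: read the saved offset instead of starting from 0.
        return positions.computeIfAbsent(sourcePath,
                path -> persisted.getOrDefault(jobInstanceId, Map.of())
                        .getOrDefault(path + POSITION_SUFFIX, 0L));
    }

    void advance(String jobInstanceId, String sourcePath, long delta) {
        position(jobInstanceId, sourcePath); // make sure the entry is seeded first
        jobTaskPositionMap.get(jobInstanceId).merge(sourcePath, delta, Long::sum);
    }
}
```

Using `computeIfAbsent` also makes create-and-seed atomic per key, whereas a `putIfAbsent` followed by a separate `put` can briefly expose an unseeded map to other threads.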

[GitHub] [inlong] gosonzhang opened a new pull request, #6413: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started

2022-11-05 Thread GitBox


gosonzhang opened a new pull request, #6413:
URL: https://github.com/apache/inlong/pull/6413

   1. Added the error code SINK_SERVICE_UNREADY: if the MQ cluster is not configured, or is configured but fails to initialize, DataProxy does not serve external requests; it starts serving only after initialization succeeds;
   2. Adjusted the HTTP access error codes and merged them into the DataProxyErrCode enumeration.
   
   - Fixes #6406 
   
   





[GitHub] [inlong] github-code-scanning[bot] commented on a diff in pull request #6413: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started

2022-11-05 Thread GitBox


github-code-scanning[bot] commented on code in PR #6413:
URL: https://github.com/apache/inlong/pull/6413#discussion_r1014650437


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java:
##
@@ -49,81 +51,98 @@
 }
 
 @Override
-public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
+public void doFilter(ServletRequest request,
+ ServletResponse response,
+ FilterChain chain) throws IOException {
 HttpServletRequest req = (HttpServletRequest) request;
 HttpServletResponse resp = (HttpServletResponse) response;
 
-int code = StatusCode.SUCCESS;
-String message = "success";
-
 String pathInfo = req.getPathInfo();
 if (pathInfo.startsWith("/")) {
 pathInfo = pathInfo.substring(1);
 }
 if ("heartbeat".equals(pathInfo)) {
-resp.setCharacterEncoding(req.getCharacterEncoding());
-resp.setStatus(HttpServletResponse.SC_OK);
-resp.flushBuffer();
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.SUCCESS.getErrCode(),
+DataProxyErrCode.SUCCESS.getErrMsg());
 return;
 }
-
-String invalidKey = null;
+// check sink service status
+if (!ConfigManager.getInstance().isMqClusterReady()) {
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCode(),
+DataProxyErrCode.SINK_SERVICE_UNREADY.getErrMsg());
+return;
+}
+// get and check groupId
 String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+if (StringUtils.isEmpty(groupId)) {
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
+DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrMsg());
+return;
+}
+// get and check streamId
 String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+if (StringUtils.isEmpty(streamId)) {
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
+DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg());
+return;
+}
+// get and check dt
 String dt = req.getParameter(AttributeConstants.DATA_TIME);
+if (StringUtils.isEmpty(dt)) {
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrCode(),
+DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrMsg());
+return;
+}
+// get and check body
 String body = req.getParameter(AttrConstants.BODY);
-
-if (StringUtils.isEmpty(groupId)) {
-invalidKey = "groupId";
-} else if (StringUtils.isEmpty(streamId)) {
-invalidKey = "streamId";
-} else if (StringUtils.isEmpty(dt)) {
-invalidKey = "dt";
-} else if (StringUtils.isEmpty(body)) {
-invalidKey = "body";
+if (StringUtils.isEmpty(body)) {
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
+DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrMsg());
+return;
+}
+// check body length
+if (body.length() > maxMsgLength) {
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
+"Bad request, body length exceeds the limit:" + maxMsgLength);
+return;
 }
-
 try {
-if (invalidKey != null) {
-LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
-code = StatusCode.ILLEGAL_ARGUMENT;
-message = "Bad request from client. " + invalidKey + " must not be empty.";
-} else if (body.length() > maxMsgLength) {
-LOG.warn("Received bad request from client. Body length is " + body.length());
-code = StatusCode.EXCEED_LEN;
-message = "Bad request from client. Body length is exceeding the limit:" + maxMsgLength;
-} else {
-chain.doFilter(request, response);
-}
+chain.doFilter(request, response);
+returnRspPackage(resp, req.getCharacterEncoding(),
+DataProxyErrCode.SUCCESS.getErrCode(),
+DataProxyErrCode.SUCCESS.getErrMsg());
 } catch (Throwable t) {
-code
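
The rewritten `MessageFilter.doFilter` answers heartbeat requests first, then checks every other request in a fixed order and returns at the first failure. The Java sketch below reproduces only that ordering as a pure function, so it can be tested without a servlet container. The enum merely reuses the constant names from the diff (the real `DataProxyErrCode` codes and messages are not shown here), and the parameter names `groupId`, `streamId`, `dt`, and `body` are assumed values of the `AttributeConstants`/`AttrConstants` keys.

```java
import java.util.Map;

// Hypothetical, servlet-free model of the validation order in MessageFilter.doFilter.
final class HttpMessageCheck {

    // Constant names reused from the diff; codes and messages are not modelled.
    enum ErrCode {
        SUCCESS,
        SINK_SERVICE_UNREADY,
        MISS_REQUIRED_GROUPID_ARGUMENT,
        MISS_REQUIRED_STREAMID_ARGUMENT,
        MISS_REQUIRED_DT_ARGUMENT,
        MISS_REQUIRED_BODY_ARGUMENT,
        BODY_EXCEED_MAX_LEN
    }

    static ErrCode check(boolean mqClusterReady, Map<String, String> params, int maxMsgLength) {
        // 1. Refuse all traffic until the sink (MQ) side is ready.
        if (!mqClusterReady) {
            return ErrCode.SINK_SERVICE_UNREADY;
        }
        // 2. Required parameters, checked one at a time so each gets its own error.
        if (isEmpty(params.get("groupId"))) {
            return ErrCode.MISS_REQUIRED_GROUPID_ARGUMENT;
        }
        if (isEmpty(params.get("streamId"))) {
            return ErrCode.MISS_REQUIRED_STREAMID_ARGUMENT;
        }
        if (isEmpty(params.get("dt"))) {
            return ErrCode.MISS_REQUIRED_DT_ARGUMENT;
        }
        String body = params.get("body");
        if (isEmpty(body)) {
            return ErrCode.MISS_REQUIRED_BODY_ARGUMENT;
        }
        // 3. Length limit on the message body.
        if (body.length() > maxMsgLength) {
            return ErrCode.BODY_EXCEED_MAX_LEN;
        }
        return ErrCode.SUCCESS;
    }

    private static boolean isEmpty(String value) {
        return value == null || value.isEmpty();
    }
}
```

Returning one specific code per failed check is what lets each case map to its own `DataProxyErrCode` entry, instead of the single `ILLEGAL_ARGUMENT` status the old code used for every missing field.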

[GitHub] [inlong] EMsnap merged pull request #6408: [INLONG-6370][Sort] The op type in debezium format should be u or update

2022-11-05 Thread GitBox


EMsnap merged PR #6408:
URL: https://github.com/apache/inlong/pull/6408





[inlong] branch master updated: [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)

2022-11-05 Thread zirui

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 1fbf04550 [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
1fbf04550 is described below

commit 1fbf0455046f8d066337893225862245b5c1adfe
Author: Schnapps 
AuthorDate: Sun Nov 6 08:41:25 2022 +0800

[INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
---
 .../sort/cdc/mysql/table/MySqlReadableMetadata.java | 21 +
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index fcfb636ac..bff2cc284 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -166,7 +166,7 @@ public enum MySqlReadableMetadata {
 .mysqlType(getMysqlType(tableSchema))
 .build();
 DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
-.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getOpType(record))
+.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
 .tableChange(tableSchema).build();
 
 try {
@@ -237,7 +237,7 @@ public enum MySqlReadableMetadata {
 
 @Override
 public Object read(SourceRecord record) {
-return StringData.fromString(getOpType(record));
+return StringData.fromString(getCanalOpType(record));
 }
 }),
 
@@ -435,7 +435,7 @@ public enum MySqlReadableMetadata {
 .data(dataList).database(databaseName)
 .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
 .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
-.type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+.type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build();
 
 try {
 return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
@@ -455,7 +455,7 @@ public enum MySqlReadableMetadata {
 this.converter = converter;
 }
 
-private static String getOpType(SourceRecord record) {
+private static String getCanalOpType(SourceRecord record) {
 String opType;
 final Envelope.Operation op = Envelope.operationFor(record);
 if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
@@ -468,6 +468,19 @@ public enum MySqlReadableMetadata {
 return opType;
 }
 
+private static String getDebeziumOpType(SourceRecord record) {
+String opType;
+final Envelope.Operation op = Envelope.operationFor(record);
+if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+opType = "c";
+} else if (op == Envelope.Operation.DELETE) {
+opType = "d";
+} else {
+opType = "u";
+}
+return opType;
+}
+
 private static List getPkNames(@Nullable TableChanges.TableChange tableSchema) {
 if (tableSchema == null) {
 return null;
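
To make the new Debezium mapping easy to check at a glance, here is a minimal, dependency-free sketch of the branching in `getDebeziumOpType`. It is not the InLong source: the class name `DebeziumOpTypeSketch` and the local `Operation` enum are hypothetical stand-ins for Debezium's `Envelope.Operation`, used only so the example compiles without Debezium on the classpath. Note that snapshot `READ` records are emitted as `"c"`, not as Debezium's own `"r"` code.

```java
/**
 * Hypothetical sketch of the branching added in getDebeziumOpType.
 * Operation is a local stand-in for io.debezium.data.Envelope.Operation.
 */
public class DebeziumOpTypeSketch {

    // Assumption: only a subset of Envelope.Operation constants is mirrored here.
    // TRUNCATE is included only to show the fall-through branch.
    enum Operation { READ, CREATE, UPDATE, DELETE, TRUNCATE }

    // Same branching as the diff: CREATE and READ (snapshot) map to "c",
    // DELETE maps to "d", and every other operation falls through to "u".
    static String toDebeziumOpType(Operation op) {
        if (op == Operation.CREATE || op == Operation.READ) {
            return "c";
        } else if (op == Operation.DELETE) {
            return "d";
        }
        return "u";
    }

    public static void main(String[] args) {
        // Print the op code chosen for each operation.
        for (Operation op : Operation.values()) {
            System.out.println(op + " -> " + toDebeziumOpType(op));
        }
    }
}
```

Running `main` prints one line per operation, for example `READ -> c` and `DELETE -> d`.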



[GitHub] [inlong] healchow opened a new pull request, #6414: [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink

2022-11-05 Thread GitBox


healchow opened a new pull request, #6414:
URL: https://github.com/apache/inlong/pull/6414

   ### Prepare a Pull Request
   
   - Fixes #6412
   
   ### Motivation
   
   Add a parameter to control whether to initiate the process for StreamSink.
   
   ### Modifications
   
   1. Add a `startProcess` parameter to `SinkRequest`, used by the save and update APIs;
   2. Add `startProcess` to the query parameters of the delete API.
   
   ### Verifying this change
   
   - [ ] This change is already covered by existing tests, such as:
  `XxxSinkServiceTest`
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)
   





[GitHub] [inlong-website] e-mhui opened a new pull request, #584: [INLONG-579][Sort] Add doc for oracle connector for all migrate

2022-11-05 Thread GitBox


e-mhui opened a new pull request, #584:
URL: https://github.com/apache/inlong-website/pull/584

   ### Add docs for the Oracle connector's all-migrate support
   
   - Fixes #579
   
   ### Motivation
   
   Add documentation for the Oracle connector's all-migrate support.





[GitHub] [inlong] ciscozhou commented on pull request #6414: [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink

2022-11-05 Thread GitBox


ciscozhou commented on PR #6414:
URL: https://github.com/apache/inlong/pull/6414#issuecomment-1304710810

   LGTM





[GitHub] [inlong] gong commented on a diff in pull request #6411: [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved

2022-11-05 Thread GitBox


gong commented on code in PR #6411:
URL: https://github.com/apache/inlong/pull/6411#discussion_r1014770780


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * The base parameter class of InlongGroup, support user extend their own business params.
+ */

Review Comment:
   Maybe the description `The base parameter class of InlongGroup` is wrong: `InlongGroup` should be changed to `InlongConsume`.


