[GitHub] [inlong] EMsnap commented on a diff in pull request #6210: [INLONG-6174][Sort] Mysql connector support meta data with debezium format

2022-10-20 Thread GitBox


EMsnap commented on code in PR #6210:
URL: https://github.com/apache/inlong/pull/6210#discussion_r1000490064


##
inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java:
##
@@ -62,12 +62,22 @@ public enum MetaField {
 /**
  * Represents a canal json of a record in database (in string format)
  */
-DATA,

Review Comment:
   Done, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #6237: [INLONG-6236][CVE] Fix the CVE-2022-42003 for jackson-databind

2022-10-20 Thread GitBox


dockerzhang merged PR #6237:
URL: https://github.com/apache/inlong/pull/6237





[inlong] branch master updated: [INLONG-6236][CVE] Fix the CVE-2022-42003 for jackson-databind (#6237)

2022-10-20 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 7b40f0b10 [INLONG-6236][CVE] Fix the CVE-2022-42003 for jackson-databind (#6237)
7b40f0b10 is described below

commit 7b40f0b10db7252ecc963e13566d8167675e82dc
Author: Charles Zhang 
AuthorDate: Thu Oct 20 19:13:10 2022 +0800

[INLONG-6236][CVE] Fix the CVE-2022-42003 for jackson-databind (#6237)
---
 licenses/inlong-agent/LICENSE   | 2 +-
 licenses/inlong-audit/LICENSE   | 2 +-
 licenses/inlong-dataproxy/LICENSE   | 2 +-
 licenses/inlong-manager/LICENSE | 2 +-
 licenses/inlong-sort-connectors/LICENSE | 2 +-
 licenses/inlong-sort-standalone/LICENSE | 2 +-
 licenses/inlong-tubemq-manager/LICENSE  | 2 +-
 pom.xml | 2 +-
 8 files changed, 8 insertions(+), 8 deletions(-)
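Modules that still pin jackson-databind below the version this commit moves to can be flagged with a simple dotted-version comparison. The sketch below assumes purely numeric version components (it would reject qualifiers such as `-rc1`) and only compares against 2.13.4.2, the version the LICENSE entries are bumped to; it does not encode which releases actually contain the fix for CVE-2022-42003. The class name is hypothetical.

```java
import java.util.Arrays;

// Minimal sketch: compare dotted numeric versions such as "2.13.2.2" and "2.13.4.2".
// Missing trailing components are treated as 0, so "2.13.4" equals "2.13.4.0".
final class VersionCheck {

    // The jackson-databind version the LICENSE entries in this commit are bumped to.
    static final String PATCHED_DATABIND = "2.13.4.2";

    // Returns a negative number, zero, or a positive number, like Comparator.compare.
    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        int n = Math.max(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // True if the given version is the same as or newer than PATCHED_DATABIND.
    static boolean isAtLeastPatched(String version) {
        return compare(version, PATCHED_DATABIND) >= 0;
    }

    // Assumes every component is a plain integer; non-numeric parts throw NumberFormatException.
    private static int[] parse(String version) {
        return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
    }
}
```

For example, `VersionCheck.isAtLeastPatched("2.13.2.2")` is false, while `"2.13.4.2"` and `"2.14.0"` both pass.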

diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 9738923c3..67f86c53e 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -395,7 +395,7 @@ The text of each license is the standard Apache 2.0 license.
   com.google.j2objc:j2objc-annotations:1.3 - J2ObjC Annotations (https://github.com/google/j2objc/), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.core:jackson-annotations:2.13.2 - Jackson-annotations (https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.13.2), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.core:jackson-core:2.13.2 - Jackson-core (https://github.com/FasterXML/jackson-core/tree/jackson-core-2.13.2), (The Apache Software License, Version 2.0)
-  com.fasterxml.jackson.core:jackson-databind:2.13.2.2 - jackson-databind (https://github.com/FasterXML/jackson-databind/tree/jackson-databind-2.13.2.2), (The Apache Software License, Version 2.0)
+  com.fasterxml.jackson.core:jackson-databind:2.13.4.2 - jackson-databind (https://github.com/FasterXML/jackson-databind/tree/jackson-databind-2.13.4.2), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.10.0 - Jackson-dataformat-CSV (https://github.com/FasterXML/jackson-dataformats-text/tree/jackson-dataformats-text-2.10.0), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.0 - Jackson datatype: jdk8 (https://github.com/FasterXML/jackson-modules-java8/tree/jackson-modules-java8-2.10.0), (The Apache Software License, Version 2.0)
   jakarta.validation:jakarta.validation-api:2.0.2 - Jakarta Bean Validation API (https://beanvalidation.org), (Apache License 2.0)
diff --git a/licenses/inlong-audit/LICENSE b/licenses/inlong-audit/LICENSE
index 8e9147d14..15c7420c6 100644
--- a/licenses/inlong-audit/LICENSE
+++ b/licenses/inlong-audit/LICENSE
@@ -393,7 +393,7 @@ The text of each license is the standard Apache 2.0 license.
   com.google.j2objc:j2objc-annotations:1.3 - J2ObjC Annotations (https://github.com/google/j2objc/), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.core:jackson-annotations:2.13.2 - Jackson-annotations (https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.13.2), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.core:jackson-core:2.13.2 - Jackson-core (https://github.com/FasterXML/jackson-core/tree/jackson-core-2.13.2), (The Apache Software License, Version 2.0)
-  com.fasterxml.jackson.core:jackson-databind:2.13.2.2 - jackson-databind (https://github.com/FasterXML/jackson-databind/tree/jackson-databind-2.13.2.2), (The Apache Software License, Version 2.0)
+  com.fasterxml.jackson.core:jackson-databind:2.13.4.2 - jackson-databind (https://github.com/FasterXML/jackson-databind/tree/jackson-databind-2.13.4.2), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.8.11 - Jackson dataformat: CBOR (https://github.com/FasterXML/jackson-dataformats-binary/tree/jackson-dataformats-binary-2.8.11/cbor), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.8.11 - Jackson dataformat: Smile (https://github.com/FasterXML/jackson-dataformats-binary/tree/jackson-dataformats-binary-2.8.11/smile), (The Apache Software License, Version 2.0)
   joda-time:joda-time:2.9.9 - Joda-Time (http://www.joda.org/joda-time/), (Apache License, Version 2.0)
diff --git a/licenses/inlong-dataproxy/LICENSE b/licenses/inlong-dataproxy/LICENSE
index 9d1affbc3..941e387b6 100644
--- a/licenses/inlong-dataproxy/LICENSE
+++ b/licenses/inlong-dataproxy/LICENSE
@@ -381,7 +381,7 @@ The text of each license is the standard Apache 2.0 license.
   com.google.j2objc:j2objc-annotations:1.3 - J2ObjC Annotations (https://github.com/google/j2objc/), (The Apache Software License, Version 2.0)
   c

[GitHub] [inlong] healchow commented on a diff in pull request #6230: [INLONG-6216][Manager] Support get group topic info

2022-10-20 Thread GitBox


healchow commented on code in PR #6230:
URL: https://github.com/apache/inlong/pull/6230#discussion_r1000499024


##
inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml:
##
@@ -247,4 +247,14 @@
 where id = #{id,jdbcType=INTEGER}
 
 
+
+select
+
+from inlong_cluster
+
+is_deleted = 0
+and inlong_cluster_tag = #{clusterTag, jdbcType=STRING}

Review Comment:
   Did it work for the type `STRING`?
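On the `jdbcType=STRING` question: to my understanding, MyBatis resolves the `jdbcType` name against its own `JdbcType` enum, which follows the `java.sql.Types` names and has no `STRING` constant, so a text column would normally be written as `#{clusterTag, jdbcType=VARCHAR}`. The check below is only a sketch: it uses the JDK's `java.sql.JDBCType` enum as a stand-in for MyBatis's `JdbcType` (an assumption that both reject the same names) and does not call MyBatis itself. The class name is hypothetical.

```java
import java.sql.JDBCType;

// Sketch: java.sql.JDBCType stands in for MyBatis's JdbcType enum here.
// Both are enums keyed by JDBC type names; neither defines a STRING constant.
final class JdbcTypeNameCheck {

    // Returns true if the name resolves to a JDBC type constant, false otherwise.
    static boolean isKnownJdbcType(String name) {
        try {
            JDBCType.valueOf(name);
            return true;
        } catch (IllegalArgumentException e) {
            // Enum.valueOf throws IllegalArgumentException for unknown constant names.
            return false;
        }
    }
}
```

With this stand-in, `VARCHAR` and `INTEGER` resolve while `STRING` does not.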






[GitHub] [inlong] healchow commented on a diff in pull request #6230: [INLONG-6216][Manager] Support get group topic info

2022-10-20 Thread GitBox


healchow commented on code in PR #6230:
URL: https://github.com/apache/inlong/pull/6230#discussion_r1000499307


##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java:
##
@@ -44,6 +44,9 @@ public interface InlongStreamExtEntityMapper {
 
 List selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
+InlongStreamExtEntity selectByKey(@Param("groupId") String groupId, @Param("streamId") String streamId,
+  @Param("keyName") String keyName);

Review Comment:
   Not necessary to indent.






[GitHub] [inlong] healchow commented on a diff in pull request #6240: [INLONG-6239][InLong] Add inlongctl in the root directory

2022-10-20 Thread GitBox


healchow commented on code in PR #6240:
URL: https://github.com/apache/inlong/pull/6240#discussion_r1000380309


##
bin/inlongctl:
##
@@ -0,0 +1,24 @@
+#! /bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+INLONG_HOME=$(
+  cd $(dirname $0)

Review Comment:
   ```suggestion
 # shellcheck disable=SC2164
 cd $(dirname $0)
   ```






[GitHub] [inlong] healchow commented on a diff in pull request #6240: [INLONG-6239][InLong] Add inlongctl in the root directory

2022-10-20 Thread GitBox


healchow commented on code in PR #6240:
URL: https://github.com/apache/inlong/pull/6240#discussion_r1000500877


##
bin/inlongctl:
##
@@ -0,0 +1,25 @@
+#! /bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+INLONG_HOME=$(
+  cd $(dirname $0)
+  cd ..
+  pwd
+)

Review Comment:
   ```suggestion
   
   INLONG_HOME=$(
 # shellcheck disable=SC2164
 cd "$(dirname $0)"
 cd ..
 pwd
   )
   ```






[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6210: [INLONG-6174][Sort] Mysql connector support meta data with debezium format

2022-10-20 Thread GitBox


yunqingmoswu commented on code in PR #6210:
URL: https://github.com/apache/inlong/pull/6210#discussion_r1000527832


##
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java:
##
@@ -98,7 +100,7 @@ public Object read(SourceRecord record) {
 }),
 
 DATA(
-"meta.data",

Review Comment:
   Please keep in mind code compatibility.
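One common way to honour this compatibility request when a metadata key such as `meta.data` is renamed or split per format is to keep the old key as an alias of its previous meaning (canal JSON, per the `MetaField` Javadoc quoted earlier). The enum below is a hypothetical sketch: the keys `meta.data_canal` and `meta.data_debezium` and the alias mechanism are assumptions for illustration, not the names used in PR #6210.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of keeping a legacy metadata key working after a rename.
// Only "meta.data" comes from the diff; the other key strings are invented.
enum ReadableMetadataSketch {
    // Canonical key plus the legacy alias "meta.data", so existing DDL keeps resolving.
    DATA_CANAL("meta.data_canal", "meta.data"),
    // New key with no legacy aliases.
    DATA_DEBEZIUM("meta.data_debezium");

    private final String key;
    private final List<String> aliases;

    ReadableMetadataSketch(String key, String... aliases) {
        this.key = key;
        this.aliases = Arrays.asList(aliases);
    }

    String getKey() {
        return key;
    }

    // Resolves either the canonical key or any legacy alias; empty if nothing matches.
    static Optional<ReadableMetadataSketch> fromKey(String name) {
        return Arrays.stream(values())
                .filter(m -> m.key.equals(name) || m.aliases.contains(name))
                .findFirst();
    }
}
```

The design choice is that old jobs referencing `meta.data` keep their previous output, while new jobs opt into the new format through a distinct key.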






[GitHub] [inlong] thesumery opened a new pull request, #6247: [INLONG-6246][Sort] Import multiple sink option and schema update handle policy

2022-10-20 Thread GitBox


thesumery opened a new pull request, #6247:
URL: https://github.com/apache/inlong/pull/6247

   [INLONG-6246][Sort] Import multiple sink option and schema update handle policy
   
   ### Prepare a Pull Request
   - Title Example: [INLONG-6246][Sort] Import multiple sink option and schema update handle policy
   - Fixes #6246
   
   ### Motivation
   
   *Import multiple sink option and schema update handle policy*
   
   ### Modifications
   
   *Import multiple sink option and schema update handle policy*
   
   





[GitHub] [inlong] EMsnap commented on a diff in pull request #6210: [INLONG-6174][Sort] Mysql connector support meta data with debezium format

2022-10-20 Thread GitBox


EMsnap commented on code in PR #6210:
URL: https://github.com/apache/inlong/pull/6210#discussion_r1000628573


##
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java:
##
@@ -98,7 +100,7 @@ public Object read(SourceRecord record) {
 }),
 
 DATA(
-"meta.data",

Review Comment:
   done






[GitHub] [inlong] EMsnap commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

2022-10-20 Thread GitBox


EMsnap commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000656733


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##
@@ -0,0 +1,395 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction
+implements CheckpointedFunction, CheckpointListener {
+private static final long serialVersionUID = 1L;
+private static final long INITIAL_CHECKPOINT_ID = -1L;
+private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+private static final String FLINK_JOB_ID = "flink.job-id";
+
+// The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we
+// could correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg
+// table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
+// committing the iceberg transaction.
+private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+// TableLoader to load iceberg table lazily.
+private final TableLoader tableLoader;
+private final boolean replacePartitions;
+
+// A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+// to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for
+// example: the 1st checkpoint have 2 data files <1, >, the 2st checkpoint have 1 data files
+// <2, >. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, wh
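The truncated comment describes keeping the completed data files of each not-yet-committed checkpoint in a sorted map, so that a later successful checkpoint can commit everything up to and including itself, and using the max committed checkpoint id to avoid committing the same files twice. The sketch below shows only that bookkeeping with a `TreeMap`; data files are plain strings instead of serialized manifests, there is no Flink or Iceberg dependency, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of per-checkpoint pending files plus a max-committed-id guard.
final class PendingCheckpoints {

    // checkpointId -> data files completed in that checkpoint but not yet committed.
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();
    private long maxCommittedCheckpointId = -1L;

    // Called when a checkpoint snapshot finishes writing its data files.
    void snapshot(long checkpointId, List<String> dataFiles) {
        pending.put(checkpointId, new ArrayList<>(dataFiles));
    }

    // Commits every pending checkpoint up to and including checkpointId,
    // returning the committed files in checkpoint order.
    List<String> notifyCheckpointComplete(long checkpointId) {
        List<String> committed = new ArrayList<>();
        if (checkpointId <= maxCommittedCheckpointId) {
            return committed; // already committed: never commit the same files twice
        }
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> files : ready.values()) {
            committed.addAll(files);
        }
        ready.clear(); // headMap is a view, so this also removes the entries from `pending`
        maxCommittedCheckpointId = checkpointId;
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

If the notification for checkpoint 1 never arrives, completing checkpoint 2 commits the files of both, and a late notification for checkpoint 1 is ignored.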

[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

2022-10-20 Thread GitBox


thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000718497


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##
@@ -0,0 +1,395 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction
+implements CheckpointedFunction, CheckpointListener {
+private static final long serialVersionUID = 1L;
+private static final long INITIAL_CHECKPOINT_ID = -1L;
+private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+private static final String FLINK_JOB_ID = "flink.job-id";
+
+// The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we
+// could correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg
+// table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
+// committing the iceberg transaction.
+private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+// TableLoader to load iceberg table lazily.
+private final TableLoader tableLoader;
+private final boolean replacePartitions;
+
+// A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+// to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for
+// example: the 1st checkpoint have 2 data files <1, >, the 2st checkpoint have 1 data files
+// <2, >. Snapshot for checkpoint#1 interrupted because of network/disk failure etc,

[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

2022-10-20 Thread GitBox


thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000720450


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##
@@ -43,8 +66,39 @@
  */
 public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat {
 
+private static final Map SQL_TYPE_2_ICEBERG_TYPE_MAPPING =

Review Comment:
   The name here is wrong: this mapping has nothing to do with Iceberg. It should be called SQL_TYPE_2_FLINK_TYPE_MAPPING.
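Assuming the map is keyed by `java.sql.Types` codes (its generic parameters are not visible in this archive), the renamed field reads as a lookup from JDBC type codes to Flink SQL types, which is why the Iceberg name is misleading. The sketch writes the Flink side as SQL type-name strings so it needs no Flink dependency; the selection of entries, the STRING fallback, and the class name are illustrative assumptions.

```java
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: JDBC type code -> Flink SQL type name.
// A real implementation would map to Flink LogicalType instances instead of strings.
final class SqlTypeMappingSketch {

    static final Map<Integer, String> SQL_TYPE_2_FLINK_TYPE_MAPPING = new HashMap<>();

    static {
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.BOOLEAN, "BOOLEAN");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.TINYINT, "TINYINT");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.SMALLINT, "SMALLINT");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.INTEGER, "INT");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.BIGINT, "BIGINT");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.DOUBLE, "DOUBLE");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.VARCHAR, "STRING");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.DATE, "DATE");
        SQL_TYPE_2_FLINK_TYPE_MAPPING.put(Types.TIMESTAMP, "TIMESTAMP");
    }

    // Falls back to STRING for codes the table does not cover (an assumed policy).
    static String toFlinkType(int sqlType) {
        return SQL_TYPE_2_FLINK_TYPE_MAPPING.getOrDefault(sqlType, "STRING");
    }
}
```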






[GitHub] [inlong] dockerzhang merged pull request #6240: [INLONG-6239][InLong] Add inlongctl in the root directory

2022-10-20 Thread GitBox


dockerzhang merged PR #6240:
URL: https://github.com/apache/inlong/pull/6240





[inlong] branch master updated: [INLONG-6239][InLong] Add inlongctl in the root directory (#6240)

2022-10-20 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 092e581c1 [INLONG-6239][InLong] Add inlongctl in the root directory (#6240)
092e581c1 is described below

commit 092e581c175bc7cf1ff9832e7518a49b880cbe71
Author: haifxu 
AuthorDate: Fri Oct 21 10:04:15 2022 +0800

[INLONG-6239][InLong] Add inlongctl in the root directory (#6240)

Co-authored-by: healchow 
---
 bin/inlongctl  | 25 ++
 .../src/main/assemblies/release.xml|  4 ++--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/bin/inlongctl b/bin/inlongctl
new file mode 100644
index 0..bc1912281
--- /dev/null
+++ b/bin/inlongctl
@@ -0,0 +1,25 @@
+#! /bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+INLONG_HOME=$(
+  # shellcheck disable=SC2164
+  cd "$(dirname $0)"
+  cd ..
+  pwd
+)
+
+bash +x $INLONG_HOME/inlong-manager/bin/managerctl $*
diff --git a/inlong-distribution/src/main/assemblies/release.xml b/inlong-distribution/src/main/assemblies/release.xml
index ccc2b43c8..104180208 100644
--- a/inlong-distribution/src/main/assemblies/release.xml
+++ b/inlong-distribution/src/main/assemblies/release.xml
@@ -151,14 +151,14 @@
 ./
 
 
-
+
 
 ../bin
 bin
 0755
 unix
 
-
+
 
 ../conf
 conf



[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

2022-10-20 Thread GitBox


healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001287655


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 }
 
 try {
+List streamInfoList = streamService.getTopicList(groupId);
+if (streamInfoList == null || streamInfoList.isEmpty()) {

Review Comment:
   Suggest using `CollectionUtils.isEmpty()`.
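`CollectionUtils.isEmpty()` (a method with this name exists in both Apache Commons Collections and Spring) folds the null check and the emptiness check into one call. The helper below is a self-contained, hypothetical stand-in that shows the behaviour the suggestion relies on; in the real code the library method would be called directly.

```java
import java.util.Collection;

// Hypothetical stand-in for CollectionUtils.isEmpty(Collection).
final class EmptyCheckSketch {

    // Same result as `coll == null || coll.isEmpty()`, in a single null-safe call.
    static boolean isEmpty(Collection<?> coll) {
        return coll == null || coll.isEmpty();
    }
}
```

Applied to the diff, the guard would read `if (CollectionUtils.isEmpty(streamInfoList)) { ... }`.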






[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

2022-10-20 Thread GitBox


healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001287844


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 }
 
 try {
+List streamInfoList = streamService.getTopicList(groupId);
+if (streamInfoList == null || streamInfoList.isEmpty()) {
+log.warn("skip to create tube topic as no streams for groupId={}", groupId);

Review Comment:
   create tube topic? not consumer group?






[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

2022-10-20 Thread GitBox


healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001287844


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 }
 
 try {
+List streamInfoList = streamService.getTopicList(groupId);
+if (streamInfoList == null || streamInfoList.isEmpty()) {
+log.warn("skip to create tube topic as no streams for groupId={}", groupId);

Review Comment:
   create tube topic? not `tubemq consumer group`?






[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

2022-10-20 Thread GitBox


healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001288781


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 }
 
 try {
+List streamInfoList = streamService.getTopicList(groupId);
+if (streamInfoList == null || streamInfoList.isEmpty()) {
+log.warn("skip to create tube topic as no streams for groupId={}", groupId);
+return;
+}
 // 1. create tubemq topic
 String clusterTag = groupInfo.getInlongClusterTag();
 TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
 String topicName = groupInfo.getMqResource();
 tubeMQOperator.createTopic(tubeCluster, topicName, operator);
 log.info("success to create tubemq topic for groupId={}", groupId);
-
-// 2. create tubemq consumer group
-// consumer naming rules: clusterTag_topicName_consumer_group
-String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
-tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
-log.info("success to create tubemq consumer group for groupId={}", groupId);
-
-// insert the consumer group info
-Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
-log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
-id, consumeGroup, groupId, topicName);
-
+for (InlongStreamBriefInfo stream : streamInfoList) {
+List streamSinks = sinkService.listSink(groupId, stream.getInlongStreamId());
+for (StreamSink sink : streamSinks) {
+// 2. create tubemq consumer group
+// consumer naming rules: clusterTag_topicName_consumer_group
+String consumeGroup = clusterTag + "_" + topicName + "_" + sink.getId() + "_consumer_group";

Review Comment:
   Suggest extracting a constant, as 
`PulsarResourceOperator#PULSAR_SUBSCRIPTION` did.
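A minimal sketch of the suggested refactoring, assuming a format-string constant. The name `TUBEMQ_CONSUMER_GROUP` is the one a later revision of this PR passes to `String.format`, but its value `"%s_%s_%s_consumer_group"` is an assumption derived from the concatenation in the diff; the holder class `ConsumerGroupNaming` and helper `consumerGroupFor` are hypothetical.

```java
// Hypothetical holder for the TubeMQ consumer group naming rule.
final class ConsumerGroupNaming {

    // Assumed format: clusterTag_topicName_sinkId_consumer_group,
    // mirroring the string concatenation in the diff above.
    static final String TUBEMQ_CONSUMER_GROUP = "%s_%s_%s_consumer_group";

    private ConsumerGroupNaming() {
    }

    /** Builds the consumer group name for one sink. */
    static String consumerGroupFor(String clusterTag, String topicName, Integer sinkId) {
        return String.format(TUBEMQ_CONSUMER_GROUP, clusterTag, topicName, sinkId);
    }
}
```

For example, `consumerGroupFor("default_cluster", "test_topic", 42)` yields `default_cluster_test_topic_42_consumer_group`. Keeping the pattern in one constant means the creation code and any later lookup or deletion code cannot drift apart.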






[GitHub] [inlong] healchow commented on a diff in pull request #6210: [INLONG-6174][Sort] Mysql connector support meta data with debezium format

2022-10-20 Thread GitBox


healchow commented on code in PR #6210:
URL: https://github.com/apache/inlong/pull/6210#discussion_r1001294560


##
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java:
##
@@ -379,6 +415,38 @@ public Object read(SourceRecord record) {
 }
 });
 
+private static StringData getCanalData(SourceRecord record, GenericRowData rowData,
+TableChange tableSchema) {
+Struct messageStruct = (Struct) record.value();
+Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+// tableName
+String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
+// databaseName
+String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
+// opTs
+long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+// ts
+long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
+// actual data
+GenericRowData data = rowData;
+Map field = (Map) data.getField(0);
+List> dataList = new ArrayList<>();
+dataList.add(field);
+
+CanalJson canalJson = CanalJson.builder()
+.data(dataList).database(databaseName)
+.sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
+.mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
+.type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+
+try {
+ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   Suggest using a global static instance of ObjectMapper.






[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6247: [INLONG-6246][Sort] Import multiple sink option and schema update handle policy

2022-10-20 Thread GitBox


yunqingmoswu commented on code in PR #6247:
URL: https://github.com/apache/inlong/pull/6247#discussion_r1001295145


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java:
##
@@ -0,0 +1,195 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.base.sink;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Objects;
+
+public interface TableChange {

Review Comment:
   Add some comments?



##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##
@@ -145,15 +146,15 @@ public final class Constants {
 .withDescription("The option 'sink.multiple.enable' "
 + "is used to determine whether to support multiple sink writing, default is 'false'.");
 
-public static final ConfigOption<String> SINK_MULTIPLE_ADD_COLUMN_POLICY =
+public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_ADD_COLUMN_POLICY =

Review Comment:
   Add some comments?



##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java:
##
@@ -0,0 +1,130 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.base.sink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE;
+import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
+import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.TRY_IT_BEST;
+
+public class MultipleSinkOption implements Serializable {

Review Comment:
   Add some comments?



##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##
@@ -145,15 +146,15 @@ public final class Constants {
 .withDescription("The option 'sink.multiple.enable' "
 + "is used to determine whether to support multiple sink writing, default is 'false'.");
 
-public static final ConfigOption<String> SINK_MULTIPLE_ADD_COLUMN_POLICY =
+public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_ADD_COLUMN_POLICY =
 ConfigOptions.key("sink.multiple.add-column.policy")
-.stringType()
-.noDefaultValue()
-.withDescription("");
+.enumType(SchemaUpdateExceptionPolicy.class)
+.defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
+.withDescription("The action to deal with column add.");
 
-public static final ConfigOption<String> SINK_MULTIPLE_DEL_COLUMN_POLICY =
+public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_DEL_COLUMN_POLICY =

Review Comment:
   Add some comments?
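The reviewer asks for comments on the new policy options. A hedged sketch of what a documented policy enum plus a string-to-enum fallback could look like: the three constant names come from the static imports in `MultipleSinkOption`, and the `TRY_IT_BEST` default comes from the option definition, but the semantics in the Javadoc are assumptions inferred from the names, and `SchemaPolicyParser.parse` is a hypothetical helper. Flink's `enumType(...)` option already does its own parsing; the helper only illustrates the default behavior.

```java
import java.util.Locale;

/**
 * Policy applied when an incoming record's schema differs from the sink table's schema.
 * The per-constant semantics below are assumed from the constant names.
 */
enum SchemaUpdateExceptionPolicy {
    /** Try to apply the schema change to the sink table (assumed). */
    TRY_IT_BEST,
    /** Log the schema change and otherwise ignore it (assumed). */
    LOG_WITH_IGNORE,
    /** Raise an alert for the schema change and otherwise ignore it (assumed). */
    ALERT_WITH_IGNORE
}

// Hypothetical helper, not part of the PR.
final class SchemaPolicyParser {

    private SchemaPolicyParser() {
    }

    /**
     * Resolves a raw option value such as "log_with_ignore" to a policy,
     * falling back to TRY_IT_BEST (the declared default) when the value is absent or blank.
     */
    static SchemaUpdateExceptionPolicy parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return SchemaUpdateExceptionPolicy.TRY_IT_BEST;
        }
        // valueOf throws IllegalArgumentException for unknown names, surfacing misconfiguration early.
        return SchemaUpdateExceptionPolicy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

Failing fast on an unknown value, rather than silently using the default, is a design choice here; a misspelled policy would otherwise quietly change how schema drift is handled.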



##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java:
##
@@ -0,0 +1,32 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+

[GitHub] [inlong] vernedeng commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

2022-10-20 Thread GitBox


vernedeng commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001310721


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##
@@ -67,23 +83,31 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 
 try {
 // 1. create tubemq topic
+List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+if (CollectionUtils.isEmpty(streamInfoList)) {
+log.warn("skip to create tubemq topic as no streams for groupId={}", groupId);
+return;
+}
 String clusterTag = groupInfo.getInlongClusterTag();
 TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
 String topicName = groupInfo.getMqResource();
 tubeMQOperator.createTopic(tubeCluster, topicName, operator);
 log.info("success to create tubemq topic for groupId={}", groupId);
 
 // 2. create tubemq consumer group
-// consumer naming rules: clusterTag_topicName_consumer_group
-String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
-tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
-log.info("success to create tubemq consumer group for groupId={}", groupId);
-
-// insert the consumer group info
-Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
-log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
-id, consumeGroup, groupId, topicName);
-
+for (InlongStreamBriefInfo stream : streamInfoList) {
+List<StreamSink> streamSinks = sinkService.listSink(groupId, stream.getInlongStreamId());
+for (StreamSink sink : streamSinks) {
+String consumeGroup = String.format(TUBEMQ_CONSUMER_GROUP, clusterTag, topicName, sink.getId());
+tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
+log.info("success to create tubemq consumer group for groupId={}", groupId);

Review Comment:
   It seems like this is a consumer group for each inlong stream, instead of for the inlong group.






[GitHub] [inlong] vernedeng commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

2022-10-20 Thread GitBox


vernedeng commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001310721


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##
@@ -67,23 +83,31 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 
 try {
 // 1. create tubemq topic
+List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+if (CollectionUtils.isEmpty(streamInfoList)) {
+log.warn("skip to create tubemq topic as no streams for groupId={}", groupId);
+return;
+}
 String clusterTag = groupInfo.getInlongClusterTag();
 TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
 String topicName = groupInfo.getMqResource();
 tubeMQOperator.createTopic(tubeCluster, topicName, operator);
 log.info("success to create tubemq topic for groupId={}", groupId);
 
 // 2. create tubemq consumer group
-// consumer naming rules: clusterTag_topicName_consumer_group
-String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
-tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
-log.info("success to create tubemq consumer group for groupId={}", groupId);
-
-// insert the consumer group info
-Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
-log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
-id, consumeGroup, groupId, topicName);
-
+for (InlongStreamBriefInfo stream : streamInfoList) {
+List<StreamSink> streamSinks = sinkService.listSink(groupId, stream.getInlongStreamId());
+for (StreamSink sink : streamSinks) {
+String consumeGroup = String.format(TUBEMQ_CONSUMER_GROUP, clusterTag, topicName, sink.getId());
+tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
+log.info("success to create tubemq consumer group for groupId={}", groupId);

Review Comment:
   It seems like this is a consumer group for each inlong stream, instead of for each inlong group.
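The point of this comment is that the groups are created per sink (and therefore per stream), so a log line keyed only on `groupId` under-describes what was created. A minimal, framework-free sketch of the nested loop with a log line that names the stream and sink; `StreamBrief`, `Sink`, and `planConsumerGroups` are hypothetical stand-ins for `InlongStreamBriefInfo`, `StreamSink`, and the manager services, and the naming format is assumed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class ConsumerGroupPlanner {

    // Hypothetical stand-in for InlongStreamBriefInfo.
    static final class StreamBrief {
        final String streamId;

        StreamBrief(String streamId) {
            this.streamId = streamId;
        }
    }

    // Hypothetical stand-in for StreamSink.
    static final class Sink {
        final int id;

        Sink(int id) {
            this.id = id;
        }
    }

    // Assumed naming rule: clusterTag_topicName_sinkId_consumer_group
    static final String TUBEMQ_CONSUMER_GROUP = "%s_%s_%s_consumer_group";

    /** Returns one consumer group name per sink, across every stream of the inlong group. */
    static List<String> planConsumerGroups(String groupId, String clusterTag, String topicName,
            List<StreamBrief> streams, Map<String, List<Sink>> sinksByStreamId) {
        List<String> consumerGroups = new ArrayList<>();
        for (StreamBrief stream : streams) {
            List<Sink> sinks = sinksByStreamId.getOrDefault(stream.streamId, Collections.emptyList());
            for (Sink sink : sinks) {
                String consumeGroup = String.format(TUBEMQ_CONSUMER_GROUP, clusterTag, topicName, sink.id);
                consumerGroups.add(consumeGroup);
                // Name the stream and sink: the consumer group belongs to this sink, not the whole group.
                System.out.printf("planned tubemq consumer group %s for groupId=%s, streamId=%s, sinkId=%d%n",
                        consumeGroup, groupId, stream.streamId, sink.id);
            }
        }
        return consumerGroups;
    }
}
```

A stream without sinks contributes no consumer group, which matches the diff's behavior of iterating `listSink(...)` per stream.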






[GitHub] [inlong] woofyzhao opened a new pull request, #6249: [INLONG-6248][Manager] Add sort status query plugin and api

2022-10-20 Thread GitBox


woofyzhao opened a new pull request, #6249:
URL: https://github.com/apache/inlong/pull/6249

   
   - Fixes #6248
   
   ### Motivation
   
   Some pieces are missing from the overall inlong task management, for example:
   
   - The sort job status is unknown once the sort job has started.
   - An intermediate status such as group CONFIGURING may get stuck forever if the manager is killed or exits abnormally.
   - The state machine is complex and may not integrate well with either standard mode or lightweight mode.
   
   ### Modification
   - Add a sort job status query interface.
   - Add an intermediate status cleanup task on manager recovery startup.
   - Optimize the state machine to better integrate with both standard and lightweight modes.
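One way the "intermediate status cleanup on manager recovery startup" step could work, as a hedged sketch only: on startup, every group still in an in-progress status such as CONFIGURING is moved to a terminal status, since no manager thread is working on it any more. Apart from `CONFIGURING`, the status names, the in-memory map standing in for the database, and `recoverStuckGroups` are hypothetical; the PR's actual implementation is not shown in this message.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class StartupStatusRecovery {

    // Hypothetical statuses; only CONFIGURING is named in the PR description.
    enum GroupStatus { NEW, CONFIGURING, CONFIGURED, FAILED }

    // Statuses that are only valid while a manager thread is actively working on the group.
    static final Set<GroupStatus> INTERMEDIATE = EnumSet.of(GroupStatus.CONFIGURING);

    /**
     * Called once on manager startup: moves every group left in an intermediate
     * status to FAILED and returns how many groups were changed.
     */
    static int recoverStuckGroups(Map<String, GroupStatus> statusByGroupId) {
        int recovered = 0;
        for (Map.Entry<String, GroupStatus> entry : statusByGroupId.entrySet()) {
            if (INTERMEDIATE.contains(entry.getValue())) {
                entry.setValue(GroupStatus.FAILED);
                recovered++;
            }
        }
        return recovered;
    }
}
```

Running this only at startup avoids racing with live workers; a multi-instance manager deployment would need extra coordination that this sketch does not attempt.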





[GitHub] [inlong] vernedeng commented on a diff in pull request #6230: [INLONG-6216][Manager] Support get group topic info

2022-10-20 Thread GitBox


vernedeng commented on code in PR #6230:
URL: https://github.com/apache/inlong/pull/6230#discussion_r1001323113


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterBriefInfo.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Inlong cluster brief info
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong cluster brief info")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
+public abstract class ClusterBriefInfo {

Review Comment:
   fixed, thx






[GitHub] [inlong] vernedeng commented on a diff in pull request #6230: [INLONG-6216][Manager] Support get group topic info

2022-10-20 Thread GitBox


vernedeng commented on code in PR #6230:
URL: https://github.com/apache/inlong/pull/6230#discussion_r1001326498


##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java:
##
@@ -57,4 +57,6 @@ List selectByKey(@Param("clusterTag") 
String clusterTag, @P
 
 int deleteByPrimaryKey(Integer id);
 
+List selectByClusterTag(String clusterTag);

Review Comment:
   fixed, thx






[GitHub] [inlong] vernedeng commented on a diff in pull request #6230: [INLONG-6216][Manager] Support get group topic info

2022-10-20 Thread GitBox


vernedeng commented on code in PR #6230:
URL: https://github.com/apache/inlong/pull/6230#discussion_r1001326594


##
inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml:
##
@@ -247,4 +247,14 @@
 where id = #{id,jdbcType=INTEGER}
 
 
+
+select
+
+from inlong_cluster
+
+is_deleted = 0
+and inlong_cluster_tag = #{clusterTag, jdbcType=STRING}

Review Comment:
   my fault, fixed, thx



##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java:
##
@@ -44,6 +44,9 @@ public interface InlongStreamExtEntityMapper {
 
 List selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
+InlongStreamExtEntity selectByKey(@Param("groupId") String groupId, @Param("streamId") String streamId,
+  @Param("keyName") String keyName);

Review Comment:
   fixed, thx






[GitHub] [inlong] GanfengTan opened a new pull request, #6252: [INLONG-6251][Agent] Fix UT ConcurrentModification error

2022-10-20 Thread GitBox


GanfengTan opened a new pull request, #6252:
URL: https://github.com/apache/inlong/pull/6252

   Fix the ConcurrentModificationException raised when running the TestFileAgent UT.
   
   - Fixes #6251 
   
   ### Motivation
   
   Running the TestFileAgent UT fails with a ConcurrentModificationException; change ArrayList to CopyOnWriteArrayList.
   
   ### Modifications
   
   Change ArrayList to CopyOnWriteArrayList.
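A minimal, self-contained illustration of the failure mode and the fix (not the test code itself, whose exact cause is not shown here): modifying an `ArrayList` while it is being iterated makes its fail-fast iterator throw `ConcurrentModificationException`, whereas a `CopyOnWriteArrayList` iterator works on a snapshot and never throws it. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class CowListDemo {

    /** Appends to the list while iterating it; returns true if a ConcurrentModificationException was thrown. */
    static boolean throwsOnModifyDuringIteration(List<String> list) {
        try {
            for (String item : list) {
                if (item.equals("a")) {
                    list.add("c"); // structural modification during iteration
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // ArrayList's fail-fast iterator detects the modification: prints true
        System.out.println(throwsOnModifyDuringIteration(new ArrayList<>(Arrays.asList("a", "b"))));
        // CopyOnWriteArrayList iterates over a snapshot: prints false
        System.out.println(throwsOnModifyDuringIteration(new CopyOnWriteArrayList<>(Arrays.asList("a", "b"))));
    }
}
```

The trade-off is that every write copies the backing array and an in-progress iteration does not see elements added after it started, so `CopyOnWriteArrayList` suits read-mostly lists that are iterated while other code occasionally appends.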
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 TestFileAgent.testOneJobFullPath()
   
   - [ ] This change added tests and can be verified as follows:
   
   





[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

2022-10-20 Thread GitBox


thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1001429267


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator
+implements OneInputStreamOperator {
+
+private final ObjectMapper objectMapper = new ObjectMapper();
+
+private final CatalogLoader catalogLoader;
+private final MultipleSinkOption multipleSinkOption;
+
+private transient Catalog catalog;
+private transient SupportsNamespaces asNamespaceCatalog;
+private transient AbstractDynamicSchemaFormat dynamicSchemaFormat;
+
+// record cache, wait schema to consume record
+private transient Map> recordQueues;
+
+// schema cache
+private transient Map schemaCache;
+
+public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+MultipleSinkOption multipleSinkOption) {
+this.catalogLoader = catalogLoader;
+this.multipleSinkOption = multipleSinkOption;
+}
+
+@Override
+public void open() throws Exception {
+super.open();
+this.catalog = catalogLoader.loadCatalog();
+this.asNamespaceCatalog =
+catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+this.recordQueues = new HashMap<>();
+this.schemaCache = new HashMap<>();
+this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+}
+
+@Override
+public void close() throws Exception {
+super.close();
+if (catalog instanceof Closeable) {
+((Closeable) catalog).close();
+}
+}
+
+@Override
+public void processElement(StreamRecord element) throws Exception {
+String wholeData = element.getValue().getString(0).toString();
+
+JsonNode jsonNode = objectMapper.readTree(wholeData);
+boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+if (isDDL) {
+execDDL(jsonNode);
+} else {
+execDML(jsonNode);
+}
+}
+
+private void execDDL(JsonNode jsonNode) {
+// todo:parse ddl sql
+}
+
+private void execDML(JsonNode jsonNode) throws IOException {
+RecordWithSchema record = parseRecord(jsonNode);
+Schema schema = schemaCache.get(record.getTableId());
+Schema dataSchema = record.getSchema();
+recordQueues.compute(record.getTableId(), (k, v) -> {
+   

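The `recordQueues` and `schemaCache` fields in the operator above (commented "record cache, wait schema to consume record") suggest a per-table buffer that holds records until that table's schema is available. A hedged, framework-free sketch of that pattern; `SchemaGatedBuffer`, its methods, and the use of plain strings for table ids, schemas, and records are all assumptions, not the operator's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical per-table buffer: records wait until their table's schema is known.
final class SchemaGatedBuffer {

    // Records waiting for their table's schema, keyed by table id.
    private final Map<String, Queue<String>> recordQueues = new HashMap<>();
    // Latest known schema per table id.
    private final Map<String, String> schemaCache = new HashMap<>();
    // Records released for writing, tagged with the schema they were written under.
    private final List<String> emitted = new ArrayList<>();

    /** Buffers the record, and releases it immediately if the table's schema is already known. */
    void onRecord(String tableId, String record) {
        recordQueues.computeIfAbsent(tableId, k -> new ArrayDeque<>()).add(record);
        if (schemaCache.containsKey(tableId)) {
            drain(tableId);
        }
    }

    /** Registers (or updates) the table schema and flushes any records buffered for it. */
    void onSchema(String tableId, String schema) {
        schemaCache.put(tableId, schema);
        drain(tableId);
    }

    private void drain(String tableId) {
        Queue<String> queue = recordQueues.get(tableId);
        if (queue == null) {
            return;
        }
        String schema = schemaCache.get(tableId);
        while (!queue.isEmpty()) {
            emitted.add(schema + ":" + queue.poll());
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Draining in arrival order per table preserves the per-table ordering that a CDC sink usually relies on; records for a table whose schema never arrives stay buffered, which a real operator would have to bound or alert on.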
[GitHub] [inlong] dockerzhang opened a new pull request, #6237: [INLONG-6236][CVE] Fix the CVE-2022-42003 for jackson-databind

2022-10-20 Thread GitBox


dockerzhang opened a new pull request, #6237:
URL: https://github.com/apache/inlong/pull/6237

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #6236
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the 
problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   





[GitHub] [inlong] dockerzhang commented on pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management

2022-10-20 Thread GitBox


dockerzhang commented on PR #5989:
URL: https://github.com/apache/inlong/pull/5989#issuecomment-1285060746

   Duplicate of https://github.com/apache/inlong/pull/6233, closing it.





[GitHub] [inlong] dockerzhang closed pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management

2022-10-20 Thread GitBox


dockerzhang closed pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management
URL: https://github.com/apache/inlong/pull/5989





[GitHub] [inlong] healchow merged pull request #6221: [INLONG-6220][Manager] Support query cluster nodes by the manager client

2022-10-20 Thread GitBox


healchow merged PR #6221:
URL: https://github.com/apache/inlong/pull/6221





[inlong] branch master updated (ebf93ef09 -> 5be8d1795)

2022-10-20 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from ebf93ef09 [INLONG-6234][DataProxy] Adjust the source report information acquisition source (#6235)
 add 5be8d1795 [INLONG-6220][Manager] Support query cluster nodes by the manager client (#6221)

No new revisions were added by this update.

Summary of changes:
 .../pojo/dataproxy/DataProxyNodeResponse.java  |   1 +
 .../inlong/manager/client/api/InlongClient.java|  11 ++
 .../manager/client/api/impl/InlongClientImpl.java  |   9 +-
 .../api/inner/client/InlongClusterClient.java  |  17 +++
 .../client/api/service/InlongClusterApi.java   |   6 +
 .../client/api/inner/ClientFactoryTest.java|  27 +++-
 .../service/cluster/InlongClusterService.java  |  10 ++
 .../service/cluster/InlongClusterServiceImpl.java  | 154 -
 .../web/controller/InlongClusterController.java|  17 ++-
 9 files changed, 183 insertions(+), 69 deletions(-)



[GitHub] [inlong] GanfengTan commented on issue #6186: [Umbrella] Support node protocol reporting and query

2022-10-20 Thread GitBox


GanfengTan commented on issue #6186:
URL: https://github.com/apache/inlong/issues/6186#issuecomment-1285069779

   All functions are supported.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

2022-10-20 Thread GitBox


yunqingmoswu commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000226942


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java:
##
@@ -0,0 +1,227 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.IcebergStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Iceberg writer that distinguishes between sink tables and routes data to the corresponding IcebergStreamWriter.
+ */
+public class IcebergMultipleStreamWriter extends IcebergProcessFunction
+        implements CheckpointedFunction, BoundedOneInput {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMultipleStreamWriter.class);
+
+    private final boolean appendMode;
+    // The catalogLoader here could be wrapped inside the tableLoader
+    private final CatalogLoader catalogLoader;
+
+    private transient Catalog catalog;
+    // The keys of the two maps below are both the full db.table path
+    private transient Map> multipleWriters;
+    private transient Map multipleTables;
+    private transient Map multipleSchemas;
+    private transient FunctionInitializationContext functionInitializationContext;
+
+
+    public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+        this.appendMode = appendMode;
+        this.catalogLoader = catalogLoader;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.catalog = catalogLoader.loadCatalog();
+        this.multipleWriters = new HashMap<>();
+        this.multipleTables = new HashMap<>();
+        this.multipleSchemas = new HashMap<>();
+    }
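A minimal sketch of the routing idea in the Javadoc above, assuming each incoming row carries its own database and table name. `Row`, `SingleTableWriter` and `MultipleTableRouter` are hypothetical stand-ins, not InLong or Iceberg classes; a real writer would create Iceberg task writers instead of buffering strings. The map key is the full "db.table" path, as the field comment describes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: routes each row to a per-table writer keyed by "db.table". */
public class MultipleTableRouter {

    /** Stand-in for a row that carries its own target table. */
    public static final class Row {
        final String database;
        final String table;
        final String payload;

        public Row(String database, String table, String payload) {
            this.database = database;
            this.table = table;
            this.payload = payload;
        }
    }

    /** Stand-in for a single-table writer; it only buffers payloads. */
    public static final class SingleTableWriter {
        private final String tableId;
        private final List<String> payloads = new ArrayList<>();

        SingleTableWriter(String tableId) {
            this.tableId = tableId;
        }

        void write(String payload) {
            payloads.add(payload);
        }

        public List<String> payloads() {
            return payloads;
        }
    }

    // Key is the full "db.table" path.
    private final Map<String, SingleTableWriter> writers = new HashMap<>();

    public void processElement(Row row) {
        String tableId = row.database + "." + row.table;
        // Create the writer lazily the first time a table is seen, then reuse it.
        writers.computeIfAbsent(tableId, SingleTableWriter::new).write(row.payload);
    }

    public Map<String, SingleTableWriter> writers() {
        return writers;
    }
}
```

Creating writers lazily keeps the operator from having to know the full table list up front, which is the point of a multiple-sink writer.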

[GitHub] [inlong-website] gosonzhang opened a new pull request, #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


gosonzhang opened a new pull request, #571:
URL: https://github.com/apache/inlong-website/pull/571

   
   - Fixes: https://github.com/apache/inlong/issues/6238
   





[GitHub] [inlong-website] healchow commented on a diff in pull request #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


healchow commented on code in PR #571:
URL: https://github.com/apache/inlong-website/pull/571#discussion_r1000263529


##
docs/modules/dataproxy/configuration.md:
##
@@ -0,0 +1,50 @@
+---
+title: Configuration
+sidebar_position: 3
+---
+## Basic Configuration (common.properties)
+
+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| manager.hosts | InLong-Manager http host and port | 127.0.0.1:8083 | Empty is not allowed, the format is configured according to the format of {ip1:port1}[,{ip2:port2}][,{ip3:port3}] |
+| manager.auth.secretId | InLong-Manager authentic secretId | `blank` | If InLong-Manager doesn't open authentic service, this parameter can be empty. |
+| manager.auth.secretKey | InLong-Manager authentic secretKey | `blank` | |
+| proxy.cluster.tag | The cluster set name where the dataproxy is located | default_cluster | A cluster set can contain multiple dataproxy and MQ clusters |
+| proxy.cluster.name | The cluster name where dataproxy is located | default_dataproxy | used to distinguish different environments |
+| proxy.cluster.inCharges | The incharge of the cluster where dataproxy is located | admin | |
+| configCheckInterval | Configuration file checking and loading interval of the conf directory (unit: ms) | 1 | |
+| metricDomains | JMX domain name | DataProxy | Obtain the following configuration items of "metricDomains.${metricDomains}.xxx" through this configuration value |
+| metricDomains.DataProxy.domainListeners | The class for indicator monitoring, and the service is started through reflection of the class name | org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener | If there are multiple indicator monitoring class configurations, separate them by spaces, carriage returns, or line feeds |
+| metricDomains.DataProxy.snapshotInterval | Time interval for periodic reporting of indicators (unit: ms) | 6 | |
+| prometheusHttpPort | The port when reporting using Prometheus | 9081 | |
+| audit.enable | Whether to enable data reporting InLong-Audit service | true | |
+| audit.proxys | The address of the InLong-Audit service
[GitHub] [inlong-website] gosonzhang commented on a diff in pull request #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


gosonzhang commented on code in PR #571:
URL: https://github.com/apache/inlong-website/pull/571#discussion_r1000267545


##
docs/modules/dataproxy/configuration.md:
##
@@ -0,0 +1,50 @@
+---
+title: Configuration
+sidebar_position: 3
+---
+## Basic Configuration (common.properties)
+
+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| manager.hosts | InLong-Manager http host and port | 127.0.0.1:8083 | Empty is not allowed, the format is configured according to the format of {ip1:port1}[,{ip2:port2}][,{ip3:port3}] |
+| manager.auth.secretId | InLong-Manager authentic secretId | `blank` | If InLong-Manager doesn't open authentic service, this parameter can be empty. |
+| manager.auth.secretKey | InLong-Manager authentic secretKey | `blank` | |
+| proxy.cluster.tag | The cluster set name where the dataproxy is located | default_cluster | A cluster set can contain multiple dataproxy and MQ clusters |
+| proxy.cluster.name | The cluster name where dataproxy is located | default_dataproxy | used to distinguish different environments |
+| proxy.cluster.inCharges | The incharge of the cluster where dataproxy is located | admin | |
+| configCheckInterval | Configuration file checking and loading interval of the conf directory (unit: ms) | 1 | |
+| metricDomains | JMX domain name | DataProxy | Obtain the following configuration items of "metricDomains.${metricDomains}.xxx" through this configuration value |
+| metricDomains.DataProxy.domainListeners | The class for indicator monitoring, and the service is started through reflection of the class name | org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener | If there are multiple indicator monitoring class configurations, separate them by spaces, carriage returns, or line feeds |
+| metricDomains.DataProxy.snapshotInterval | Time interval for periodic reporting of indicators (unit: ms) | 6 | |
+| prometheusHttpPort | The port when reporting using Prometheus | 9081 | |
+| audit.enable | Whether to enable data reporting InLong-Audit service | true | |
+| audit.proxys | The address of the InLong-Audit service

[GitHub] [inlong-website] dockerzhang commented on a diff in pull request #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


dockerzhang commented on code in PR #571:
URL: https://github.com/apache/inlong-website/pull/571#discussion_r1000273876


##
i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md:
##
@@ -0,0 +1,52 @@
+---
+title: Configuration
+sidebar_position: 3
+---
+
+## Basic Configuration (common.properties)
+
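+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| manager.hosts | List of Manager server IPs and ports | 127.0.0.1:8083 | Must not be empty; configure in the format {ip1:port1}[,{ip2:port2}][,{ip3:port3}] |
+| manager.auth.secretId | Account required for login | None | May be left empty |
+| manager.auth.secretKey | Password required for login | None | May be left empty |
+| proxy.cluster.tag | Name of the cluster Set where dataproxy is located | default_cluster | A cluster set can contain multiple dataproxy and MQ clusters |
+| proxy.cluster.name | Name of the cluster where dataproxy is located | default_dataproxy | Used to distinguish different environments |
+| proxy.cluster.inCharges | Person in charge of the cluster where dataproxy is located | admin | |
+| configCheckInterval | Interval for checking and loading the configuration files in the conf directory (unit: ms) | 1 | |
+| metricDomains | JMX domain name | DataProxy | Used to look up the "metricDomains.${metricDomains}.xxx" configuration items below |
+| metricDomains.DataProxy.domainListeners | Metric listener class; the service is started by reflection on this class name | org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener | Separate multiple metric listener classes with spaces, carriage returns, or line feeds |
+| metricDomains.DataProxy.snapshotInterval | Interval for periodic metric reporting (unit: ms) | 6 | |
+| prometheusHttpPort | Port used when reporting via Prometheus | 9081 | |
+| audit.enable | Whether to enable data reporting to the InLong-Audit service | true | |
+| audit.proxys | Address of the InLong-Audit service | 127.0.0.1:10081 | Configure in the format {ip1:port1}[ {ip2:port2}][ {ip3:port3}], separating ip:port entries with spaces, carriage returns, or line feeds |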
+
+
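+## Log Output Configuration (log4j2.xml)
+DataProxy uses Log4j2 for log output, so the related settings follow Log4j2; only the commonly used items are listed here:
+
+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| basePath | Storage path of the log files | ${sys:dataproxy.log.path} | |
+| every_file_size | Size of each log file (unit: bytes) | 1G | |
+| output_log_level | Log output level | DEBUG | Setting it to INFO is recommended in production |
+| rolling_max | Maximum number of logs of the same type kept in the same directory | 50 | |
+| debug_max | Maximum number of DEBUG logs kept in the same directory | 7 | |
+| info_max | Maximum number of INFO logs kept in the same directory | 7 | |
+| warn_max | Maximum number of WARN logs kept in the same directory | 7 | |
+| error_max | Maximum number of ERROR logs kept in the same directory | 7 | |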
+
+
+## source-channel-sink Pipeline Configuration (dataproxy-*.conf)

Review Comment:
   dataproxy-*.conf
   ->
   dataproxy-{tube|pulsar}.conf

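The doc under review says the pipeline config is either dataproxy-pulsar.conf or dataproxy-tube.conf, selected at startup, with dataproxy-pulsar.conf used when no type is given. A small sketch of that selection rule follows; `DataProxyConfFile.resolve` is a hypothetical helper, not DataProxy's actual startup code.

```java
import java.util.Locale;

/** Hypothetical helper: maps an MQ type to its DataProxy pipeline config file. */
public final class DataProxyConfFile {

    private static final String DEFAULT_MQ_TYPE = "pulsar";

    private DataProxyConfFile() {
    }

    public static String resolve(String mqType) {
        // An unspecified type falls back to Pulsar, as the doc describes.
        String type = (mqType == null || mqType.trim().isEmpty())
                ? DEFAULT_MQ_TYPE
                : mqType.trim().toLowerCase(Locale.ROOT);
        // Only the two documented middleware types are accepted.
        if (!type.equals("pulsar") && !type.equals("tube")) {
            throw new IllegalArgumentException("unsupported MQ type: " + mqType);
        }
        return "dataproxy-" + type + ".conf";
    }
}
```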





[GitHub] [inlong-website] healchow commented on a diff in pull request #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


healchow commented on code in PR #571:
URL: https://github.com/apache/inlong-website/pull/571#discussion_r1000290536


##
i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md:
##
@@ -0,0 +1,52 @@
+---
+title: Configuration
+sidebar_position: 3
+---
+
+## Basic Configuration (common.properties)
+
+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| manager.hosts | List of Manager server IPs and ports | 127.0.0.1:8083 | Must not be empty; configure in the format {ip1:port1}[,{ip2:port2}][,{ip3:port3}] |

Review Comment:
   Suggest adding a space between English and Chinese text.

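The table documents two list formats: manager.hosts takes comma-separated {ip:port} entries and must not be empty, while audit.proxys (and domainListeners) separate entries with spaces, carriage returns, or line feeds. A sketch of parsing both is below; `AddressListParser` and its ip:port validation are assumptions for illustration, not DataProxy's own parser.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for the two address-list formats in common.properties. */
public final class AddressListParser {

    private AddressListParser() {
    }

    /** manager.hosts: comma-separated ip:port entries; an empty value is rejected. */
    public static List<String> parseManagerHosts(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("manager.hosts must not be empty");
        }
        return collect(value.split(","));
    }

    /** audit.proxys: ip:port entries separated by spaces, carriage returns or line feeds. */
    public static List<String> parseAuditProxys(String value) {
        if (value == null) {
            return new ArrayList<>();
        }
        return collect(value.split("[ \\r\\n]+"));
    }

    private static List<String> collect(String[] parts) {
        List<String> result = new ArrayList<>();
        for (String part : parts) {
            String entry = part.trim();
            if (entry.isEmpty()) {
                continue; // skip blanks from leading separators or trailing commas
            }
            checkHostPort(entry);
            result.add(entry);
        }
        return result;
    }

    private static void checkHostPort(String entry) {
        int colon = entry.lastIndexOf(':');
        if (colon <= 0 || colon == entry.length() - 1) {
            throw new IllegalArgumentException("expected ip:port but got: " + entry);
        }
        // NumberFormatException is itself an IllegalArgumentException
        int port = Integer.parseInt(entry.substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + entry);
        }
    }
}
```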





[GitHub] [inlong-website] healchow commented on a diff in pull request #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


healchow commented on code in PR #571:
URL: https://github.com/apache/inlong-website/pull/571#discussion_r1000292138


##
i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md:
##
@@ -0,0 +1,52 @@
+---
+title: Configuration
+sidebar_position: 3
+---
+
+## Basic Configuration (common.properties)
+
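+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| manager.hosts | List of Manager server IPs and ports | 127.0.0.1:8083 | Must not be empty; configure in the format {ip1:port1}[,{ip2:port2}][,{ip3:port3}] |
+| manager.auth.secretId | Account required for login | None | May be left empty |
+| manager.auth.secretKey | Password required for login | None | May be left empty |
+| proxy.cluster.tag | Name of the cluster Set where dataproxy is located | default_cluster | A cluster set can contain multiple dataproxy and MQ clusters |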

Review Comment:
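   `dataproxy所处的集群Set名` (name of the cluster Set where dataproxy is located) -> `DataProxy 所处的集群 Tag 名` (name of the cluster Tag where DataProxy is located).
   
   `Set` introduces a new term; suggest unifying it as `Tag`.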






[GitHub] [inlong-website] healchow commented on a diff in pull request #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


healchow commented on code in PR #571:
URL: https://github.com/apache/inlong-website/pull/571#discussion_r1000294768


##
i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md:
##
@@ -0,0 +1,52 @@
+---
+title: Configuration
+sidebar_position: 3
+---
+
+## Basic Configuration (common.properties)
+
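+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| manager.hosts | List of Manager server IPs and ports | 127.0.0.1:8083 | Must not be empty; configure in the format {ip1:port1}[,{ip2:port2}][,{ip3:port3}] |
+| manager.auth.secretId | Account required for login | None | May be left empty |
+| manager.auth.secretKey | Password required for login | None | May be left empty |
+| proxy.cluster.tag | Name of the cluster Set where dataproxy is located | default_cluster | A cluster set can contain multiple dataproxy and MQ clusters |
+| proxy.cluster.name | Name of the cluster where dataproxy is located | default_dataproxy | Used to distinguish different environments |
+| proxy.cluster.inCharges | Person in charge of the cluster where dataproxy is located | admin | |
+| configCheckInterval | Interval for checking and loading the configuration files in the conf directory (unit: ms) | 1 | |
+| metricDomains | JMX domain name | DataProxy | Used to look up the "metricDomains.${metricDomains}.xxx" configuration items below |
+| metricDomains.DataProxy.domainListeners | Metric listener class; the service is started by reflection on this class name | org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener | Separate multiple metric listener classes with spaces, carriage returns, or line feeds |
+| metricDomains.DataProxy.snapshotInterval | Interval for periodic metric reporting (unit: ms) | 6 | |
+| prometheusHttpPort | Port used when reporting via Prometheus | 9081 | |
+| audit.enable | Whether to enable data reporting to the InLong-Audit service | true | |
+| audit.proxys | Address of the InLong-Audit service | 127.0.0.1:10081 | Configure in the format {ip1:port1}[ {ip2:port2}][ {ip3:port3}], separating ip:port entries with spaces, carriage returns, or line feeds |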
+
+
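+## Log Output Configuration (log4j2.xml)
+DataProxy uses Log4j2 for log output, so the related settings follow Log4j2; only the commonly used items are listed here:
+
+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| basePath | Storage path of the log files | ${sys:dataproxy.log.path} | |
+| every_file_size | Size of each log file (unit: bytes) | 1G | |
+| output_log_level | Log output level | DEBUG | Setting it to INFO is recommended in production |
+| rolling_max | Maximum number of logs of the same type kept in the same directory | 50 | |
+| debug_max | Maximum number of DEBUG logs kept in the same directory | 7 | |
+| info_max | Maximum number of INFO logs kept in the same directory | 7 | |
+| warn_max | Maximum number of WARN logs kept in the same directory | 7 | |
+| error_max | Maximum number of ERROR logs kept in the same directory | 7 | |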
+
+
+## source-channel-sink Pipeline Configuration (dataproxy-*.conf)
+
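+DataProxy supports configurable source-channel-sink pipelines, with a configuration file structure similar to Flume's. The configuration lives in dataproxy-*.conf files; two are currently supported, dataproxy-pulsar.conf and dataproxy-tube.conf, one per middleware type. The type can be specified at startup; by default (when not specified), dataproxy-pulsar.conf is used.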
+
+
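+## Other Auto-Updated Configuration
+After DataProxy starts and connects to the Manager, it automatically syncs its runtime configuration from the Manager and refreshes it periodically. The following configuration files must not be modified:
+
+| Configuration file | Description | Notes |
+|---|---|---|
+| mq_cluster.properties | MQ cluster configuration | |
+| topics.properties | Mapping from groupId to MQ Topic | |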

Review Comment:
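   `groupId到MQ的Topic的映射配置` (mapping from groupId to MQ Topic) -> `InlongGroupId 和不同类型 MQ 的 Topic 的映射配置` (mapping between InlongGroupId and the Topics of different MQ types).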

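topics.properties is described as the InlongGroupId-to-Topic mapping that DataProxy syncs from the Manager. The file's exact layout is not shown in this thread, so the sketch below assumes plain `groupId=topic` lines and reads them with `java.util.Properties`; `TopicMappingLoader` is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical loader for a groupId=topic mapping (format assumed, not confirmed). */
public final class TopicMappingLoader {

    private TopicMappingLoader() {
    }

    public static Map<String, String> parse(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            // StringReader does no real I/O, but Properties.load declares IOException
            throw new UncheckedIOException(e);
        }
        Map<String, String> mapping = new HashMap<>();
        for (String groupId : props.stringPropertyNames()) {
            // Properties strips leading whitespace of values; trim the trailing side too
            mapping.put(groupId, props.getProperty(groupId).trim());
        }
        return mapping;
    }
}
```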



[GitHub] [inlong] haifxu opened a new pull request, #6240: [INLONG-6239][Manager] Add inlongctl in the root directory

2022-10-20 Thread GitBox


haifxu opened a new pull request, #6240:
URL: https://github.com/apache/inlong/pull/6240

   ### Prepare a Pull Request
   
   - Fixes #6239 
   
   ### Modifications
   
   Add `bin/inlongctl` in the root directory to execute `inlong-manager/bin/managerctl`.
   





[GitHub] [inlong] lucaspeng12138 opened a new pull request, #6242: [INLONG-6241][Docker] Add manager and audit database table configuration

2022-10-20 Thread GitBox


lucaspeng12138 opened a new pull request, #6242:
URL: https://github.com/apache/inlong/pull/6242

   ### Prepare a Pull Request
   
   Add manager and audit database table configuration
   - Fixes #6241 
   
   ### Motivation
   
   Add manager and audit database table configuration
   
   ### Modifications
   
   Add manager and audit database table configuration
   
   ### Verifying this change
   
   Add manager and audit database table configuration





[GitHub] [inlong] healchow commented on a diff in pull request #6240: [INLONG-6239][InLong] Add inlongctl in the root directory

2022-10-20 Thread GitBox


healchow commented on code in PR #6240:
URL: https://github.com/apache/inlong/pull/6240#discussion_r1000376108


##
bin/inlongctl:
##
@@ -0,0 +1,25 @@
+#! /bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

Review Comment:
   ```suggestion
   # contributor license agreements. See the NOTICE file distributed with
   # this work for additional information regarding copyright ownership.
   # The ASF licenses this file to You under the Apache License, Version 2.0
   # (the "License"); you may not use this file except in compliance with
   # the License. You may obtain a copy of the License at
   #
   # http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing, software
   # distributed under the License is distributed on an "AS IS" BASIS,
   # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   # See the License for the specific language governing permissions and
   # limitations under the License.
   ```






[GitHub] [inlong] healchow commented on a diff in pull request #6240: [INLONG-6239][InLong] Add inlongctl in the root directory

2022-10-20 Thread GitBox


healchow commented on code in PR #6240:
URL: https://github.com/apache/inlong/pull/6240#discussion_r1000380309


##
bin/inlongctl:
##
@@ -0,0 +1,24 @@
+#! /bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+INLONG_HOME=$(
+  cd $(dirname $0)

Review Comment:
   ```suggestion
 # shellcheck disable=SC2164
 cd $(dirname $0)
   ```






[GitHub] [inlong] yunqingmoswu opened a new pull request, #6244: [INLONG-6243][Sort] Add custom name for sort job

2022-10-20 Thread GitBox


yunqingmoswu opened a new pull request, #6244:
URL: https://github.com/apache/inlong/pull/6244

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   Title: [INLONG-6243][Sort] Add custom name for sort job
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   Fixes #6243 
   
   ### Motivation
   
   Add a custom name for the sort job; if it is not specified, it defaults to 'InLong-Sort-Job'.
   
   ### Modifications
   
   1. Add the job name setting in 'org.apache.inlong.sort.Entrance'
   2. Add the JOB_NAME constant to 'org.apache.inlong.sort.configuration.Constants'
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads (10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





[GitHub] [inlong] EMsnap commented on a diff in pull request #6244: [INLONG-6243][Sort] Add custom name for sort job

2022-10-20 Thread GitBox


EMsnap commented on code in PR #6244:
URL: https://github.com/apache/inlong/pull/6244#discussion_r1000413351


##
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java:
##
@@ -52,6 +52,7 @@ public static void main(String[] args) throws Exception {
         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
                 .inStreamingMode().build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        tableEnv.getConfig().getConfiguration().setString("pipeline.name", config.getString(Constants.JOB_NAME));

Review Comment:
   All strings in Entrance are extracted as constants.

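Following the review note, the "pipeline.name" literal can live in a constant next to the job-name key and the 'InLong-Sort-Job' default from the PR description. Flink's own `PipelineOptions.NAME` option uses the key "pipeline.name"; the sketch below avoids the Flink dependency and models both configurations as plain maps. `JobNameConfig` and the "job.name" key are hypothetical, since the actual value of `Constants.JOB_NAME` is not shown here.

```java
import java.util.Map;

/** Hypothetical sketch: resolve the Sort job name with a default, using named constants. */
public final class JobNameConfig {

    /** Key of Flink's job-name option. */
    public static final String PIPELINE_NAME = "pipeline.name";
    /** Hypothetical key under which the Sort job receives a custom name. */
    public static final String JOB_NAME = "job.name";
    /** Default stated in the PR description. */
    public static final String DEFAULT_JOB_NAME = "InLong-Sort-Job";

    private JobNameConfig() {
    }

    /** Copies the custom job name, or the default when absent or blank, into the table config. */
    public static void applyJobName(Map<String, String> sortConfig, Map<String, String> tableConfig) {
        String name = sortConfig.get(JOB_NAME);
        if (name == null || name.trim().isEmpty()) {
            name = DEFAULT_JOB_NAME;
        }
        tableConfig.put(PIPELINE_NAME, name);
    }
}
```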





[GitHub] [inlong] haibo-duan opened a new pull request, #6245: [INLONG-4986][Agent] Support MQTT Source

2022-10-20 Thread GitBox


haibo-duan opened a new pull request, #6245:
URL: https://github.com/apache/inlong/pull/6245

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   - About #4986 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads (10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





[GitHub] [inlong] dockerzhang merged pull request #6242: [INLONG-6241][Docker] Add manager and audit database name configuration

2022-10-20 Thread GitBox


dockerzhang merged PR #6242:
URL: https://github.com/apache/inlong/pull/6242





[inlong] branch master updated: [INLONG-6241][Docker] Add manager and audit database name configuration (#6242)

2022-10-20 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 055bd93dd [INLONG-6241][Docker] Add manager and audit database name configuration (#6242)
055bd93dd is described below

commit 055bd93ddbce0398861c6c54170923924889826c
Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com>
AuthorDate: Thu Oct 20 19:04:03 2022 +0800

    [INLONG-6241][Docker] Add manager and audit database name configuration (#6242)
---
 inlong-audit/audit-docker/Dockerfile|  2 ++
 inlong-audit/audit-docker/audit-docker.sh   | 10 --
 inlong-manager/manager-docker/Dockerfile|  2 ++
 inlong-manager/manager-docker/manager-docker.sh |  7 ++-
 4 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile
index 091830fed..0a1042f3e 100644
--- a/inlong-audit/audit-docker/Dockerfile
+++ b/inlong-audit/audit-docker/Dockerfile
@@ -30,6 +30,7 @@ ENV PULSAR_BROKER_LIST="127.0.0.1:6650"
 ENV PULSAR_AUDIT_TOPIC="persistent:\/\/public\/default\/inlong-audit"
 ENV TUBE_MASTER_LIST="localhost:8715"
 ENV TUBE_AUDIT_TOPIC="inlong-audit"
+ENV AUDIT_DBNAME="apache_inlong_audit"
 # mysql / clickhouse / elasticsearch
 ENV STORE_MODE=mysql
 # mysql
@@ -40,6 +41,7 @@ ENV PASSWORD=inlong
 ENV STORE_CK_URL=127.0.0.1:8123
 ENV STORE_CK_USERNAME=default
 ENV STORE_CK_PASSWD=default
+ENV STORE_CK_DBNAME="apache_inlong_audit"
 # elasticsearch
 ENV STORE_ES_HOST=127.0.0.1
 ENV STORE_ES_PORT=9200
diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh
index 6a76fa670..474f68934 100755
--- a/inlong-audit/audit-docker/audit-docker.sh
+++ b/inlong-audit/audit-docker/audit-docker.sh
@@ -21,6 +21,8 @@ file_path=$(cd "$(dirname "$0")"/../;pwd)
 store_conf_file=${file_path}/conf/application.properties
 # proxy config
 proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf
+sql_file="${file_path}"/sql/apache_inlong_audit.sql
+sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql
 
 # replace the configuration for audit proxy
 if [ "${MQ_TYPE}" = "pulsar" ]; then
@@ -45,13 +47,17 @@ if [ -n "${STORE_MODE}" ]; then
  sed -i "s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" "${store_conf_file}"
 fi
 # DB
-sed -i "s/127.0.0.1:3306/${JDBC_URL}/g" "${store_conf_file}"
+sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}"
 sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g" "${store_conf_file}"
 sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g" "${store_conf_file}"
+# mysql file for audit
+sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_file}"
 # clickhouse
-sed -i "s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/apache_inlong_audit/g" "${store_conf_file}"
+sed -i "s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g" "${store_conf_file}"
 sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" "${store_conf_file}"
 sed -i "s/clickhouse.password=.*$/clickhouse.password=${STORE_CK_PASSWD}/g" "${store_conf_file}"
+# mysql file for clickhouse
+sed -i "s/apache_inlong_audit/${STORE_CK_DBNAME}/g" "${sql_ck_file}"
 # elasticsearch
 sed -i "s/elasticsearch.host=.*$/elasticsearch.host=${STORE_ES_HOST}/g" "${store_conf_file}"
 sed -i "s/elasticsearch.port=.*$/elasticsearch.port=${STORE_ES_PORT}/g" "${store_conf_file}"
diff --git a/inlong-manager/manager-docker/Dockerfile b/inlong-manager/manager-docker/Dockerfile
index 405b3e533..a97ed4248 100644
--- a/inlong-manager/manager-docker/Dockerfile
+++ b/inlong-manager/manager-docker/Dockerfile
@@ -28,6 +28,7 @@ ENV USERNAME=root
 ENV PASSWORD=inlong
 ENV ZK_URL=127.0.0.1:2181
 ENV CLEANSING_ENABLE=false
+ENV MANAGER_DBNAME=apache_inlong_manager
 # support download plugins from remote address.
 ENV PLUGINS_URL=default
 # for flink-sort-plugin.properties
@@ -50,6 +51,7 @@ ADD target/sort-connector-hive-${VERSION}.jar /opt/inlong-sort/connectors/
 ENV AUDIT_CK_URL=127.0.0.1:8123
 ENV AUDIT_CK_USERNAME=default
 ENV AUDIT_CK_PASSWD=default
+ENV AUDIT_CK_DBNAME="apache_inlong_audit"
 # add mysql connector
 RUN wget -P lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar
 ADD manager-docker.sh bin/
diff --git a/inlong-manager/manager-docker/manager-docker.sh b/inlong-manager/manager-docker/manager-docker.sh
index 924f07ea9..478e5b280 100644
--- a/inlong-manager/manager-docker/manager-docker.sh
+++ b/inlong-manager/manager-docker/manager-docker.sh
@@ -27,26 +27,31 @@ fi
 
 conf_file="${file_path}"/conf/application-"${ACTIVE_PROFILE}".properties
 flink_conf
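
The `audit-docker.sh` hunk above rewrites the MySQL host:port and the default database name `apache_inlong_audit` in a single `sed` substitution. The following is a minimal sketch of that substitution, run against a throwaway file. The property key `spring.datasource.druid.url` and the values `mysql-host:3306` and `my_audit_db` are hypothetical stand-ins and are not taken from the real `application.properties`.

```shell
#!/usr/bin/env bash
# Sketch only: reproduces the JDBC URL substitution from audit-docker.sh
# against a temporary file. The property line and the values below are
# hypothetical stand-ins, not the real application.properties contents.
set -euo pipefail

conf_file="$(mktemp)"
trap 'rm -f "${conf_file}"' EXIT

# Hypothetical default entry containing the pattern the script matches.
echo 'spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit' > "${conf_file}"

# Stand-ins for the container environment variables used by the patch.
JDBC_URL="mysql-host:3306"
AUDIT_DBNAME="my_audit_db"

# Host:port and database name are rewritten together. "/" is escaped
# because it is also the sed delimiter. Plain -i is GNU sed syntax.
sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${conf_file}"

result="$(cat "${conf_file}")"
echo "${result}"
```

Because the values are spliced directly into the sed expression, a value that contains `/` or `&` would break the substitution or change its meaning. Such values would need escaping first.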

[GitHub] [inlong-website] dockerzhang merged pull request #571: [INLONG-6238][Doc] Add configuration doc for DataProxy

2022-10-20 Thread GitBox


dockerzhang merged PR #571:
URL: https://github.com/apache/inlong-website/pull/571





[inlong-website] branch master updated: [INLONG-6238][Doc] Add configuration doc for DataProxy (#571)

2022-10-20 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new 3953f07a4c [INLONG-6238][Doc] Add configuration doc for DataProxy (#571)
3953f07a4c is described below

commit 3953f07a4c48514bebfb1a740fe0c47dbedafd10
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Thu Oct 20 19:06:35 2022 +0800

[INLONG-6238][Doc] Add configuration doc for DataProxy (#571)
---
 docs/modules/dataproxy/configuration.md| 186 
 docs/modules/dataproxy/overview.md | 139 +--
 docs/modules/dataproxy/quick_start.md  |   1 +
 .../current/modules/dataproxy/configuration.md | 187 +
 .../current/modules/dataproxy/overview.md  | 138 +--
 .../current/modules/dataproxy/quick_start.md   |   1 +
 6 files changed, 377 insertions(+), 275 deletions(-)

diff --git a/docs/modules/dataproxy/configuration.md b/docs/modules/dataproxy/configuration.md
new file mode 100644
index 00..e345ce4806
--- /dev/null
+++ b/docs/modules/dataproxy/configuration.md
@@ -0,0 +1,186 @@
+---
+title: Configuration
+sidebar_position: 3
+---
+## Basic Configuration (common.properties)
+
+| Parameter | Description | Default | Notes |
+|---|---|---|---|
+| manager.hosts | InLong-Manager http host and port | 127.0.0.1:8083 | Empty is not allowed, the format is configured according to the format of {ip1:port1}[,{ip2:port2}][,{ip3:port3}] |
+| manager.auth.secretId | InLong-Manager authentic secretId | `blank` | If InLong-Manager doesn't open authentic service, this parameter can be empty. |
+| manager.auth.secretKey | InLong-Manager authentic secretKey | `blank` | |
+| proxy.cluster.tag | The cluster Tag name where the dataproxy is located | default_cluster | A cluster Tag can contain multiple dataproxy and MQ clusters |
+| proxy.cluster.name | The cluster name where dataproxy is located | default_dataproxy | used to distinguish different environments |
+| proxy.cluster.inCharges | The incharge of the cluster where dataproxy is located | admin | |
+| configCheckInterval | Configuration file checking and loading interval of the conf directory (unit: ms) | 1 | |
+| metricDomains | JMX domain name | DataProxy | Obtain the following configuration items of "metricDomains.${metricDomains}.xxx" through this configuration