Re: [I] [Feature][Manager]Support Tube MQ sink [inlong]

2023-11-29 Thread via GitHub


vernedeng commented on issue #9303:
URL: https://github.com/apache/inlong/issues/9303#issuecomment-1831443725

   This issue will be done in a following version, but not now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-9354][Dashboard] Data access File sources optimization [inlong]

2023-11-29 Thread via GitHub


leezng commented on code in PR #9356:
URL: https://github.com/apache/inlong/pull/9356#discussion_r1409061918


##
inlong-dashboard/src/plugins/sources/defaults/File.ts:
##
@@ -28,6 +28,112 @@ const { I18n } = DataWithBackend;
 const { FieldDecorator } = RenderRow;
 const { ColumnDecorator } = RenderList;
 
+const timeZoneList = [

Review Comment:
   I would suggest generating the data with a loop over an array, for 
example `[-12,13].map(...)`.
   
   However, it won't have much impact.






Re: [PR] [INLONG-9355][SDK] Optimize resource isolation for CPP SDK [inlong]

2023-11-29 Thread via GitHub


dockerzhang merged PR #9357:
URL: https://github.com/apache/inlong/pull/9357





(inlong) branch master updated: [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357)

2023-11-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 7178b39a3f [INLONG-9355][SDK] Optimize resource isolation for CPP SDK 
(#9357)
7178b39a3f is described below

commit 7178b39a3fd30d1af9a26d5259f72b727167c519
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Nov 29 18:21:10 2023 +0800

[INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357)
---
 .../dataproxy-sdk-cpp/release/demo/send_demo.cc|  2 +-
 .../dataproxy-sdk-cpp/release/inc/sdk_conf.h   |  3 +-
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc   | 19 +-
 .../dataproxy-sdk-cpp/src/group/recv_group.cc  | 29 +---
 .../dataproxy-sdk-cpp/src/group/recv_group.h   |  1 +
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 40 +++---
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |  3 +-
 .../dataproxy-sdk-cpp/src/manager/send_manager.cc  | 24 ++---
 .../dataproxy-sdk-cpp/src/manager/send_manager.h   |  1 +
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h|  3 ++
 10 files changed, 76 insertions(+), 49 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
index a4a783e0dc..b09f8463f1 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
@@ -53,7 +53,7 @@ int main(int argc, char const *argv[]) {
   cout << ">start sdk successfully" << endl;
 
   int count = 1000;
-  string inlong_group_id = "test_cpp_sdk_20230404";
+  string inlong_group_id = "test_cpp_sdk";
   string inlong_stream_id = "stream1";
   if (4 == argc) {
 inlong_group_id = argv[2];
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index 578278c5fd..6d7b23dc21 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -83,7 +83,7 @@ private:
   uint64_t max_proxy_num_;
   uint64_t reserve_proxy_num_;
   uint32_t msg_type_;
-  bool enable_isolation_;
+  uint32_t isolation_level_;
 
   // Network parameters
   bool enable_tcp_nagle_;
@@ -92,6 +92,7 @@ private:
   bool enable_balance_;
   bool enable_local_cache_;
 
+
   // auth settings
   bool need_auth_;
   std::string auth_id_;
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index e9d679cd8c..319262adc2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -92,6 +92,7 @@ void SdkConfig::defaultInit() {
   load_balance_interval_ = constants::kLoadBalanceInterval;
   heart_beat_interval_ = constants::kHeartBeatInterval;
   enable_balance_ = constants::kEnableBalance;
+  isolation_level_=constants::IsolationLevel::kLevelSecond;
 
   // cache parameter
   send_buf_size_ = constants::kSendBufSize;
@@ -120,7 +121,6 @@ void SdkConfig::defaultInit() {
   manager_update_interval_ = constants::kManagerUpdateInterval;
   manager_url_timeout_ = constants::kManagerTimeout;
   max_proxy_num_ = constants::kMaxProxyNum;
-  enable_isolation_ = constants::kEnableIsolation;
   reserve_proxy_num_ = constants::kReserveProxyNum;
   enable_local_cache_ = constants::kEnableLocalCache;
 
@@ -360,13 +360,6 @@ void SdkConfig::InitManagerParam(const rapidjson::Value 
&doc) {
 std::string inlong_group_ids_str = obj.GetString();
 Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ",");
   }
-  // enable isolation
-  if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) {
-const rapidjson::Value &obj = doc["enable_isolation"];
-enable_isolation_ = obj.GetBool();
-  } else {
-enable_isolation_ = constants::kEnableIsolation;
-  }
 
   // enable local cache
   if (doc.HasMember("enable_local_cache") && 
doc["enable_local_cache"].IsBool()) {
@@ -375,6 +368,14 @@ void SdkConfig::InitManagerParam(const rapidjson::Value 
&doc) {
   } else {
 enable_local_cache_ = constants::kEnableLocalCache;
   }
+
+  // isolation level
+  if (doc.HasMember("isolation_level") && doc["isolation_level"].IsInt() && 
doc["isolation_level"].GetInt() > 0) {
+const rapidjson::Value &obj = doc["isolation_level"];
+isolation_level_ = obj.GetInt();
+  } else {
+isolation_level_ = constants::IsolationLevel::kLevelSecond;
+  }
 }
 
 void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
@@ -564,7 +565,7 @@ void SdkConfig::ShowClientConfig() {
   LO

Re: [PR] [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error [inlong]

2023-11-29 Thread via GitHub


EMsnap merged PR #9361:
URL: https://github.com/apache/inlong/pull/9361





(inlong) branch master updated: [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)

2023-11-29 Thread zirui
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new ff3cf5110a [INLONG-9359][Sort] Fix iceberg all migrate connector stack 
overflow error (#9361)
ff3cf5110a is described below

commit ff3cf5110a9eb9d4149ab6d44604a345e5e67dc1
Author: Sting 
AuthorDate: Wed Nov 29 18:59:57 2023 +0800

[INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error 
(#9361)
---
 .../sort/protocol/node/load/IcebergLoadNode.java |  5 -
 .../java/org/apache/inlong/sort/base/Constants.java  |  2 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java   |  2 ++
 .../sink/multiple/DynamicSchemaHandleOperator.java   | 20 +---
 4 files changed, 20 insertions(+), 9 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index a418fd930b..9e3fb9e8dd 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -30,7 +30,6 @@ import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
-import org.apache.inlong.sort.util.SchemaChangeUtils;
 
 import com.google.common.base.Preconditions;
 import lombok.Data;
@@ -54,7 +53,6 @@ import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP
 import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE;
 import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT;
 import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN;
-import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES;
 
 @JsonTypeName("icebergLoad")
 @Data
@@ -177,8 +175,6 @@ public class IcebergLoadNode extends LoadNode implements 
InlongMetric, Metadata,
 public Map<String, String> tableOptions() {
 Map<String, String> options = super.tableOptions();
 options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
-// for test sink.ignore.changelog
-// options.put("sink.ignore.changelog", "true");
 options.put(IcebergConstant.DATABASE_KEY, dbName);
 options.put(IcebergConstant.TABLE_KEY, tableName);
 options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
@@ -197,7 +193,6 @@ public class IcebergLoadNode extends LoadNode implements 
InlongMetric, Metadata,
 options.put(SINK_MULTIPLE_FORMAT, 
Objects.requireNonNull(sinkMultipleFormat).identifier());
 options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
 options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
-options.put(SINK_SCHEMA_CHANGE_POLICIES, 
SchemaChangeUtils.serialize(policyMap));
 } else {
 options.put(SINK_MULTIPLE_ENABLE, "false");
 }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index ac7baac4dd..ba47c74356 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -419,7 +419,7 @@ public final class Constants {
 public static final ConfigOption<Boolean> 
SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT =
 ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot")
 .booleanType()
-.defaultValue(false)
+.defaultValue(true)
 .withDescription("Whether supporting auto create table 
when snapshot, default value is 'false'");
 
 public static final ConfigOption INNER_FORMAT =
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index bb33e896bd..9a6056e614 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -57,6 +57,7 @@ import static 
org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import 

[PR] [INLONG-9364][Agent] Remove expired instance from db [inlong]

2023-11-29 Thread via GitHub


justinwwhuang opened a new pull request, #9365:
URL: https://github.com/apache/inlong/pull/9365

   [INLONG-9364][Agent] Remove expired instance from db
   - Fixes #9364 
   
   ### Motivation
   
   Remove expired instance from db
   ### Modifications
   
   Remove expired instance from db
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   





Re: [PR] [INLONG-8358][Sort] Add kafka connector on flink 1.15 [inlong]

2023-11-29 Thread via GitHub


hnrainll commented on PR #8713:
URL: https://github.com/apache/inlong/pull/8713#issuecomment-1832036602

   > Sorry, but there should be a UT for this feature. Any problem with 
implementing it?
   
   No problem, I'm in the process of implementing the UT.
   





Re: [PR] [INLONG-9362][Manager] Iceberg support config all migrate [inlong]

2023-11-29 Thread via GitHub


EMsnap commented on code in PR #9363:
URL: https://github.com/apache/inlong/pull/9363#discussion_r1410074229


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java:
##
@@ -128,4 +137,32 @@ public List<MetaFieldInfo> getMetaFields() {
 fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), 
MetaField.AUDIT_DATA_TIME));
 return fieldInfos;
 }
+
+@Override
+public Boolean isSinkMultiple(StreamNode nodeInfo) {
+IcebergSink icebergSink = (IcebergSink) nodeInfo;
+return icebergSink.getSinkMultipleEnable();
+}
+
+@Override
+public List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField> 
streamFields) {
+if (CollectionUtils.isEmpty(streamFields)) {
+streamFields = new ArrayList<>();
+}
+streamFields.add(0,
+new StreamField(0, "varbinary", 
MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1,
+MetaField.DATA_BYTES_CANAL.name()));
+return streamFields;
+}
+
+@Override
+public List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField> 
sinkFields) {
+if (CollectionUtils.isEmpty(sinkFields)) {
+sinkFields = new ArrayList<>();
+}
+sinkFields.add(0, new SinkField(0, "varbinary", 
MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal",

Review Comment:
   The sink field should be a normal field of bytes type, not a meta field.






Re: [PR] [INLONG-9358][Manager] Supplement time zone information for creation time [inlong]

2023-11-29 Thread via GitHub


dockerzhang merged PR #9360:
URL: https://github.com/apache/inlong/pull/9360





(inlong) branch master updated: [INLONG-9358][Manager] Supplement time zone information for creation time (#9360)

2023-11-29 Thread dockerzhang
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new b34c451cde [INLONG-9358][Manager] Supplement time zone information for 
creation time (#9360)
b34c451cde is described below

commit b34c451cde15046727cd618a7005b1d369eeacd9
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Nov 30 10:45:44 2023 +0800

[INLONG-9358][Manager] Supplement time zone information for creation time 
(#9360)
---
 .../apache/inlong/common/pojo/agent/TaskSnapshotRequest.java |  2 +-
 .../org/apache/inlong/manager/client/ut/Kafka2HiveTest.java  | 12 +---
 .../org/apache/inlong/manager/client/cli/pojo/GroupInfo.java |  2 +-
 .../org/apache/inlong/manager/client/cli/pojo/SinkInfo.java  |  2 +-
 .../apache/inlong/manager/client/cli/pojo/SourceInfo.java|  2 +-
 .../apache/inlong/manager/client/cli/pojo/StreamInfo.java|  2 +-
 .../apache/inlong/manager/client/cli/pojo/TenantInfo.java|  4 ++--
 .../inlong/manager/client/cli/pojo/TenantRoleInfo.java   |  4 ++--
 .../org/apache/inlong/manager/client/cli/pojo/UserInfo.java  |  2 +-
 .../inlong/manager/client/api/impl/InlongGroupImplTest.java  |  6 +++---
 .../inlong/manager/client/api/inner/ClientFactoryTest.java   |  7 ---
 .../inlong/manager/pojo/audit/AuditSourceResponse.java   |  4 ++--
 .../org/apache/inlong/manager/pojo/cluster/ClusterInfo.java  |  4 ++--
 .../inlong/manager/pojo/cluster/ClusterNodeResponse.java |  4 ++--
 .../inlong/manager/pojo/cluster/ClusterTagResponse.java  |  4 ++--
 .../inlong/manager/pojo/cluster/TenantClusterTagInfo.java|  4 ++--
 .../inlong/manager/pojo/consume/InlongConsumeBriefInfo.java  |  4 ++--
 .../inlong/manager/pojo/consume/InlongConsumeInfo.java   |  4 ++--
 .../inlong/manager/pojo/group/InlongGroupBriefInfo.java  |  4 ++--
 .../apache/inlong/manager/pojo/group/InlongGroupInfo.java|  4 ++--
 .../manager/pojo/heartbeat/ComponentHeartbeatResponse.java   |  2 +-
 .../manager/pojo/heartbeat/GroupHeartbeatResponse.java   |  2 +-
 .../manager/pojo/heartbeat/StreamHeartbeatResponse.java  |  2 +-
 .../org/apache/inlong/manager/pojo/node/DataNodeInfo.java|  4 ++--
 .../manager/pojo/operationLog/OperationLogResponse.java  |  2 +-
 .../java/org/apache/inlong/manager/pojo/sink/StreamSink.java |  4 ++--
 .../org/apache/inlong/manager/pojo/source/StreamSource.java  |  4 ++--
 .../inlong/manager/pojo/stream/InlongStreamBriefInfo.java|  4 ++--
 .../apache/inlong/manager/pojo/stream/InlongStreamInfo.java  |  4 ++--
 .../apache/inlong/manager/pojo/tenant/InlongTenantInfo.java  |  4 ++--
 .../org/apache/inlong/manager/pojo/user/InlongRoleInfo.java  |  4 ++--
 .../org/apache/inlong/manager/pojo/user/TenantRoleInfo.java  |  4 ++--
 .../java/org/apache/inlong/manager/pojo/user/UserInfo.java   |  6 +++---
 .../inlong/manager/pojo/workflow/ApproverResponse.java   |  4 ++--
 34 files changed, 61 insertions(+), 70 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
index 232a3fd327..c5f7b64f52 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
@@ -42,7 +42,7 @@ public class TaskSnapshotRequest {
 /**
  * Report Time
  */
-@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8")
 private Date reportTime;
 
 /**
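
The effect of the pattern change in the hunk above can be sketched with
java.text.SimpleDateFormat directly instead of Jackson. This is only an
illustration under two assumptions: that the year token in both patterns is
`yyyy`, and that `@JsonFormat` applies SimpleDateFormat-style patterns to
`java.util.Date` fields. The class and method names are hypothetical and do
not come from the commit.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class TimestampPatternSketch {

    // Hypothetical helper: render a Date with the given pattern in GMT+8,
    // mirroring the timezone = "GMT+8" attribute shown in the diff.
    static String format(Date date, String pattern) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        return formatter.format(date);
    }

    public static void main(String[] args) {
        Date sample = Date.from(Instant.parse("2023-11-29T10:23:36Z"));
        // Assumed old pattern: wall-clock time only, no zone information.
        System.out.println(format(sample, "yyyy-MM-dd HH:mm:ss"));
        // Assumed new pattern: 'T' separator, milliseconds, numeric zone offset.
        System.out.println(format(sample, "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
    }
}
```

With the second pattern the sample instant is rendered in the same layout as
the `createTime` value that appears in the Kafka2HiveTest hunk of this commit.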
diff --git 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
index 3503afb0c4..3ab6954dc4 100644
--- 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
+++ 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -115,8 +115,8 @@ class Kafka2HiveTest extends BaseTest {
 + 
"\"dailyStorage\":1,\"peakRecords\":10,\"maxLength\":1,"
 + 
"\"inCharges\":\"test_inCharges,admin\",\"followers\":null,\"status\":101,"
 + 
"\"creator\":\"admin\",\"modifier\":\"admin\","
-+ "\"createTime\":\"2022-06-06 
09:59:10\","
-+ "\"modifyTime\":\"2022-06-06 
02:24:50\",\"extList\":[],\"tenant\":null,"
++ 
"\"createTime\":\"2023-11-29T18:23:36.000+0800\","
+ 

Re: [PR] [INLONG-9364][Agent] Remove expired instance from db [inlong]

2023-11-29 Thread via GitHub


luchunliang merged PR #9365:
URL: https://github.com/apache/inlong/pull/9365





(inlong) branch master updated: [INLONG-9364][Agent] Remove expired instance from db (#9365)

2023-11-29 Thread luchunliang
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new f62b2e87a1 [INLONG-9364][Agent] Remove expired instance from db (#9365)
f62b2e87a1 is described below

commit f62b2e87a12d5c7c2d75417715d3edb422230dab
Author: justinwwhuang 
AuthorDate: Thu Nov 30 11:16:23 2023 +0800

[INLONG-9364][Agent] Remove expired instance from db (#9365)
---
 .../apache/inlong/agent/utils/DateTransUtils.java  | 40 
 .../agent/core/instance/InstanceManager.java   | 74 ++
 .../inlong/agent/core/task/file/TaskManager.java   | 22 +--
 .../agent/core/instance/TestInstanceManager.java   |  6 +-
 .../agent/plugin/task/filecollect/FileScanner.java |  4 +-
 .../task/filecollect/LogFileCollectTask.java   |  6 +-
 .../agent/plugin/utils/file/NewDateUtils.java  | 44 +
 .../inlong/agent/plugin/utils/TestUtils.java   | 14 ++--
 .../apache/inlong/common/enums/TaskStateEnum.java  |  4 +-
 9 files changed, 139 insertions(+), 75 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index 55182c7dd8..fe6257d64e 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -111,4 +111,44 @@ public class DateTransUtils {
 return retTime;
 }
 
+/**
+ * Calculate offset time based on offset
+ * The current offset will only be offset forward, or it can be offset 
backward to be compatible with the previous
+ * calculation method (subtraction).
+ * When it is offset backward, it returns negative;
+ * When offset forward, return positive
+ *
+ * @param timeOffset offset,such as -1d,-4h,-10m;
+ * @return
+ */
+public static long calcOffset(String timeOffset) {
+if (timeOffset.length() == 0) {
+return 0;
+}
+String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
+int startIndex;
+int symbol;
+if (timeOffset.charAt(0) == '-') {
+symbol = -1;
+startIndex = 1;
+} else {
+symbol = 1;
+startIndex = 0;
+}
+
+String strOffset = timeOffset.substring(startIndex, 
timeOffset.length() - 1);
+if (strOffset.length() == 0) {
+return 0;
+}
+int offsetTime = Integer.parseInt(strOffset);
+if ("d".equalsIgnoreCase(offsetUnit)) {
+return offsetTime * 24 * 3600 * 1000 * symbol;
+} else if ("h".equalsIgnoreCase(offsetUnit)) {
+return offsetTime * 3600 * 1000 * symbol;
+} else if ("m".equalsIgnoreCase(offsetUnit)) {
+return offsetTime * 60 * 1000 * symbol;
+}
+return 0;
+}
+
 }
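
The offset parsing added in the hunk above can be sketched on its own as
follows. This is not the committed code: the class and method names are
hypothetical, and the sketch deliberately does the multiplication in `long`.
In the diff, `offsetTime` is an `int` and the product is formed from `int`
operands, which in Java would wrap around for offsets of 25 days or more
before the result is widened to `long`.

```java
public class OffsetSketch {

    // Convert an offset such as "-1d", "-4h" or "10m" to milliseconds.
    // A leading '-' yields a negative result. Empty input, a missing
    // number, or an unknown unit yields 0.
    static long calcOffsetMillis(String timeOffset) {
        if (timeOffset == null || timeOffset.isEmpty()) {
            return 0L;
        }
        int sign = timeOffset.charAt(0) == '-' ? -1 : 1;
        int start = sign < 0 ? 1 : 0;
        int unitIndex = timeOffset.length() - 1;
        if (start >= unitIndex) {
            return 0L; // nothing between the sign and the unit
        }
        long amount = Long.parseLong(timeOffset.substring(start, unitIndex));
        switch (Character.toLowerCase(timeOffset.charAt(unitIndex))) {
            case 'd':
                return sign * amount * 24L * 3600L * 1000L;
            case 'h':
                return sign * amount * 3600L * 1000L;
            case 'm':
                return sign * amount * 60L * 1000L;
            default:
                return 0L;
        }
    }

    public static void main(String[] args) {
        System.out.println(calcOffsetMillis("-1d"));
        System.out.println(calcOffsetMillis("4h"));
        System.out.println(calcOffsetMillis("30d"));
    }
}
```

Like the code in the diff, the sketch throws NumberFormatException when the
part between the sign and the unit is not a number.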
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index a80ce8b53b..260e5a477f 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -21,16 +21,22 @@ import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.InstanceDb;
+import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.agent.plugin.Instance;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +44,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * handle the instance created by task, including add, delete, update etc.
@@ -47,11 +54,14 @@ public class InstanceManager extends AbstractDa

[PR] [INLONG-9366][Agent] Remove useless offset record [inlong]

2023-11-29 Thread via GitHub


justinwwhuang opened a new pull request, #9367:
URL: https://github.com/apache/inlong/pull/9367

   [INLONG-9366][Agent] Remove useless offset record
   - Fixes #9366 
   
   ### Motivation
   
   Remove useless offset record
   ### Modifications
   Remove useless offset record
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   





Re: [PR] [INLONG-9366][Agent] Remove useless offset record [inlong]

2023-11-29 Thread via GitHub


dockerzhang merged PR #9367:
URL: https://github.com/apache/inlong/pull/9367





(inlong) branch master updated: [INLONG-9366][Agent] Remove useless offset record (#9367)

2023-11-29 Thread dockerzhang
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new e87f3605d4 [INLONG-9366][Agent] Remove useless offset record (#9367)
e87f3605d4 is described below

commit e87f3605d485b14a4c548de3afcb1c375650530d
Author: justinwwhuang 
AuthorDate: Thu Nov 30 14:05:42 2023 +0800

[INLONG-9366][Agent] Remove useless offset record (#9367)
---
 .../java/org/apache/inlong/agent/db/OffsetDb.java  | 20 +--
 .../org/apache/inlong/agent/core/AgentManager.java |  2 -
 .../agent/core/instance/InstanceManager.java   |  2 +-
 .../inlong/agent/core/task/OffsetManager.java  | 64 --
 .../inlong/agent/core/task/file/TaskManager.java   |  3 +
 .../inlong/agent/plugin/instance/FileInstance.java |  2 +-
 .../agent/plugin/sinks/filecollect/ProxySink.java  |  2 +-
 .../inlong/agent/plugin/sources/LogFileSource.java |  1 -
 .../agent/plugin/sources/TestLogFileSource.java| 12 
 9 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
index 5ceeb2e4ea..5c31a2f88a 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
@@ -17,9 +17,7 @@
 
 package org.apache.inlong.agent.db;
 
-import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.OffsetProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -27,6 +25,9 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * db interface for task profile.
  */
@@ -34,11 +35,9 @@ public class OffsetDb {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(OffsetDb.class);
 private final Db db;
-private final AgentConfiguration agentConf;
 
-public OffsetDb() {
-agentConf = AgentConfiguration.getAgentConf();
-db = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+public OffsetDb(Db db) {
+this.db = db;
 }
 
 /**
@@ -54,6 +53,15 @@ public class OffsetDb {
 }
 }
 
+public List<OffsetProfile> listAllOffsets() {
+List<KeyValueEntity> result = this.db.findAll("");
+List<OffsetProfile> offsetList = new ArrayList<>();
+for (KeyValueEntity entity : result) {
+offsetList.add(entity.getAsOffsetProfile());
+}
+return offsetList;
+}
+
 public OffsetProfile getOffset(String taskId, String instanceId) {
 KeyValueEntity result = db.get(getKey(taskId, instanceId));
 if (result == null) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 5c28e172b6..29c000dcc4 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.ProfileFetcher;
 import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.file.TaskManager;
 
 import org.slf4j.Logger;
@@ -48,7 +47,6 @@ public class AgentManager extends AbstractDaemon {
 public AgentManager() {
 conf = AgentConfiguration.getAgentConf();
 agentConfMonitor = Executors.newSingleThreadExecutor();
-OffsetManager.init();
 taskManager = new TaskManager();
 fetcher = initFetcher(this);
 heartbeatManager = HeartbeatManager.getInstance(this);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 260e5a477f..9f5773d587 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -58,7 +58,7 @@ public class InstanceManager extends AbstractDaemon {
 public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
 public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 1;
 priva
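
A minimal sketch, not the InLong code, of the pattern the OffsetDb hunk above moves to: the storage handle is passed into the constructor rather than opened from agent configuration, and listing every offset is a scan with an empty prefix. `KeyValueStore`, `InMemoryStore`, `OffsetStoreSketch`, the `taskId_instanceId` key layout, and the prefix-match behaviour of `findAll` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the agent's key-value store; not the real Db interface.
interface KeyValueStore {
    void put(String key, String value);

    // Assumed semantics: return the values of all keys starting with the prefix.
    List<String> findAll(String prefix);
}

// In-memory implementation so the sketch runs without RocksDB.
class InMemoryStore implements KeyValueStore {
    private final Map<String, String> data = new TreeMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public List<String> findAll(String prefix) {
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                values.add(entry.getValue());
            }
        }
        return values;
    }
}

// Mirrors the shape of the patched class: the store is injected, not built from config.
class OffsetStoreSketch {
    private final KeyValueStore store;

    OffsetStoreSketch(KeyValueStore store) {
        this.store = store;
    }

    // The key layout here is an assumption, not the project's getKey() format.
    void setOffset(String taskId, String instanceId, String offset) {
        store.put(taskId + "_" + instanceId, offset);
    }

    // An empty prefix matches every key, analogous to findAll("") in the diff.
    List<String> listAllOffsets() {
        return new ArrayList<>(store.findAll(""));
    }
}
```

Passing the store in lets a test hand the class an in-memory implementation, and lets several components share one underlying database handle instead of each opening its own.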

Re: [PR] [INLONG-9362][Manager] Iceberg support config all migrate [inlong]

2023-11-29 Thread via GitHub


dockerzhang merged PR #9363:
URL: https://github.com/apache/inlong/pull/9363





(inlong) branch master updated: [INLONG-9362][Manager] Iceberg support config all migrate (#9363)

2023-11-29 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new ffb627a01c [INLONG-9362][Manager] Iceberg support  config all migrate (#9363)
ffb627a01c is described below

commit ffb627a01ca5da9ea4e58a1d75d7a6b84bf1d2fd
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Nov 30 15:30:07 2023 +0800

[INLONG-9362][Manager] Iceberg support  config all migrate (#9363)
---
 .../manager/pojo/sink/iceberg/IcebergSink.java | 15 
 .../manager/pojo/sink/iceberg/IcebergSinkDTO.java  | 18 ++
 .../pojo/sink/iceberg/IcebergSinkRequest.java  | 15 
 .../inlong/manager/pojo/sort/node/NodeFactory.java |  4 +++
 .../pojo/sort/node/base/ExtractNodeProvider.java   |  4 +++
 .../pojo/sort/node/base/LoadNodeProvider.java  | 13 +++
 .../pojo/sort/node/provider/IcebergProvider.java   | 41 --
 .../manager/pojo/stream/InlongStreamBriefInfo.java |  3 ++
 .../manager/pojo/stream/InlongStreamExtParam.java  |  3 ++
 .../manager/pojo/stream/InlongStreamInfo.java  |  3 ++
 .../manager/pojo/stream/InlongStreamRequest.java   |  3 ++
 .../main/resources/h2/apache_inlong_manager.sql|  3 ++
 .../manager-web/sql/apache_inlong_manager.sql  |  3 ++
 inlong-manager/manager-web/sql/changes-1.10.0.sql  |  4 +--
 14 files changed, 128 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
index 19b988b79d..5fc2afc586 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
@@ -75,6 +75,21 @@ public class IcebergSink extends StreamSink {
 @ApiModelProperty("append mode, UPSERT or APPEND")
 private String appendMode;
 
+@ApiModelProperty("The multiple enable of sink")
+private Boolean sinkMultipleEnable = false;
+
+@ApiModelProperty("The multiple format of sink")
+private String sinkMultipleFormat;
+
+@ApiModelProperty("database pattern")
+private String databasePattern;
+
+@ApiModelProperty("table pattern")
+private String tablePattern;
+
+@ApiModelProperty("enable schema change")
+private Boolean enableSchemaChange = false;
+
 public IcebergSink() {
 this.setSinkType(SinkType.ICEBERG);
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
index 2c8dc3f7bf..379aaa0a75 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -68,6 +68,24 @@ public class IcebergSinkDTO {
 @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
 private String partitionType;
 
+@ApiModelProperty("The multiple enable of sink")
+private Boolean sinkMultipleEnable = false;
+
+@ApiModelProperty("The multiple format of sink")
+private String sinkMultipleFormat;
+
+@ApiModelProperty("database pattern")
+private String databasePattern;
+
+@ApiModelProperty("table pattern")
+private String tablePattern;
+
+@ApiModelProperty("append mode, UPSERT or APPEND")
+private String appendMode;
+
+@ApiModelProperty("enable schema change")
+private Boolean enableSchemaChange = false;
+
 @ApiModelProperty("Primary key")
 private String primaryKey;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
index aa3c606b3f..68bb94cc68 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -70,4 +70,19 @@ public class IcebergSinkRequest extends SinkRequest {
 @Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode")
 private String appendMode;
 
+@ApiModelProperty("The multiple enable of sink")
+private Boolean sinkMultipleEnable = false;
+
+@ApiModelProperty("The multiple format of sink")
+private String sinkMultipleFormat;
+
+@ApiModelProperty("database pattern")
+private String databasePattern;
+
+@ApiModel