Re: [I] [Feature][Manager]Support Tube MQ sink [inlong]
vernedeng commented on issue #9303: URL: https://github.com/apache/inlong/issues/9303#issuecomment-1831443725 This issue will be done in the following version, but not now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9354][Dashboard] Data access File sources optimization [inlong]
leezng commented on code in PR #9356: URL: https://github.com/apache/inlong/pull/9356#discussion_r1409061918 ## inlong-dashboard/src/plugins/sources/defaults/File.ts: ## @@ -28,6 +28,112 @@ const { I18n } = DataWithBackend; const { FieldDecorator } = RenderRow; const { ColumnDecorator } = RenderList; +const timeZoneList = [ Review Comment: I would suggest using an array, for example `[-12,13].map(...)` loop to generate the data. However, it won't have much impact. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9355][SDK] Optimize resource isolation for CPP SDK [inlong]
dockerzhang merged PR #9357: URL: https://github.com/apache/inlong/pull/9357 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 7178b39a3f [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357) 7178b39a3f is described below commit 7178b39a3fd30d1af9a26d5259f72b727167c519 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed Nov 29 18:21:10 2023 +0800 [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357) --- .../dataproxy-sdk-cpp/release/demo/send_demo.cc| 2 +- .../dataproxy-sdk-cpp/release/inc/sdk_conf.h | 3 +- .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 19 +- .../dataproxy-sdk-cpp/src/group/recv_group.cc | 29 +--- .../dataproxy-sdk-cpp/src/group/recv_group.h | 1 + .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 40 +++--- .../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 3 +- .../dataproxy-sdk-cpp/src/manager/send_manager.cc | 24 ++--- .../dataproxy-sdk-cpp/src/manager/send_manager.h | 1 + .../dataproxy-sdk-cpp/src/utils/capi_constant.h| 3 ++ 10 files changed, 76 insertions(+), 49 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc index a4a783e0dc..b09f8463f1 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc @@ -53,7 +53,7 @@ int main(int argc, char const *argv[]) { cout << ">start sdk successfully" << endl; int count = 1000; - string inlong_group_id = "test_cpp_sdk_20230404"; + string inlong_group_id = "test_cpp_sdk"; string inlong_stream_id = "stream1"; if (4 == argc) { inlong_group_id = argv[2]; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h index 578278c5fd..6d7b23dc21 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h @@ -83,7 +83,7 @@ private: uint64_t max_proxy_num_; uint64_t reserve_proxy_num_; uint32_t msg_type_; - bool enable_isolation_; + uint32_t isolation_level_; // Network parameters bool enable_tcp_nagle_; @@ -92,6 +92,7 @@ private: bool enable_balance_; bool enable_local_cache_; + // auth settings bool need_auth_; std::string auth_id_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index e9d679cd8c..319262adc2 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -92,6 +92,7 @@ void SdkConfig::defaultInit() { load_balance_interval_ = constants::kLoadBalanceInterval; heart_beat_interval_ = constants::kHeartBeatInterval; enable_balance_ = constants::kEnableBalance; + isolation_level_=constants::IsolationLevel::kLevelSecond; // cache parameter send_buf_size_ = constants::kSendBufSize; @@ -120,7 +121,6 @@ void SdkConfig::defaultInit() { manager_update_interval_ = constants::kManagerUpdateInterval; manager_url_timeout_ = constants::kManagerTimeout; max_proxy_num_ = constants::kMaxProxyNum; - enable_isolation_ = constants::kEnableIsolation; reserve_proxy_num_ = constants::kReserveProxyNum; enable_local_cache_ = constants::kEnableLocalCache; @@ -360,13 +360,6 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) { std::string inlong_group_ids_str = obj.GetString(); Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ","); } - // enable isolation - if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) { -const rapidjson::Value &obj = 
doc["enable_isolation"]; -enable_isolation_ = obj.GetBool(); - } else { -enable_isolation_ = constants::kEnableIsolation; - } // enable local cache if (doc.HasMember("enable_local_cache") && doc["enable_local_cache"].IsBool()) { @@ -375,6 +368,14 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) { } else { enable_local_cache_ = constants::kEnableLocalCache; } + + // isolation level + if (doc.HasMember("isolation_level") && doc["isolation_level"].IsInt() && doc["isolation_level"].GetInt() > 0) { +const rapidjson::Value &obj = doc["isolation_level"]; +isolation_level_ = obj.GetInt(); + } else { +isolation_level_ = constants::IsolationLevel::kLevelSecond; + } } void SdkConfig::InitTcpParam(const rapidjson::Value &doc) { @@ -564,7 +565,7 @@ void SdkConfig::ShowClientConfig() { LO
Re: [PR] [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error [inlong]
EMsnap merged PR #9361: URL: https://github.com/apache/inlong/pull/9361 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new ff3cf5110a [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361) ff3cf5110a is described below commit ff3cf5110a9eb9d4149ab6d44604a345e5e67dc1 Author: Sting AuthorDate: Wed Nov 29 18:59:57 2023 +0800 [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361) --- .../sort/protocol/node/load/IcebergLoadNode.java | 5 - .../java/org/apache/inlong/sort/base/Constants.java | 2 +- .../sort/iceberg/FlinkDynamicTableFactory.java | 2 ++ .../sink/multiple/DynamicSchemaHandleOperator.java | 20 +--- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java index a418fd930b..9e3fb9e8dd 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java @@ -30,7 +30,6 @@ import org.apache.inlong.sort.protocol.node.LoadNode; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.FilterFunction; -import org.apache.inlong.sort.util.SchemaChangeUtils; import com.google.common.base.Preconditions; import lombok.Data; @@ -54,7 +53,6 @@ import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT; import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN; -import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES; @JsonTypeName("icebergLoad") @Data @@ -177,8 +175,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, public Map tableOptions() { Map options = super.tableOptions(); options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR); -// for test sink.ignore.changelog -// options.put("sink.ignore.changelog", "true"); options.put(IcebergConstant.DATABASE_KEY, dbName); options.put(IcebergConstant.TABLE_KEY, tableName); options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName); @@ -197,7 +193,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, options.put(SINK_MULTIPLE_FORMAT, Objects.requireNonNull(sinkMultipleFormat).identifier()); options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern); options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern); -options.put(SINK_SCHEMA_CHANGE_POLICIES, SchemaChangeUtils.serialize(policyMap)); } else { options.put(SINK_MULTIPLE_ENABLE, "false"); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index ac7baac4dd..ba47c74356 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -419,7 +419,7 @@ public final class Constants { public static final ConfigOption SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT = ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot") .booleanType() -.defaultValue(false) +.defaultValue(true) .withDescription("Whether supporting auto create table when snapshot, default value is 'false'"); public static final ConfigOption INNER_FORMAT = diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index bb33e896bd..9a6056e614 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -57,6 +57,7 @@ import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; import
[PR] [INLONG-9364][Agent] Remove expired instance from db [inlong]
justinwwhuang opened a new pull request, #9365: URL: https://github.com/apache/inlong/pull/9365 [INLONG-9364][Agent] Remove expired instance from db - Fixes #9364 ### Motivation Remove expired instance from db ### Modifications Remove expired instance from db ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-8358][Sort] Add kafka connector on flink 1.15 [inlong]
hnrainll commented on PR #8713: URL: https://github.com/apache/inlong/pull/8713#issuecomment-1832036602 > Sorry but there should be UT for this feature, any problem with implementing it ? No problem, I'm in the process of implementing UT's functionality. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9362][Manager] Iceberg support config all migrate [inlong]
EMsnap commented on code in PR #9363: URL: https://github.com/apache/inlong/pull/9363#discussion_r1410074229 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java: ## @@ -128,4 +137,32 @@ public List getMetaFields() { fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), MetaField.AUDIT_DATA_TIME)); return fieldInfos; } + +@Override +public Boolean isSinkMultiple(StreamNode nodeInfo) { +IcebergSink icebergSink = (IcebergSink) nodeInfo; +return icebergSink.getSinkMultipleEnable(); +} + +@Override +public List addStreamFieldsForSinkMultiple(List streamFields) { +if (CollectionUtils.isEmpty(streamFields)) { +streamFields = new ArrayList<>(); +} +streamFields.add(0, +new StreamField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1, +MetaField.DATA_BYTES_CANAL.name())); +return streamFields; +} + +@Override +public List addSinkFieldsForSinkMultiple(List sinkFields) { +if (CollectionUtils.isEmpty(sinkFields)) { +sinkFields = new ArrayList<>(); +} +sinkFields.add(0, new SinkField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", Review Comment: sink should be normal field with bytes type, not meta field -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9362][Manager] Iceberg support config all migrate [inlong]
EMsnap commented on code in PR #9363: URL: https://github.com/apache/inlong/pull/9363#discussion_r1410074229 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java: ## @@ -128,4 +137,32 @@ public List getMetaFields() { fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), MetaField.AUDIT_DATA_TIME)); return fieldInfos; } + +@Override +public Boolean isSinkMultiple(StreamNode nodeInfo) { +IcebergSink icebergSink = (IcebergSink) nodeInfo; +return icebergSink.getSinkMultipleEnable(); +} + +@Override +public List addStreamFieldsForSinkMultiple(List streamFields) { +if (CollectionUtils.isEmpty(streamFields)) { +streamFields = new ArrayList<>(); +} +streamFields.add(0, +new StreamField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1, +MetaField.DATA_BYTES_CANAL.name())); +return streamFields; +} + +@Override +public List addSinkFieldsForSinkMultiple(List sinkFields) { +if (CollectionUtils.isEmpty(sinkFields)) { +sinkFields = new ArrayList<>(); +} +sinkFields.add(0, new SinkField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", Review Comment: sink should be normal field with bytes type, not meta field -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9358][Manager] Supplement time zone information for creation time [inlong]
dockerzhang merged PR #9360: URL: https://github.com/apache/inlong/pull/9360 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9358][Manager] Supplement time zone information for creation time (#9360)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new b34c451cde [INLONG-9358][Manager] Supplement time zone information for creation time (#9360) b34c451cde is described below commit b34c451cde15046727cd618a7005b1d369eeacd9 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Nov 30 10:45:44 2023 +0800 [INLONG-9358][Manager] Supplement time zone information for creation time (#9360) --- .../apache/inlong/common/pojo/agent/TaskSnapshotRequest.java | 2 +- .../org/apache/inlong/manager/client/ut/Kafka2HiveTest.java | 12 +--- .../org/apache/inlong/manager/client/cli/pojo/GroupInfo.java | 2 +- .../org/apache/inlong/manager/client/cli/pojo/SinkInfo.java | 2 +- .../apache/inlong/manager/client/cli/pojo/SourceInfo.java| 2 +- .../apache/inlong/manager/client/cli/pojo/StreamInfo.java| 2 +- .../apache/inlong/manager/client/cli/pojo/TenantInfo.java| 4 ++-- .../inlong/manager/client/cli/pojo/TenantRoleInfo.java | 4 ++-- .../org/apache/inlong/manager/client/cli/pojo/UserInfo.java | 2 +- .../inlong/manager/client/api/impl/InlongGroupImplTest.java | 6 +++--- .../inlong/manager/client/api/inner/ClientFactoryTest.java | 7 --- .../inlong/manager/pojo/audit/AuditSourceResponse.java | 4 ++-- .../org/apache/inlong/manager/pojo/cluster/ClusterInfo.java | 4 ++-- .../inlong/manager/pojo/cluster/ClusterNodeResponse.java | 4 ++-- .../inlong/manager/pojo/cluster/ClusterTagResponse.java | 4 ++-- .../inlong/manager/pojo/cluster/TenantClusterTagInfo.java| 4 ++-- .../inlong/manager/pojo/consume/InlongConsumeBriefInfo.java | 4 ++-- .../inlong/manager/pojo/consume/InlongConsumeInfo.java | 4 ++-- .../inlong/manager/pojo/group/InlongGroupBriefInfo.java | 4 ++-- .../apache/inlong/manager/pojo/group/InlongGroupInfo.java| 4 ++-- .../manager/pojo/heartbeat/ComponentHeartbeatResponse.java 
| 2 +- .../manager/pojo/heartbeat/GroupHeartbeatResponse.java | 2 +- .../manager/pojo/heartbeat/StreamHeartbeatResponse.java | 2 +- .../org/apache/inlong/manager/pojo/node/DataNodeInfo.java| 4 ++-- .../manager/pojo/operationLog/OperationLogResponse.java | 2 +- .../java/org/apache/inlong/manager/pojo/sink/StreamSink.java | 4 ++-- .../org/apache/inlong/manager/pojo/source/StreamSource.java | 4 ++-- .../inlong/manager/pojo/stream/InlongStreamBriefInfo.java| 4 ++-- .../apache/inlong/manager/pojo/stream/InlongStreamInfo.java | 4 ++-- .../apache/inlong/manager/pojo/tenant/InlongTenantInfo.java | 4 ++-- .../org/apache/inlong/manager/pojo/user/InlongRoleInfo.java | 4 ++-- .../org/apache/inlong/manager/pojo/user/TenantRoleInfo.java | 4 ++-- .../java/org/apache/inlong/manager/pojo/user/UserInfo.java | 6 +++--- .../inlong/manager/pojo/workflow/ApproverResponse.java | 4 ++-- 34 files changed, 61 insertions(+), 70 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java index 232a3fd327..c5f7b64f52 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java @@ -42,7 +42,7 @@ public class TaskSnapshotRequest { /** * Report Time */ -@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") +@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8") private Date reportTime; /** diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java index 3503afb0c4..3ab6954dc4 100644 --- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java +++ 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java @@ -115,8 +115,8 @@ class Kafka2HiveTest extends BaseTest { + "\"dailyStorage\":1,\"peakRecords\":10,\"maxLength\":1," + "\"inCharges\":\"test_inCharges,admin\",\"followers\":null,\"status\":101," + "\"creator\":\"admin\",\"modifier\":\"admin\"," -+ "\"createTime\":\"2022-06-06 09:59:10\"," -+ "\"modifyTime\":\"2022-06-06 02:24:50\",\"extList\":[],\"tenant\":null," ++ "\"createTime\":\"2023-11-29T18:23:36.000+0800\"," +
Re: [PR] [INLONG-9364][Agent] Remove expired instance from db [inlong]
luchunliang merged PR #9365: URL: https://github.com/apache/inlong/pull/9365 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9364][Agent] Remove expired instance from db (#9365)
This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new f62b2e87a1 [INLONG-9364][Agent] Remove expired instance from db (#9365) f62b2e87a1 is described below commit f62b2e87a12d5c7c2d75417715d3edb422230dab Author: justinwwhuang AuthorDate: Thu Nov 30 11:16:23 2023 +0800 [INLONG-9364][Agent] Remove expired instance from db (#9365) --- .../apache/inlong/agent/utils/DateTransUtils.java | 40 .../agent/core/instance/InstanceManager.java | 74 ++ .../inlong/agent/core/task/file/TaskManager.java | 22 +-- .../agent/core/instance/TestInstanceManager.java | 6 +- .../agent/plugin/task/filecollect/FileScanner.java | 4 +- .../task/filecollect/LogFileCollectTask.java | 6 +- .../agent/plugin/utils/file/NewDateUtils.java | 44 + .../inlong/agent/plugin/utils/TestUtils.java | 14 ++-- .../apache/inlong/common/enums/TaskStateEnum.java | 4 +- 9 files changed, 139 insertions(+), 75 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java index 55182c7dd8..fe6257d64e 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java @@ -111,4 +111,44 @@ public class DateTransUtils { return retTime; } +/** + * Calculate offset time based on offset + * The current offset will only be offset forward, or it can be offset backward to be compatible with the previous + * calculation method (subtraction). 
+ * When it is offset backward, it returns negative; + * When offset forward, return positive + * + * @param timeOffset offset,such as -1d,-4h,-10m; + * @return + */ +public static long calcOffset(String timeOffset) { +if (timeOffset.length() == 0) { +return 0; +} +String offsetUnit = timeOffset.substring(timeOffset.length() - 1); +int startIndex; +int symbol; +if (timeOffset.charAt(0) == '-') { +symbol = -1; +startIndex = 1; +} else { +symbol = 1; +startIndex = 0; +} + +String strOffset = timeOffset.substring(startIndex, timeOffset.length() - 1); +if (strOffset.length() == 0) { +return 0; +} +int offsetTime = Integer.parseInt(strOffset); +if ("d".equalsIgnoreCase(offsetUnit)) { +return offsetTime * 24 * 3600 * 1000 * symbol; +} else if ("h".equalsIgnoreCase(offsetUnit)) { +return offsetTime * 3600 * 1000 * symbol; +} else if ("m".equalsIgnoreCase(offsetUnit)) { +return offsetTime * 60 * 1000 * symbol; +} +return 0; +} + } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index a80ce8b53b..260e5a477f 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -21,16 +21,22 @@ import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.InstanceDb; +import org.apache.inlong.agent.db.TaskProfileDb; import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.utils.AgentUtils; +import 
org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.enums.InstanceStateEnum; +import org.apache.inlong.common.enums.TaskStateEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +44,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * handle the instance created by task, including add, delete, update etc. @@ -47,11 +54,14 @@ public class InstanceManager extends AbstractDa
[PR] [INLONG-9366][Agent] Remove useless offset record [inlong]
justinwwhuang opened a new pull request, #9367: URL: https://github.com/apache/inlong/pull/9367 [INLONG-9366][Agent] Remove useless offset record - Fixes #9366 ### Motivation Remove useless offset record ### Modifications Remove useless offset record ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9366][Agent] Remove useless offset record [inlong]
dockerzhang merged PR #9367: URL: https://github.com/apache/inlong/pull/9367 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9366][Agent] Remove useless offset record (#9367)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new e87f3605d4 [INLONG-9366][Agent] Remove useless offset record (#9367) e87f3605d4 is described below commit e87f3605d485b14a4c548de3afcb1c375650530d Author: justinwwhuang AuthorDate: Thu Nov 30 14:05:42 2023 +0800 [INLONG-9366][Agent] Remove useless offset record (#9367) --- .../java/org/apache/inlong/agent/db/OffsetDb.java | 20 +-- .../org/apache/inlong/agent/core/AgentManager.java | 2 - .../agent/core/instance/InstanceManager.java | 2 +- .../inlong/agent/core/task/OffsetManager.java | 64 -- .../inlong/agent/core/task/file/TaskManager.java | 3 + .../inlong/agent/plugin/instance/FileInstance.java | 2 +- .../agent/plugin/sinks/filecollect/ProxySink.java | 2 +- .../inlong/agent/plugin/sources/LogFileSource.java | 1 - .../agent/plugin/sources/TestLogFileSource.java| 12 9 files changed, 90 insertions(+), 18 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java index 5ceeb2e4ea..5c31a2f88a 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java @@ -17,9 +17,7 @@ package org.apache.inlong.agent.db; -import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.OffsetProfile; -import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.CommonConstants; import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.utils.AgentUtils; @@ -27,6 +25,9 @@ import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; 
+import java.util.List; + /** * db interface for task profile. */ @@ -34,11 +35,9 @@ public class OffsetDb { private static final Logger LOGGER = LoggerFactory.getLogger(OffsetDb.class); private final Db db; -private final AgentConfiguration agentConf; -public OffsetDb() { -agentConf = AgentConfiguration.getAgentConf(); -db = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET)); +public OffsetDb(Db db) { +this.db = db; } /** @@ -54,6 +53,15 @@ public class OffsetDb { } } +public List listAllOffsets() { +List result = this.db.findAll(""); +List offsetList = new ArrayList<>(); +for (KeyValueEntity entity : result) { +offsetList.add(entity.getAsOffsetProfile()); +} +return offsetList; +} + public OffsetProfile getOffset(String taskId, String instanceId) { KeyValueEntity result = db.get(getKey(taskId, instanceId)); if (result == null) { diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java index 5c28e172b6..29c000dcc4 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java @@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.ProfileFetcher; import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.file.TaskManager; import org.slf4j.Logger; @@ -48,7 +47,6 @@ public class AgentManager extends AbstractDaemon { public AgentManager() { conf = AgentConfiguration.getAgentConf(); agentConfMonitor = Executors.newSingleThreadExecutor(); -OffsetManager.init(); taskManager = new TaskManager(); fetcher = initFetcher(this); heartbeatManager = 
HeartbeatManager.getInstance(this); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 260e5a477f..9f5773d587 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -58,7 +58,7 @@ public class InstanceManager extends AbstractDaemon { public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 1; priva
Re: [PR] [INLONG-9362][Manager] Iceberg support config all migrate [inlong]
dockerzhang merged PR #9363: URL: https://github.com/apache/inlong/pull/9363 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9362][Manager] Iceberg support config all migrate (#9363)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new ffb627a01c [INLONG-9362][Manager] Iceberg support config all migrate (#9363) ffb627a01c is described below commit ffb627a01ca5da9ea4e58a1d75d7a6b84bf1d2fd Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Nov 30 15:30:07 2023 +0800 [INLONG-9362][Manager] Iceberg support config all migrate (#9363) --- .../manager/pojo/sink/iceberg/IcebergSink.java | 15 .../manager/pojo/sink/iceberg/IcebergSinkDTO.java | 18 ++ .../pojo/sink/iceberg/IcebergSinkRequest.java | 15 .../inlong/manager/pojo/sort/node/NodeFactory.java | 4 +++ .../pojo/sort/node/base/ExtractNodeProvider.java | 4 +++ .../pojo/sort/node/base/LoadNodeProvider.java | 13 +++ .../pojo/sort/node/provider/IcebergProvider.java | 41 -- .../manager/pojo/stream/InlongStreamBriefInfo.java | 3 ++ .../manager/pojo/stream/InlongStreamExtParam.java | 3 ++ .../manager/pojo/stream/InlongStreamInfo.java | 3 ++ .../manager/pojo/stream/InlongStreamRequest.java | 3 ++ .../main/resources/h2/apache_inlong_manager.sql| 3 ++ .../manager-web/sql/apache_inlong_manager.sql | 3 ++ inlong-manager/manager-web/sql/changes-1.10.0.sql | 4 +-- 14 files changed, 128 insertions(+), 4 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java index 19b988b79d..5fc2afc586 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java @@ -75,6 +75,21 @@ public class IcebergSink extends StreamSink { @ApiModelProperty("append mode, UPSERT or APPEND") 
private String appendMode; +@ApiModelProperty("The multiple enable of sink") +private Boolean sinkMultipleEnable = false; + +@ApiModelProperty("The multiple format of sink") +private String sinkMultipleFormat; + +@ApiModelProperty("database pattern") +private String databasePattern; + +@ApiModelProperty("table pattern") +private String tablePattern; + +@ApiModelProperty("enable schema change") +private Boolean enableSchemaChange = false; + public IcebergSink() { this.setSinkType(SinkType.ICEBERG); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java index 2c8dc3f7bf..379aaa0a75 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java @@ -68,6 +68,24 @@ public class IcebergSinkDTO { @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation") private String partitionType; +@ApiModelProperty("The multiple enable of sink") +private Boolean sinkMultipleEnable = false; + +@ApiModelProperty("The multiple format of sink") +private String sinkMultipleFormat; + +@ApiModelProperty("database pattern") +private String databasePattern; + +@ApiModelProperty("table pattern") +private String tablePattern; + +@ApiModelProperty("append mode, UPSERT or APPEND") +private String appendMode; + +@ApiModelProperty("enable schema change") +private Boolean enableSchemaChange = false; + @ApiModelProperty("Primary key") private String primaryKey; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java index aa3c606b3f..68bb94cc68 100644 
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java @@ -70,4 +70,19 @@ public class IcebergSinkRequest extends SinkRequest { @Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode") private String appendMode; +@ApiModelProperty("The multiple enable of sink") +private Boolean sinkMultipleEnable = false; + +@ApiModelProperty("The multiple format of sink") +private String sinkMultipleFormat; + +@ApiModelProperty("database pattern") +private String databasePattern; + +@ApiModel