[PR] [INLONG-10298][Agent] Delete useless code [inlong]
justinwwhuang opened a new pull request, #10299: URL: https://github.com/apache/inlong/pull/10299 Fixes #10298 ### Motivation The command related code is no longer useful ### Modifications Delete useless code ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation No doc needed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [INLONG-10297][Sort] Fix mysql connector cannot submit to flink cluster [inlong]
EMsnap opened a new pull request, #10301: URL: https://github.com/apache/inlong/pull/10301 Fixes #10297 ### Motivation Fix #10297 ### Modifications 1. Move the initialization of the audit operator to where the MySQL reader is initialized (on the Flink cluster) 2. Add the MySQL JDBC connector to the fat jar 3. Add a metrics report when data is deleted ### Verifying this change Tested on the Flink connector with the audit service enabled. MySQL: https://github.com/apache/inlong/assets/26538404/16f9c3d9-ed4e-4e59-a879-2a5aa6183a4f StarRocks: https://github.com/apache/inlong/assets/26538404/ad339ed2-8c2e-466f-a2f4-3bc714ff6fc5 Audit: https://github.com/apache/inlong/assets/26538404/7e09494f-1a75-4b27-83c7-a41dd06fe918 ### Documentation - Does this pull request introduce a new feature? (no)
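The first modification in the PR above (initializing the audit operator where the reader is initialized, on the cluster) follows a common pattern for jobs that are serialized before being shipped to workers. Below is a minimal sketch of that pattern using plain Java serialization. It is not the InLong or Flink code: `ReporterSketch`, `SourceSketch`, `open()` and the endpoint string are hypothetical names chosen for illustration, and the assumption is that the helper itself is not serializable while its configuration is.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a helper that is not Serializable
// (this is NOT the real AuditOperator).
class ReporterSketch {
    final String endpoint;

    ReporterSketch(String endpoint) {
        this.endpoint = endpoint;
    }
}

// Hypothetical source object: only its configuration is serialized.
class SourceSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String auditEndpoint;
    // transient: skipped by serialization, so it is null after deserialization
    private transient ReporterSketch reporter;

    SourceSketch(String auditEndpoint) {
        this.auditEndpoint = auditEndpoint;
    }

    // Assumed to be called on the worker once the object has arrived there.
    void open() {
        reporter = new ReporterSketch(auditEndpoint);
    }

    boolean isReporterReady() {
        return reporter != null;
    }

    // Simulates shipping the object to another process and back.
    static SourceSketch roundTrip(SourceSketch source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SourceSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

In this sketch the helper is created only in `open()`, so nothing non-serializable has to travel with the job. Had the `reporter` field not been `transient`, `writeObject` would fail with a `NotSerializableException`.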
Re: [PR] [INLONG-10292][SDK] Fix panic in connpool.UpdateEndpoints() of Golang… [inlong]
dockerzhang merged PR #10295: URL: https://github.com/apache/inlong/pull/10295
(inlong) branch master updated: [INLONG-10292][SDK] Fix panic in connpool.UpdateEndpoints() of Golang SDK (#10295)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new c07316253e [INLONG-10292][SDK] Fix panic in connpool.UpdateEndpoints() of Golang SDK (#10295)
c07316253e is described below

commit c07316253e06a17d2dda20e26cc487d5691e58e2
Author: gunli <24350...@qq.com>
AuthorDate: Wed May 29 16:23:43 2024 +0800

    [INLONG-10292][SDK] Fix panic in connpool.UpdateEndpoints() of Golang SDK (#10295)

    Co-authored-by: gunli
---
 .../dataproxy-sdk-golang/connpool/connpool.go | 24 +-
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index ff3faa033e..259159af27 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"math"
+	"runtime/debug"
 	"sync"
 	"time"
 
@@ -181,7 +182,7 @@ func (p *connPool) Put(conn gnet.Conn, err error) {
 			return
 		case <-time.After(1 * time.Second):
 			// connChan is full, close the new conn
-			newConn.Close()
+			_ = newConn.Close()
 			return
 		}
 	}
@@ -195,6 +196,13 @@ func (p *connPool) Put(conn gnet.Conn, err error) {
 }
 
 func (p *connPool) UpdateEndpoints(all, add, del []string) {
+	defer func() {
+		if rec := recover(); rec != nil {
+			p.log.Errorf("panic when update endpoints:", rec)
+			p.log.Error(string(debug.Stack()))
+		}
+	}()
+
 	if len(all) == 0 {
 		return
 	}
@@ -226,7 +234,7 @@ func (p *connPool) UpdateEndpoints(all, add, del []string) {
 			continue
 		case <-time.After(1 * time.Second):
 			// connChan is full, close the new conn
-			conn.Close()
+			_ = conn.Close()
 			continue
 		}
 	}
@@ -251,6 +259,12 @@ func (p *connPool) UpdateEndpoints(all, add, del []string) {
 			break
 		}
 
+		remoteAddr := conn.RemoteAddr()
+		if remoteAddr == nil {
+			CloseConn(conn, 0)
+			continue
+		}
+
 		addr := conn.RemoteAddr().String()
 		_, ok = delEndpoints[addr]
 		if ok {
@@ -269,7 +283,7 @@ func (p *connPool) NumPooled() int {
 // CloseConn closes a connection after a duration of time
 func CloseConn(conn gnet.Conn, after time.Duration) {
 	if after <= 0 {
-		conn.Close()
+		_ = conn.Close()
 		return
 	}
 
@@ -277,10 +291,10 @@ func CloseConn(conn gnet.Conn, after time.Duration) {
 	go func() {
 		select {
 		case <-time.After(after):
-			conn.Close()
+			_ = conn.Close()
 			return
 		case <-ctx.Done():
-			conn.Close()
+			_ = conn.Close()
 			return
 		}
 	}()
Re: [PR] [INLONG-10291][SDK] Fix incorrect initializing of gnet in Golang SDK [inlong]
dockerzhang merged PR #10294: URL: https://github.com/apache/inlong/pull/10294
(inlong) branch master updated: [INLONG-10291][SDK] Fix incorrect initializing of gnet in Golang SDK (#10294)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new d43e1e84ec [INLONG-10291][SDK] Fix incorrect initializing of gnet in Golang SDK (#10294) d43e1e84ec is described below commit d43e1e84ec9e4d326fcc177479f725a5ad15ea68 Author: gunli <24350...@qq.com> AuthorDate: Wed May 29 16:24:10 2024 +0800 [INLONG-10291][SDK] Fix incorrect initializing of gnet in Golang SDK (#10294) Co-authored-by: gunli --- .../dataproxy-sdk-golang/dataproxy/client.go | 12 ++-- .../dataproxy-sdk-golang/go.mod| 28 .../dataproxy-sdk-golang/go.sum| 77 -- 3 files changed, 65 insertions(+), 52 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go index fab00c2949..73063d2e49 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go @@ -85,11 +85,7 @@ func NewClient(opts ...Option) (Client, error) { cli.Close() return nil, err } - err = cli.netClient.Start() - if err != nil { - cli.Close() - return nil, err - } + return cli, nil } @@ -143,6 +139,12 @@ func (c *client) initNetClient() error { if err != nil { return err } + + err = netClient.Start() + if err != nil { + return err + } + // save net client c.netClient = netClient return nil diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod index 96ac6e6053..b22578bb0d 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod @@ -16,35 +16,37 @@ module github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang -go 1.19 +go 1.21 + +toolchain go1.21.4 require ( 
github.com/bwmarrin/snowflake v0.3.0 - github.com/go-resty/resty/v2 v2.11.0 + github.com/go-resty/resty/v2 v2.13.1 github.com/gofrs/uuid v4.4.0+incompatible github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c - github.com/panjf2000/gnet/v2 v2.3.4 - github.com/prometheus/client_golang v1.18.0 + github.com/panjf2000/gnet/v2 v2.5.2 + github.com/prometheus/client_golang v1.19.1 github.com/zentures/cityhash v0.0.0-20131128155616-cdd6a94144ab ) require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.53.0 // indirect + github.com/prometheus/procfs v0.15.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.23.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.20.0 // indirect google.golang.org/protobuf v1.34.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) require ( - github.com/klauspost/compress v1.17.5 + github.com/klauspost/compress v1.17.8 go.uber.org/atomic v1.11.0 - go.uber.org/zap v1.26.0 // indirect + go.uber.org/zap v1.27.0 // indirect ) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum index f4eb64ea3a..834654ed7f 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum @@ -2,35 +2,39 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2
[PR] [INLONG-10302][Agent] Increase the interface for limiting the number of instances obtained [inlong]
justinwwhuang opened a new pull request, #10303: URL: https://github.com/apache/inlong/pull/10303 Fixes #10302 ### Motivation The Task base class needs an interface that limits the number of instances obtained ### Modifications Add an interface for limiting the number of instances obtained ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation No doc needed
[PR] [INLONG-10300][Manager] Allow unsubmitted groups to modify mq type [inlong]
fuweng11 opened a new pull request, #10304: URL: https://github.com/apache/inlong/pull/10304 - Fixes #10300 ### Motivation Allow unsubmitted groups to modify mq type. ### Modifications Allow unsubmitted groups to modify mq type. A group's mq type may be changed only while the group is still in the pending submission state.
[PR] [INLONG-10305][Manager] Delete k8s related parameters in file collection [inlong]
fuweng11 opened a new pull request, #10307: URL: https://github.com/apache/inlong/pull/10307 - Fixes #10305 ### Motivation Delete k8s-related parameters in file collection because these parameters are no longer in use. ### Modifications Delete k8s-related parameters in file collection.
Re: [PR] [INLONG-10297][Sort] Fix mysql connector cannot submit to flink cluster [inlong]
EMsnap merged PR #10301: URL: https://github.com/apache/inlong/pull/10301
(inlong) branch master updated (d43e1e84ec -> 4555ebfe1c)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from d43e1e84ec [INLONG-10291][SDK] Fix incorrect initializing of gnet in Golang SDK (#10294)
     add 4555ebfe1c [INLONG-10297][Sort] Fix mysql connector cannot submit to flink cluster (#10301)

No new revisions were added by this update.

Summary of changes:
 .../sort-connectors/mysql-cdc/pom.xml | 1 +
 .../apache/inlong/sort/mysql/MySqlTableSource.java | 5 +-
 .../mysql/RowDataDebeziumDeserializeSchema.java | 31 +++--
 .../inlong/sort}/mysql/source/MySqlSource.java | 140 -
 .../sort}/mysql/source/MySqlSourceBuilder.java | 113 +
 licenses/inlong-sort-connectors/LICENSE | 2 +
 6 files changed, 112 insertions(+), 180 deletions(-)
 copy inlong-sort/sort-flink/{sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc => sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort}/mysql/source/MySqlSource.java (62%)
 copy inlong-sort/sort-flink/{sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc => sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort}/mysql/source/MySqlSourceBuilder.java (74%)
[PR] [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is empty [inlong]
doleyzi opened a new pull request, #10308: URL: https://github.com/apache/inlong/pull/10308 - Fixes #10306 ### Motivation Compatible with scenarios where the Audit Tag is empty ### Modifications Compatible with scenarios where the Audit Tag is empty
[PR] [INLONG-10296][Sort] Connectors AuditOperator was not serialized [inlong]
XiaoYou201 opened a new pull request, #10309: URL: https://github.com/apache/inlong/pull/10309 [INLONG-10296][Sort] Connectors AuditOperator was not serialized Fixes #10296 ### Motivation Connectors AuditOperator was not serialized ### Modifications Some deserialization Code ### Verifying this change
Re: [PR] [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is empty [inlong]
gosonzhang merged PR #10308: URL: https://github.com/apache/inlong/pull/10308
(inlong) branch master updated: [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is empty (#10308)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new e5ad544716 [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is empty (#10308) e5ad544716 is described below commit e5ad544716d34fc1cdc4997de07711ad5f805cef Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed May 29 19:44:25 2024 +0800 [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is empty (#10308) * Compatible with scenarios where the Audit Tag is empty * update source.query.ids.sql --- .../org/apache/inlong/audit/AuditReporterImpl.java | 33 +++ .../apache/inlong/audit/AuditReporterImplTest.java | 48 ++ .../apache/inlong/audit/config/SqlConstants.java | 26 ++-- .../apache/inlong/audit/service/ApiService.java| 4 +- .../org/apache/inlong/audit/source/JdbcSource.java | 10 ++--- 5 files changed, 108 insertions(+), 13 deletions(-) diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java index 1c8672748d..da3a1144e5 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java @@ -522,6 +522,38 @@ public class AuditReporterImpl implements Serializable { return AuditManagerUtils.buildAuditId(baseAuditId, success, isRealtime, discard, retry); } +public int buildSuccessfulAuditId(AuditIdEnum baseAuditId) { +return buildAuditId(baseAuditId, true, true, false, false); +} + +public int buildSuccessfulAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { +return buildAuditId(baseAuditId, true, isRealtime, false, false); +} + +public int buildFailedAuditId(AuditIdEnum baseAuditId) { +return buildAuditId(baseAuditId, false, 
true, false, false); +} + +public int buildFailedAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { +return buildAuditId(baseAuditId, false, isRealtime, false, false); +} + +public int buildDiscardAuditId(AuditIdEnum baseAuditId) { +return buildAuditId(baseAuditId, true, true, true, false); +} + +public int buildDiscardAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { +return buildAuditId(baseAuditId, true, isRealtime, true, false); +} + +public int buildRetryAuditId(AuditIdEnum baseAuditId) { +return buildAuditId(baseAuditId, true, true, false, true); +} + +public int buildRetryAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { +return buildAuditId(baseAuditId, true, isRealtime, false, true); +} + public AuditInformation buildAuditInformation(String auditType, FlowType dataFlow, boolean success, @@ -530,6 +562,7 @@ public class AuditReporterImpl implements Serializable { boolean retry) { return AuditManagerUtils.buildAuditInformation(auditType, dataFlow, success, isRealtime, discard, retry); } + public List getAllAuditInformation() { return AuditManagerUtils.getAllAuditInformation(); } diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java new file mode 100644 index 00..e579466dc6 --- /dev/null +++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit; + +import org.junit.Test; + +import static org.apache.inlong.audit.AuditIdEnum.AGENT_INPUT; +import static org.apache.inlong.audit.AuditIdEnum.SORT_HIVE_INPUT; +import static org.junit.Assert.assertEquals; + +public class AuditReporterImplTest { + +@Test +public void TestBuildAuditId() { +int auditId = AuditOperator.getInstance()
[PR] Upgrade flink default version to 1.15 [inlong]
wallezhang opened a new pull request, #10310: URL: https://github.com/apache/inlong/pull/10310 Fixes #10284 ### Motivation ### Modifications ### Verifying this change *(Please pick either of the following options)* - [x] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
Re: [PR] [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained [inlong]
dockerzhang merged PR #10303: URL: https://github.com/apache/inlong/pull/10303
(inlong) branch master updated: [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained (#10303)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 12efb53dae [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained (#10303) 12efb53dae is described below commit 12efb53dae42dc38075746a5c062ea69dd6bb7fd Author: justinwwhuang AuthorDate: Wed May 29 19:59:01 2024 +0800 [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained (#10303) --- .../main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java | 5 +++-- .../main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java| 6 ++ .../main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java | 6 ++ .../main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java | 6 ++ .../java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java | 5 + 5 files changed, 26 insertions(+), 2 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java index a6d8d03482..acf8287817 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java @@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.task; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.instance.ActionType; import org.apache.inlong.agent.core.instance.InstanceAction; import org.apache.inlong.agent.core.instance.InstanceManager; @@ -58,7 +57,7 @@ public abstract class AbstractTask extends Task { this.taskProfile = taskProfile; this.basicDb = basicDb; auditVersion = 
Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION)); -instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM), +instanceManager = new InstanceManager(taskProfile.getTaskId(), getInstanceLimit(), basicDb, taskManager.getTaskDb()); try { instanceManager.start(); @@ -69,6 +68,8 @@ public abstract class AbstractTask extends Task { initOK = true; } +protected abstract int getInstanceLimit(); + protected abstract void initTask(); protected void releaseTask() { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java index f83104e8a7..e7058c9929 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java @@ -36,10 +36,16 @@ public class KafkaTask extends AbstractTask { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTask.class); public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance"; +public static final int DEFAULT_INSTANCE_LIMIT = 1; private boolean isAdded = false; private String topic; private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("MMddHH"); +@Override +protected int getInstanceLimit() { +return DEFAULT_INSTANCE_LIMIT; +} + @Override protected void initTask() { LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java index 2a022ddb78..01f97029b2 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java +++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java @@ -35,10 +35,16 @@ public class MongoDBTask extends AbstractTask { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBTask.class); public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance"; +public static final int DEFAULT_INSTANCE_LIMIT = 1; private boolean isAdded = false; private String collection; private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("MMddHH"); +@Override +protected int getInstanceLimit() { +return DEFAULT_INSTANCE_LIMIT; +} + @Override protected void initTask() { LOGGER.info("mongoDB commonInit: {}", taskProfile.t
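The commit above replaces a hard-coded lookup in the base class with an abstract `getInstanceLimit()` hook that each task type overrides. A minimal sketch of that template-method shape follows. The class names are simplified stand-ins, not the real `AbstractTask` hierarchy, and the configured-limit subclass is an assumption, since the `LogFileTask` part of the diff is truncated in this message.

```java
// Simplified stand-in for the base class: it asks the subclass for its limit
// during init() instead of reading one fixed configuration key itself.
abstract class TaskSketch {
    private int instanceLimit;

    void init() {
        // The real code passes this value to the InstanceManager constructor.
        instanceLimit = getInstanceLimit();
    }

    int instanceLimit() {
        return instanceLimit;
    }

    // Hook that every concrete task type must implement.
    protected abstract int getInstanceLimit();
}

// Mirrors tasks such as KafkaTask in the diff, which return a constant of 1.
class SingleInstanceTaskSketch extends TaskSketch {
    static final int DEFAULT_INSTANCE_LIMIT = 1;

    @Override
    protected int getInstanceLimit() {
        return DEFAULT_INSTANCE_LIMIT;
    }
}

// Hypothetical task type whose limit comes from its own configuration.
class ConfiguredTaskSketch extends TaskSketch {
    private final int configuredLimit;

    ConfiguredTaskSketch(int configuredLimit) {
        this.configuredLimit = configuredLimit;
    }

    @Override
    protected int getInstanceLimit() {
        return configuredLimit;
    }
}
```

Because the hook is abstract, a task type added later cannot compile without stating its own limit, which is presumably the point of moving the decision out of the base class.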
Re: [PR] [INLONG-10300][Manager] Allow unsubmitted groups to modify mq type [inlong]
dockerzhang merged PR #10304: URL: https://github.com/apache/inlong/pull/10304
(inlong) branch master updated: [INLONG-10300][Manager] Allow unsubmitted groups to modify mq type (#10304)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new b0a70bda57 [INLONG-10300][Manager] Allow unsubmitted groups to modify mq type (#10304)
b0a70bda57 is described below

commit b0a70bda57b7525a30664325500fc2aa56e902ad
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed May 29 20:06:38 2024 +0800

    [INLONG-10300][Manager] Allow unsubmitted groups to modify mq type (#10304)
---
 .../apache/inlong/manager/service/group/InlongGroupServiceImpl.java | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 49e55cfbd0..6bdda6fd45 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -714,8 +714,10 @@ public class InlongGroupServiceImpl implements InlongGroupService {
 
     private void chkUnmodifiableParams(InlongGroupEntity entity, InlongGroupRequest request) {
         // check mqType
-        Preconditions.expectEquals(entity.getMqType(), request.getMqType(),
-                ErrorCodeEnum.INVALID_PARAMETER, "mqType not allowed modify");
+        Preconditions.expectTrue(
+                Objects.equals(entity.getMqType(), request.getMqType())
+                        || Objects.equals(entity.getStatus(), GroupStatus.TO_BE_SUBMIT.getCode()),
+                "mqType not allowed modify");
         // check record version
         Preconditions.expectEquals(entity.getVersion(), request.getVersion(),
                 ErrorCodeEnum.CONFIG_EXPIRED,
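The relaxed check in the commit above can be read in isolation as a single predicate: the request passes if the mq type is unchanged, or if the group has not been submitted yet. The sketch below restates that logic without the InLong classes. `MqTypeCheckSketch`, `mqTypeChangeAllowed` and the two status codes are hypothetical names and values for illustration, not the real `GroupStatus` enum.

```java
import java.util.Objects;

// Sketch only: restates the predicate from chkUnmodifiableParams with plain types.
class MqTypeCheckSketch {

    // Hypothetical status codes; the real values live in GroupStatus.
    static final int TO_BE_SUBMIT = 100;
    static final int CONFIG_SUCCESSFUL = 130;

    /**
     * Returns true when the requested mq type is acceptable: either it equals
     * the stored one, or the group is still waiting to be submitted.
     */
    static boolean mqTypeChangeAllowed(String storedMqType, String requestedMqType, Integer status) {
        // Objects.equals is null-safe, so a missing mq type or status does not throw.
        return Objects.equals(storedMqType, requestedMqType)
                || Objects.equals(status, TO_BE_SUBMIT);
    }
}
```

For example, under these assumed codes, changing `PULSAR` to `KAFKA` is accepted for a group in status `TO_BE_SUBMIT` and rejected for one in status `CONFIG_SUCCESSFUL`.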
Re: [PR] [INLONG-10298][Agent] Delete useless code [inlong]
justinwwhuang merged PR #10299: URL: https://github.com/apache/inlong/pull/10299
(inlong) branch master updated: [INLONG-10298][Agent] Delete useless code (#10299)
This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 229174675b [INLONG-10298][Agent] Delete useless code (#10299)
229174675b is described below

commit 229174675bf9b687f1fdd2605e670b20b6272944
Author: justinwwhuang
AuthorDate: Wed May 29 20:17:13 2024 +0800

    [INLONG-10298][Agent] Delete useless code (#10299)
---
 .../java/org/apache/inlong/agent/db/CommandDb.java |  61 
 .../main/java/org/apache/inlong/agent/db/Db.java   |  75 +--
 .../org/apache/inlong/agent/db/RocksDbImp.java     | 104 +
 .../org/apache/inlong/agent/db/TestRocksDbImp.java |  20 +---
 .../apache/inlong/agent/core/task/TaskManager.java |   4 +
 .../inlong/agent/plugin/task/TestLogFileTask.java  |   5 +-
 6 files changed, 10 insertions(+), 259 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
deleted file mode 100644
index 393b4eda8e..00
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.db;
-
-import org.apache.inlong.common.constant.Constants;
-import org.apache.inlong.common.db.CommandEntity;
-
-import java.util.List;
-
-/**
- * Command for database
- */
-public class CommandDb {
-
-    private final Db db;
-
-    public CommandDb(Db db) {
-        this.db = db;
-    }
-
-    /**
-     * store manager command to db
-     */
-    public void storeCommand(CommandEntity commandEntity) {
-        db.putCommand(commandEntity);
-    }
-
-    /**
-     * get those commands not ack to manager
-     */
-    public List getUnackedCommands() {
-        return db.searchCommands(false);
-    }
-
-    /**
-     * save special command result for trigger (retry\makeup\check)
-     */
-    public void saveSpecialCmds(Integer id, Integer taskId, boolean success) {
-        CommandEntity entity = new CommandEntity();
-        entity.setId(String.valueOf(id));
-        entity.setTaskId(taskId);
-        entity.setAcked(false);
-        entity.setCommandResult(success ? Constants.RESULT_SUCCESS : Constants.RESULT_FAIL);
-        storeCommand(entity);
-    }
-}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
index b1a0e897c9..8043c911fc 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.agent.db;
 
-import org.apache.inlong.common.db.CommandEntity;
-
-import javax.management.openmbean.KeyAlreadyExistsException;
-
 import java.io.Closeable;
 import java.util.List;
 
@@ -29,35 +25,14 @@ import java.util.List;
  */
 public interface Db extends Closeable {
 
-    abstract KeyValueEntity get(String key);
-
-    /**
-     * get command by command id
-     */
-    CommandEntity getCommand(String commandId);
-
-    /**
-     * put command entity in db
-     */
-    CommandEntity putCommand(CommandEntity entity);
-
-    /**
-     * store keyValue, if key has exists, throw exception.
-     *
-     * @param entity key/value
-     * @throws NullPointerException key should not be null
-     * @throws KeyAlreadyExistsException key already exists
-     */
-    void set(KeyValueEntity entity);
+    KeyValueEntity get(String key);
 
     /**
      * store keyValue, if key has exists, overwrite it.
      *
      * @param entity key/value
-     * @return null or old value which is overwritten.
-     * @throws NullPointerException key should not be null.
      */
-    KeyValueEntity put(KeyValueEntity entity);
+    void put(KeyValueEntity entity);
 
     /**
      * remove keyValue by key.
@@ -68,52 +43,6 @@ public interface Db extends Closeable {
      */
     KeyValueEntity remove(String key);
 
-
Re: [PR] [INLONG-10305][Manager] Delete k8s related parameters in file collection [inlong]
dockerzhang merged PR #10307:
URL: https://github.com/apache/inlong/pull/10307
(inlong) branch master updated: [INLONG-10305][Manager] Delete k8s related parameters in file collection (#10307)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 9d94ef72df [INLONG-10305][Manager] Delete k8s related parameters in file collection (#10307)
9d94ef72df is described below

commit 9d94ef72df78b5a2aed3c2f560060c257ea27e90
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed May 29 20:52:51 2024 +0800

    [INLONG-10305][Manager] Delete k8s related parameters in file collection (#10307)
---
 .../java/org/apache/inlong/manager/pojo/source/file/FileSource.java    | 3 ---
 .../java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java | 3 ---
 .../org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java  | 3 ---
 3 files changed, 9 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
index de2dc20af8..4b0df6f416 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
@@ -95,9 +95,6 @@ public class FileSource extends StreamSource {
     @ApiModelProperty("End time")
     private Long endTime;
 
-    @ApiModelProperty("Metadata filters by label, special parameters for K8S")
-    private Map filterMetaByLabels;
-
     public FileSource() {
         this.setSourceType(SourceType.FILE);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index 98d2ac046c..ea78c9cbec 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -101,9 +101,6 @@ public class FileSourceDTO {
     @ApiModelProperty(value = "Audit version")
     private String auditVersion;
 
-    @ApiModelProperty("Metadata filters by label, special parameters for K8S")
-    private Map filterMetaByLabels;
-
     public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest, String extParams) {
         FileSourceDTO dto = StringUtils.isNotBlank(extParams)
                 ? FileSourceDTO.getFromJson(extParams)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
index 1e5889ceb8..2ba1a1f7ff 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
@@ -90,9 +90,6 @@ public class FileSourceRequest extends SourceRequest {
     @ApiModelProperty("End time")
     private Long endTime;
 
-    @ApiModelProperty("Metadata filters by label, special parameters for K8S")
-    private Map filterMetaByLabels;
-
     public FileSourceRequest() {
         this.setSourceType(SourceType.FILE);
         this.setSerializationType(DataFormat.CSV.getName());
Re: [PR] [INLONG-10296][Sort] Connectors AuditOperator was not serialized [inlong]
EMsnap commented on PR #10309:
URL: https://github.com/apache/inlong/pull/10309#issuecomment-2138557964

Please add licenses
[PR] [INLONG-10313][DataProxy] Replace audit ID macro with audit API [inlong]
gosonzhang opened a new pull request, #10315:
URL: https://github.com/apache/inlong/pull/10315

Fixes #10313

Replace
public static final int AUDIT_ID_DATAPROXY_READ_SUCCESS = 5;
public static final int AUDIT_ID_DATAPROXY_SEND_SUCCESS = 6;
with:
buildSuccessfulAuditId()
buildFailedAuditId()
[PR] [INLONG-10314][DashBoard] Add an operation time to the operation log table [inlong]
wohainilaodou opened a new pull request, #10316:
URL: https://github.com/apache/inlong/pull/10316

Fixes #10314

### Motivation
Add an operation time to the operation log table

### Modifications
Added an operation time column to the operation log table

### Verifying this change
Re: [PR] [INLONG-10313][DataProxy] Replace audit ID macro with audit API [inlong]
justinwwhuang commented on code in PR #10315:
URL: https://github.com/apache/inlong/pull/10315#discussion_r1619808506

##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java:
##
@@ -59,12 +57,32 @@ public static void initAudit() {
     }
 
     /**
-     * Add audit data
+     * Add input audit data
+     *
+     * @param event event to be counted
      */
-    public static void add(int auditID, Event event) {
-        if (!CommonConfigHolder.getInstance().isEnableAudit() || event == null) {
+    public static void addInputSuccess(Event event) {
+        if (event == null || !CommonConfigHolder.getInstance().isEnableAudit()) {
             return;
         }
+        addAuditData(event,
+                AuditOperator.getInstance().buildSuccessfulAuditId(AuditIdEnum.DATA_PROXY_INPUT));

Review Comment:
It is recommended to calculate in advance and not to do it every time it is called
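
The suggestion in the review comment above could be sketched roughly as follows. This is only an illustration: `FakeAuditOperator` and its ID derivation are hypothetical stand-ins for the real `AuditOperator`, whose ID-building rule is not shown in this thread, and the string `"DATA_PROXY_INPUT"` stands in for `AuditIdEnum.DATA_PROXY_INPUT`. The point is that the audit ID is built once and kept in a field, so the per-event path does no repeated work.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AuditIdCacheSketch {

    /** Hypothetical stand-in for the audit SDK; it counts how often an ID is built. */
    public static final class FakeAuditOperator {
        public static final AtomicInteger BUILD_CALLS = new AtomicInteger();

        public static int buildSuccessfulAuditId(String auditType) {
            BUILD_CALLS.incrementAndGet();
            // Placeholder derivation only; the real SDK's rule is an unknown here.
            return Math.floorMod(auditType.hashCode(), 1000);
        }
    }

    // Built once when the class is initialized, then reused for every event.
    private static final int INPUT_SUCCESS_ID =
            FakeAuditOperator.buildSuccessfulAuditId("DATA_PROXY_INPUT");

    /** Returns the cached audit ID for a non-null event, or -1 if the event is null. */
    public static int addInputSuccess(Object event) {
        if (event == null) {
            return -1;
        }
        return INPUT_SUCCESS_ID; // no rebuild on the hot path
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            addInputSuccess("event-" + i);
        }
        // The ID was built a single time despite 1000 calls.
        System.out.println("build calls: " + FakeAuditOperator.BUILD_CALLS.get());
    }
}
```

If the real ID depends on configuration that is only available at runtime, the same idea would presumably apply by assigning the field inside `initAudit()` instead of a static initializer.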