This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e5ad544716 [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is empty (#10308) e5ad544716 is described below commit e5ad544716d34fc1cdc4997de07711ad5f805cef Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed May 29 19:44:25 2024 +0800 [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is empty (#10308) * Compatible with scenarios where the Audit Tag is empty * update source.query.ids.sql --- .../org/apache/inlong/audit/AuditReporterImpl.java | 33 +++++++++++++++ .../apache/inlong/audit/AuditReporterImplTest.java | 48 ++++++++++++++++++++++ .../apache/inlong/audit/config/SqlConstants.java | 26 ++++++++++-- .../apache/inlong/audit/service/ApiService.java | 4 +- .../org/apache/inlong/audit/source/JdbcSource.java | 10 ++--- 5 files changed, 108 insertions(+), 13 deletions(-) diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java index 1c8672748d..da3a1144e5 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java @@ -522,6 +522,38 @@ public class AuditReporterImpl implements Serializable { return AuditManagerUtils.buildAuditId(baseAuditId, success, isRealtime, discard, retry); } + public int buildSuccessfulAuditId(AuditIdEnum baseAuditId) { + return buildAuditId(baseAuditId, true, true, false, false); + } + + public int buildSuccessfulAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { + return buildAuditId(baseAuditId, true, isRealtime, false, false); + } + + public int buildFailedAuditId(AuditIdEnum baseAuditId) { + return buildAuditId(baseAuditId, false, true, false, false); + } + + public int buildFailedAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { + return buildAuditId(baseAuditId, false, isRealtime, false, false); + } + + public int buildDiscardAuditId(AuditIdEnum baseAuditId) { + return buildAuditId(baseAuditId, true, true, true, false); + } + + public int buildDiscardAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { + return buildAuditId(baseAuditId, true, isRealtime, true, false); + } + + public int buildRetryAuditId(AuditIdEnum baseAuditId) { + return buildAuditId(baseAuditId, true, true, false, true); + } + + public int buildRetryAuditId(AuditIdEnum baseAuditId, boolean isRealtime) { + return buildAuditId(baseAuditId, true, isRealtime, false, true); + } + public AuditInformation buildAuditInformation(String auditType, FlowType dataFlow, boolean success, @@ -530,6 +562,7 @@ public class AuditReporterImpl implements Serializable { boolean retry) { return AuditManagerUtils.buildAuditInformation(auditType, dataFlow, success, isRealtime, discard, retry); } + public List<AuditInformation> getAllAuditInformation() { return AuditManagerUtils.getAllAuditInformation(); } diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java new file mode 100644 index 0000000000..e579466dc6 --- /dev/null +++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit; + +import org.junit.Test; + +import static org.apache.inlong.audit.AuditIdEnum.AGENT_INPUT; +import static org.apache.inlong.audit.AuditIdEnum.SORT_HIVE_INPUT; +import static org.junit.Assert.assertEquals; + +public class AuditReporterImplTest { + + @Test + public void TestBuildAuditId() { + int auditId = AuditOperator.getInstance().buildSuccessfulAuditId(AGENT_INPUT); + assertEquals(3, auditId); + auditId = AuditOperator.getInstance().buildFailedAuditId(AGENT_INPUT); + assertEquals(524291, auditId); + auditId = AuditOperator.getInstance().buildRetryAuditId(AGENT_INPUT); + assertEquals(65539, auditId); + auditId = AuditOperator.getInstance().buildDiscardAuditId(AGENT_INPUT); + assertEquals(131075, auditId); + + auditId = AuditOperator.getInstance().buildSuccessfulAuditId(SORT_HIVE_INPUT, false); + assertEquals(262151, auditId); + auditId = AuditOperator.getInstance().buildFailedAuditId(SORT_HIVE_INPUT, false); + assertEquals(786439, auditId); + auditId = AuditOperator.getInstance().buildDiscardAuditId(SORT_HIVE_INPUT, false); + assertEquals(393223, auditId); + auditId = AuditOperator.getInstance().buildRetryAuditId(SORT_HIVE_INPUT, false); + assertEquals(327687, auditId); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java index 884172d32a..f0eebca6de 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java @@ -46,7 +46,12 @@ public class SqlConstants { "\t\t, t_all_version.cnt, t_all_version.size, t_all_version.delay\n" + "\tFROM (\n" + "\t\tSELECT audit_version, log_ts, inlong_group_id, inlong_stream_id, audit_id\n" + - "\t\t\t, audit_tag, SUM(count) AS cnt, SUM(size) AS size\n" + + "\t\t\t, " + + "CASE \n" + + " WHEN audit_tag = '' THEN '-1'\n" + + " ELSE audit_tag\n" + + "END AS audit_tag " + + ", SUM(count) AS cnt, SUM(size) AS size\n" + "\t\t\t, SUM(delay) AS delay\n" + "\t\tFROM audit_data\n" + "\t\tWHERE log_ts BETWEEN ? AND ?\n" + @@ -55,7 +60,11 @@ public class SqlConstants { "\t) t_all_version\n" + "\t\tJOIN (\n" + "\t\t\tSELECT max(audit_version) AS audit_version, log_ts, inlong_group_id, inlong_stream_id\n" + - "\t\t\t\t, audit_id, audit_tag\n" + + "\t\t\t\t, audit_id, " + + "CASE \n" + + " WHEN audit_tag = '' THEN '-1'\n" + + " ELSE audit_tag\n" + + "END AS audit_tag \n" + "\t\t\tFROM audit_data\n" + "\t\t\tWHERE log_ts BETWEEN ? AND ?\n" + "\t\t\t\tAND audit_id = ?\n" + @@ -83,7 +92,11 @@ public class SqlConstants { public static final String KEY_SOURCE_QUERY_IDS_SQL = "source.query.ids.sql"; public static final String DEFAULT_SOURCE_QUERY_IDS_SQL = - "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" + + "SELECT inlong_group_id, inlong_stream_id, audit_id, " + + "CASE \n" + + " WHEN audit_tag = '' THEN '-1'\n" + + " ELSE audit_tag\n" + + "END AS audit_tag \n" + "\t, sum(count) AS cnt, sum(size) AS size\n" + "\t, sum(delay) AS delay\n" + "FROM audit_data\n" + @@ -100,7 +113,12 @@ public class SqlConstants { "FROM (\n" + "\tSELECT audit_version, docker_id, thread_id, sdk_ts, packet_id\n" + "\t\t, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id\n" + - "\t\t, audit_tag, count, size, delay\n" + + "\t\t, " + + " CASE \n" + + " WHEN audit_tag ='' THEN '-1'\n" + + " ELSE audit_tag\n" + + " END AS audit_tag ," + + " count, size, delay\n" + "\tFROM audit_data\n" + "\tWHERE log_ts BETWEEN ? AND ?\n" + "\t\tAND inlong_group_id = ?\n" + diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java index d8dcb7ebbf..3474df4968 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java @@ -55,7 +55,6 @@ import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTE import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS; import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE; import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_PARAMS_AUDIT_TAG; import static org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE; import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE; import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH; @@ -79,6 +78,7 @@ import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_STRE import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_IP; import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_START_TIME; import static org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE; +import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG; import static org.apache.inlong.audit.entities.ApiType.DAY; import static org.apache.inlong.audit.entities.ApiType.GET_IDS; import static org.apache.inlong.audit.entities.ApiType.GET_IPS; @@ -180,7 +180,7 @@ public class ApiService { } } } - params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_PARAMS_AUDIT_TAG); + params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_AUDIT_TAG); return params; } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java index 89ee94b9e1..89157ab9a2 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java @@ -28,6 +28,7 @@ import org.apache.inlong.audit.utils.CacheUtils; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.Data; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,10 +71,9 @@ import static org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_BACK_INITI import static org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_THREAD_POOL_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_PARAMS_AUDIT_TAG; +import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG; import static org.apache.inlong.audit.entities.AuditCycle.DAY; import static org.apache.inlong.audit.entities.AuditCycle.HOUR; - /** * Jdbc source */ @@ -300,11 +300,7 @@ public class JdbcSource { data.setInlongStreamId(resultSet.getString(2)); data.setAuditId(resultSet.getString(3)); String auditTag = resultSet.getString(4); - if (null == auditTag) { - data.setAuditTag(DEFAULT_PARAMS_AUDIT_TAG); - } else { - data.setAuditTag(auditTag); - } + data.setAuditTag(StringUtils.isBlank(auditTag) ? DEFAULT_AUDIT_TAG : auditTag); long count = resultSet.getLong(5); data.setCount(count); data.setSize(resultSet.getLong(6));