This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e5ad544716 [INLONG-10306][Audit] Compatible with scenarios where the 
Audit Tag is empty (#10308)
e5ad544716 is described below

commit e5ad544716d34fc1cdc4997de07711ad5f805cef
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed May 29 19:44:25 2024 +0800

    [INLONG-10306][Audit] Compatible with scenarios where the Audit Tag is 
empty (#10308)
    
    * Compatible with scenarios where the Audit Tag is empty
    
    * update source.query.ids.sql
---
 .../org/apache/inlong/audit/AuditReporterImpl.java | 33 +++++++++++++++
 .../apache/inlong/audit/AuditReporterImplTest.java | 48 ++++++++++++++++++++++
 .../apache/inlong/audit/config/SqlConstants.java   | 26 ++++++++++--
 .../apache/inlong/audit/service/ApiService.java    |  4 +-
 .../org/apache/inlong/audit/source/JdbcSource.java | 10 ++---
 5 files changed, 108 insertions(+), 13 deletions(-)

diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index 1c8672748d..da3a1144e5 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -522,6 +522,38 @@ public class AuditReporterImpl implements Serializable {
         return AuditManagerUtils.buildAuditId(baseAuditId, success, 
isRealtime, discard, retry);
     }
 
+    public int buildSuccessfulAuditId(AuditIdEnum baseAuditId) {
+        return buildAuditId(baseAuditId, true, true, false, false);
+    }
+
+    public int buildSuccessfulAuditId(AuditIdEnum baseAuditId, boolean 
isRealtime) {
+        return buildAuditId(baseAuditId, true, isRealtime, false, false);
+    }
+
+    public int buildFailedAuditId(AuditIdEnum baseAuditId) {
+        return buildAuditId(baseAuditId, false, true, false, false);
+    }
+
+    public int buildFailedAuditId(AuditIdEnum baseAuditId, boolean isRealtime) 
{
+        return buildAuditId(baseAuditId, false, isRealtime, false, false);
+    }
+
+    public int buildDiscardAuditId(AuditIdEnum baseAuditId) {
+        return buildAuditId(baseAuditId, true, true, true, false);
+    }
+
+    public int buildDiscardAuditId(AuditIdEnum baseAuditId, boolean 
isRealtime) {
+        return buildAuditId(baseAuditId, true, isRealtime, true, false);
+    }
+
+    public int buildRetryAuditId(AuditIdEnum baseAuditId) {
+        return buildAuditId(baseAuditId, true, true, false, true);
+    }
+
+    public int buildRetryAuditId(AuditIdEnum baseAuditId, boolean isRealtime) {
+        return buildAuditId(baseAuditId, true, isRealtime, false, true);
+    }
+
     public AuditInformation buildAuditInformation(String auditType,
             FlowType dataFlow,
             boolean success,
@@ -530,6 +562,7 @@ public class AuditReporterImpl implements Serializable {
             boolean retry) {
         return AuditManagerUtils.buildAuditInformation(auditType, dataFlow, 
success, isRealtime, discard, retry);
     }
+
     public List<AuditInformation> getAllAuditInformation() {
         return AuditManagerUtils.getAllAuditInformation();
     }
diff --git 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java
new file mode 100644
index 0000000000..e579466dc6
--- /dev/null
+++ 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditReporterImplTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.junit.Test;
+
+import static org.apache.inlong.audit.AuditIdEnum.AGENT_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.SORT_HIVE_INPUT;
+import static org.junit.Assert.assertEquals;
+
+public class AuditReporterImplTest {
+
+    @Test
+    public void TestBuildAuditId() {
+        int auditId = 
AuditOperator.getInstance().buildSuccessfulAuditId(AGENT_INPUT);
+        assertEquals(3, auditId);
+        auditId = AuditOperator.getInstance().buildFailedAuditId(AGENT_INPUT);
+        assertEquals(524291, auditId);
+        auditId = AuditOperator.getInstance().buildRetryAuditId(AGENT_INPUT);
+        assertEquals(65539, auditId);
+        auditId = AuditOperator.getInstance().buildDiscardAuditId(AGENT_INPUT);
+        assertEquals(131075, auditId);
+
+        auditId = 
AuditOperator.getInstance().buildSuccessfulAuditId(SORT_HIVE_INPUT, false);
+        assertEquals(262151, auditId);
+        auditId = 
AuditOperator.getInstance().buildFailedAuditId(SORT_HIVE_INPUT, false);
+        assertEquals(786439, auditId);
+        auditId = 
AuditOperator.getInstance().buildDiscardAuditId(SORT_HIVE_INPUT, false);
+        assertEquals(393223, auditId);
+        auditId = 
AuditOperator.getInstance().buildRetryAuditId(SORT_HIVE_INPUT, false);
+        assertEquals(327687, auditId);
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 884172d32a..f0eebca6de 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -46,7 +46,12 @@ public class SqlConstants {
                     "\t\t, t_all_version.cnt, t_all_version.size, 
t_all_version.delay\n" +
                     "\tFROM (\n" +
                     "\t\tSELECT audit_version, log_ts, inlong_group_id, 
inlong_stream_id, audit_id\n" +
-                    "\t\t\t, audit_tag, SUM(count) AS cnt, SUM(size) AS 
size\n" +
+                    "\t\t\t, " +
+                    "CASE \n" +
+                    "    WHEN audit_tag = '' THEN '-1'\n" +
+                    "    ELSE audit_tag\n" +
+                    "END AS audit_tag " +
+                    ", SUM(count) AS cnt, SUM(size) AS size\n" +
                     "\t\t\t, SUM(delay) AS delay\n" +
                     "\t\tFROM audit_data\n" +
                     "\t\tWHERE log_ts BETWEEN ? AND ?\n" +
@@ -55,7 +60,11 @@ public class SqlConstants {
                     "\t) t_all_version\n" +
                     "\t\tJOIN (\n" +
                     "\t\t\tSELECT max(audit_version) AS audit_version, log_ts, 
inlong_group_id, inlong_stream_id\n" +
-                    "\t\t\t\t, audit_id, audit_tag\n" +
+                    "\t\t\t\t, audit_id, " +
+                    "CASE \n" +
+                    "    WHEN audit_tag = '' THEN '-1'\n" +
+                    "    ELSE audit_tag\n" +
+                    "END AS audit_tag \n" +
                     "\t\t\tFROM audit_data\n" +
                     "\t\t\tWHERE log_ts BETWEEN ? AND ?\n" +
                     "\t\t\t\tAND audit_id = ?\n" +
@@ -83,7 +92,11 @@ public class SqlConstants {
 
     public static final String KEY_SOURCE_QUERY_IDS_SQL = 
"source.query.ids.sql";
     public static final String DEFAULT_SOURCE_QUERY_IDS_SQL =
-            "SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
+            "SELECT inlong_group_id, inlong_stream_id, audit_id, " +
+                    "CASE \n" +
+                    "    WHEN audit_tag = '' THEN '-1'\n" +
+                    "    ELSE audit_tag\n" +
+                    "END AS audit_tag \n" +
                     "\t, sum(count) AS cnt, sum(size) AS size\n" +
                     "\t, sum(delay) AS delay\n" +
                     "FROM audit_data\n" +
@@ -100,7 +113,12 @@ public class SqlConstants {
                     "FROM (\n" +
                     "\tSELECT audit_version, docker_id, thread_id, sdk_ts, 
packet_id\n" +
                     "\t\t, log_ts, ip, inlong_group_id, inlong_stream_id, 
audit_id\n" +
-                    "\t\t, audit_tag, count, size, delay\n" +
+                    "\t\t, " +
+                    "   CASE \n" +
+                    "        WHEN audit_tag ='' THEN '-1'\n" +
+                    "        ELSE audit_tag\n" +
+                    "    END AS audit_tag ," +
+                    " count, size, delay\n" +
                     "\tFROM audit_data\n" +
                     "\tWHERE log_ts BETWEEN ? AND ?\n" +
                     "\t\tAND inlong_group_id = ?\n" +
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
index d8dcb7ebbf..3474df4968 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java
@@ -55,7 +55,6 @@ import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTE
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_PARAMS_AUDIT_TAG;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE;
 import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH;
@@ -79,6 +78,7 @@ import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_STRE
 import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_IP;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.PARAMS_START_TIME;
 import static 
org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
 import static org.apache.inlong.audit.entities.ApiType.DAY;
 import static org.apache.inlong.audit.entities.ApiType.GET_IDS;
 import static org.apache.inlong.audit.entities.ApiType.GET_IPS;
@@ -180,7 +180,7 @@ public class ApiService {
                     }
                 }
             }
-            params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_PARAMS_AUDIT_TAG);
+            params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_AUDIT_TAG);
             return params;
         }
 
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index 89ee94b9e1..89157ab9a2 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -28,6 +28,7 @@ import org.apache.inlong.audit.utils.CacheUtils;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
 import lombok.Data;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,10 +71,9 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_BACK_INITI
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
-import static 
org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_PARAMS_AUDIT_TAG;
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
 import static org.apache.inlong.audit.entities.AuditCycle.DAY;
 import static org.apache.inlong.audit.entities.AuditCycle.HOUR;
-
 /**
  * Jdbc source
  */
@@ -300,11 +300,7 @@ public class JdbcSource {
                         data.setInlongStreamId(resultSet.getString(2));
                         data.setAuditId(resultSet.getString(3));
                         String auditTag = resultSet.getString(4);
-                        if (null == auditTag) {
-                            data.setAuditTag(DEFAULT_PARAMS_AUDIT_TAG);
-                        } else {
-                            data.setAuditTag(auditTag);
-                        }
+                        data.setAuditTag(StringUtils.isBlank(auditTag) ? 
DEFAULT_AUDIT_TAG : auditTag);
                         long count = resultSet.getLong(5);
                         data.setCount(count);
                         data.setSize(resultSet.getLong(6));

Reply via email to