This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 1d76d14334 KYLIN-5973 fix streaming job
1d76d14334 is described below

commit 1d76d143341b70fcc326782e8111bfbd28cdc21c
Author: Zhimin Wu <[email protected]>
AuthorDate: Thu Sep 12 16:23:37 2024 +0800

    KYLIN-5973 fix streaming job
---
 .../MetadataBackupServiceJdbcMetadataTest.java     |  3 +-
 .../apache/kylin/common/persistence/AuditLog.java  |  2 ++
 .../kylin/common/persistence/RawResource.java      |  4 +++
 .../metadata/FileSystemMetadataStore.java          |  2 +-
 .../persistence/metadata/JdbcAuditLogStore.java    | 34 +++++++++++++---------
 .../metadata/JdbcPartialAuditLogStore.java         |  4 +--
 .../persistence/metadata/MemoryAuditLogStore.java  |  8 +++--
 .../metadata/jdbc/AuditLogRowMapper.java           | 13 +++++----
 .../metadata/mapper/FusionModelMapper.java         |  8 +++--
 .../mapper/StreamingJobDynamicSqlSupport.java      |  7 +++++
 .../metadata/mapper/StreamingJobMapper.java        |  1 +
 .../persistence/resources/DataflowRawResource.java |  5 ++++
 .../resources/IndexPlanRawResource.java            |  5 ++++
 .../resources/LayoutDetailsRawResource.java        |  6 ++++
 .../persistence/resources/LayoutRawResource.java   |  4 +++
 .../persistence/resources/ModelRawResource.java    |  5 ++++
 .../resources/StreamingJobRawResource.java         |  3 ++
 .../transaction/AbstractAuditLogReplayWorker.java  |  5 ++--
 .../transaction/AuditLogReplayWorker.java          |  3 +-
 .../resources/metadata-jdbc-default.properties     |  1 +
 .../src/main/resources/metadata-jdbc-h2.properties |  1 +
 .../main/resources/metadata-jdbc-mysql.properties  |  2 ++
 .../resources/metadata-jdbc-postgresql.properties  |  2 ++
 .../metadata/JdbcAuditLogReplayerTest.java         |  2 +-
 .../metadata/JdbcAuditLogStoreTest.java            | 22 +++++++-------
 .../metadata/JdbcAuditLogStoreTool.java            | 25 +++++++++++-----
 .../metadata/JdbcMetadataStoreTest.java            |  2 +-
 .../jdbc/JdbcPartialAuditLogStoreTest.java         | 17 +++++------
 .../transaction/AuditReplayWorkerTest.java         | 21 ++++++++-----
 .../kylin/common/util/AuditLogCheckerTest.java     |  6 ++--
 .../apache/kylin/metadata/model/FusionModel.java   |  2 --
 .../streaming/FusionModelJdbcMetadataTest.java     | 27 +++++++++++++++++
 .../org/apache/kylin/event/HAMetadataTest.java     |  4 +--
 .../kylin/streaming/app/StreamingApplication.java  |  7 +----
 .../streaming/jobs/impl/StreamingJobLauncher.java  |  4 ++-
 .../jobs/impl/StreamingJobLauncherTest.java        |  3 +-
 .../UpdateAuditLogTableColumnLengthCLI.java        |  3 +-
 .../org/apache/kylin/tool/AuditLogToolTest.java    |  5 ++--
 .../org/apache/kylin/tool/AuditLogWorkerTest.java  |  3 +-
 .../tool/security/KylinPasswordResetCLITest.java   |  3 +-
 .../ut_audit_log/ke_metadata_test_audit_log.json   | 29 +++++++++++++++++-
 41 files changed, 224 insertions(+), 89 deletions(-)

diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/MetadataBackupServiceJdbcMetadataTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/MetadataBackupServiceJdbcMetadataTest.java
index da12d50e1d..66decdab6e 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/MetadataBackupServiceJdbcMetadataTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/MetadataBackupServiceJdbcMetadataTest.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.persistence.ImageDesc;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.StringEntity;
+import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore;
 import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStoreTool;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
@@ -72,7 +73,7 @@ class MetadataBackupServiceJdbcMetadataTest {
         val url = getTestConfig().getMetadataUrl();
         val jdbcTemplate = info.getJdbcTemplate();
         JdbcAuditLogStoreTool.prepareJdbcAuditLogStore(null, jdbcTemplate, 
100);
-        val table = url.getIdentifier() + "_audit_log";
+        val table = url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
         long count = jdbcTemplate.queryForObject("select count(1) from " + 
table, Long.class);
 
         val rootPath = new 
Path(kylinConfig.getHdfsWorkingDirectory()).getParent();
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/AuditLog.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/AuditLog.java
index 4c4db1da91..e8a84cf535 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/AuditLog.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/AuditLog.java
@@ -55,6 +55,8 @@ public class AuditLog {
 
     private String unitId;
 
+    private String modelUuid;
+
     private String operator;
 
     private String instance;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
index 9874c6b911..d6b76ecebb 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/RawResource.java
@@ -220,6 +220,10 @@ public class RawResource {
         return GLOBAL_PROJECT;
     }
 
+    public String getModelUuid() {
+        return null;
+    }
+
     public void setProject(String resPath) {
         // Filed project is only defined in partial subclasses.
         // Please override this method if needed.
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
index 8ffb2c59f8..b68b921375 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
@@ -876,7 +876,7 @@ public class FileSystemMetadataStore extends MetadataStore {
         }
     }
 
-    private static class SnapShotCompressHandler implements 
CompressHandlerInterface {
+    public static class SnapShotCompressHandler implements 
CompressHandlerInterface {
         @Override
         @SuppressWarnings("unchecked")
         public <T extends RawResource> T read(InputStream in, String resPath, 
long time, MetadataType type)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
index c8cf58e594..924202af5d 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
@@ -68,7 +68,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class JdbcAuditLogStore implements AuditLogStore {
 
-    static final String AUDIT_LOG_SUFFIX = "_audit_log";
+    public static final String AUDIT_LOG_SUFFIX = "_audit_log_v2";
 
     public static final String SELECT_TERM = "select ";
 
@@ -77,6 +77,7 @@ public class JdbcAuditLogStore implements AuditLogStore {
     static final String AUDIT_LOG_TABLE_CONTENT = "meta_content";
     static final String AUDIT_LOG_TABLE_TS = "meta_ts";
     static final String AUDIT_LOG_TABLE_MVCC = "meta_mvcc";
+    static final String AUDIT_LOG_MODEL_UUID = "model_uuid";
     static final String AUDIT_LOG_TABLE_UNIT = "unit_id";
     static final String AUDIT_LOG_TABLE_OPERATOR = "operator";
     static final String AUDIT_LOG_TABLE_INSTANCE = "instance";
@@ -90,20 +91,24 @@ public class JdbcAuditLogStore implements AuditLogStore {
 
     static final String INSERT_SQL = "insert into %s (" + 
Joiner.on(",").join(AUDIT_LOG_TABLE_KEY,
             AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, 
AUDIT_LOG_TABLE_UNIT,
-            AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG)
-            + ") values (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            AUDIT_LOG_MODEL_UUID, AUDIT_LOG_TABLE_OPERATOR, 
AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD,
+            AUDIT_LOG_DIFF_FLAG)
+            + ") values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     static final String SELECT_BY_RANGE_SQL = SELECT_TERM
-            + Joiner.on(",").join(AUDIT_LOG_TABLE_ID, AUDIT_LOG_TABLE_KEY, 
AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS,
-                    AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, 
AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG)
+            + Joiner.on(",").join(AUDIT_LOG_TABLE_ID, AUDIT_LOG_TABLE_KEY, 
AUDIT_LOG_TABLE_CONTENT,
+            AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, 
AUDIT_LOG_MODEL_UUID,
+            AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG)
             + " from %s where id > %d and id <= %d order by id";
     static final String SELECT_BY_ID_SQL = SELECT_TERM
-            + Joiner.on(",").join(AUDIT_LOG_TABLE_ID, AUDIT_LOG_TABLE_KEY, 
AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS,
-                    AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, 
AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG)
+            + Joiner.on(",").join(AUDIT_LOG_TABLE_ID, AUDIT_LOG_TABLE_KEY, 
AUDIT_LOG_TABLE_CONTENT,
+            AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, 
AUDIT_LOG_MODEL_UUID,
+            AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG)
             + " from %s where id in(%s) order by id";
 
     static final String SELECT_BY_PROJECT_RANGE_SQL = SELECT_TERM
-            + Joiner.on(",").join(AUDIT_LOG_TABLE_ID, AUDIT_LOG_TABLE_KEY, 
AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS,
-                    AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, 
AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG)
+            + Joiner.on(",").join(AUDIT_LOG_TABLE_ID, AUDIT_LOG_TABLE_KEY, 
AUDIT_LOG_TABLE_CONTENT,
+            AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, 
AUDIT_LOG_MODEL_UUID,
+            AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG)
             + " from %s where meta_key like '/%s/%%' and id > %d and id <= %d 
order by id";
 
     static final String SELECT_MAX_ID_SQL = "select max(id) from %s";
@@ -113,7 +118,8 @@ public class JdbcAuditLogStore implements AuditLogStore {
 
     static final String SELECT_LIST_TERM = SELECT_TERM + 
Joiner.on(",").join(AUDIT_LOG_TABLE_ID, AUDIT_LOG_TABLE_KEY,
             AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, 
AUDIT_LOG_TABLE_UNIT,
-            AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD, 
AUDIT_LOG_DIFF_FLAG);
+            AUDIT_LOG_MODEL_UUID, AUDIT_LOG_TABLE_OPERATOR, 
AUDIT_LOG_TABLE_INSTANCE, PROJECT_FIELD,
+            AUDIT_LOG_DIFF_FLAG);
     static final String SELECT_TS_RANGE = SELECT_LIST_TERM
             + " from %s where id < %d and meta_ts between %d and %d order by 
id desc limit %d";
 
@@ -198,7 +204,9 @@ public class JdbcAuditLogStore implements AuditLogStore {
                                             CompressionUtils
                                                     
.compress(ByteSource.wrap(createEvent.getMetaContent()).read()),
                                             
createEvent.getCreatedOrUpdated().getTs(),
-                                            
createEvent.getCreatedOrUpdated().getMvcc(), unitId, operator, instance,
+                                            
createEvent.getCreatedOrUpdated().getMvcc(), unitId,
+                                            
createEvent.getCreatedOrUpdated().getModelUuid(),
+                                            operator, instance,
                                             
createEvent.getCreatedOrUpdated().getProject(),
                                             
createEvent.getCreatedOrUpdated().getContentDiff() != null };
                                 } catch (IOException ignore) {
@@ -207,7 +215,7 @@ public class JdbcAuditLogStore implements AuditLogStore {
                             } else if (e instanceof ResourceDeleteEvent) {
                                 ResourceDeleteEvent deleteEvent = 
(ResourceDeleteEvent) e;
                                 return new Object[] { 
deleteEvent.getResPath(), null, System.currentTimeMillis(), null,
-                                        unitId, operator, instance, 
deleteEvent.getKey(), false };
+                                        unitId, null, operator, instance, 
deleteEvent.getKey(), false };
                             }
                             return null;
                         
}).filter(Objects::nonNull).collect(Collectors.toList())),
@@ -220,7 +228,7 @@ public class JdbcAuditLogStore implements AuditLogStore {
                     try {
                         val bs = Objects.isNull(x.getByteSource()) ? null : 
x.getByteSource().read();
                         return new Object[] { x.getResPath(), 
CompressionUtils.compress(bs), x.getTimestamp(),
-                                x.getMvcc(), x.getUnitId(), x.getOperator(), 
x.getInstance(), x.getProject(),
+                                x.getMvcc(), x.getUnitId(), x.getModelUuid(), 
x.getOperator(), x.getInstance(), x.getProject(),
                                 x.isDiffFlag() };
                     } catch (IOException e) {
                         return null;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcPartialAuditLogStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcPartialAuditLogStore.java
index 3b9f8afcd1..d2aecd0a89 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcPartialAuditLogStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcPartialAuditLogStore.java
@@ -26,8 +26,8 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class JdbcPartialAuditLogStore extends JdbcAuditLogStore {
 
-    public JdbcPartialAuditLogStore(KylinConfig config, Predicate<String> 
filterByResPath) throws Exception {
+    public JdbcPartialAuditLogStore(KylinConfig config, String modelUuid) 
throws Exception {
         super(config);
-        replayWorker.setFilterByResPath(filterByResPath);
+        replayWorker.setModelUuid(modelUuid);
     }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MemoryAuditLogStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MemoryAuditLogStore.java
index 2ccaf90461..ec0bad982f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MemoryAuditLogStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MemoryAuditLogStore.java
@@ -56,6 +56,7 @@ public class MemoryAuditLogStore implements AuditLogStore {
         this.config = config;
         this.replayWorker = new AuditLogReplayWorker(config, this);
     }
+
     @Override
     public void save(UnitMessages unitMessages) {
         val unitId = unitMessages.getUnitId();
@@ -67,11 +68,12 @@ public class MemoryAuditLogStore implements AuditLogStore {
                 ResourceCreateOrUpdateEvent e = (ResourceCreateOrUpdateEvent) 
event;
                 RawResource raw = e.getCreatedOrUpdated();
                 return new AuditLog(id, e.getResPath(), 
ByteSource.wrap(e.getMetaContent()), raw.getTs(), raw.getMvcc(),
-                        unitId, operator, instance, raw.getProject(), 
raw.getContentDiff() != null);
+                        unitId, raw.getModelUuid(), operator, instance, 
raw.getProject(),
+                        raw.getContentDiff() != null);
             } else if (event instanceof ResourceDeleteEvent) {
                 ResourceDeleteEvent e = (ResourceDeleteEvent) event;
-                return new AuditLog(id, e.getResPath(), null, 
System.currentTimeMillis(), null, unitId, operator,
-                        instance, e.getKey(), false);
+                return new AuditLog(id, e.getResPath(), null, 
System.currentTimeMillis(), null, unitId,
+                        null, operator, instance, e.getKey(), false);
             }
             return null;
         }).filter(Objects::nonNull).collect(Collectors.toList());
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/AuditLogRowMapper.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/AuditLogRowMapper.java
index a01763998b..7b4dc55ccc 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/AuditLogRowMapper.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/AuditLogRowMapper.java
@@ -44,12 +44,13 @@ public class AuditLogRowMapper implements RowMapper<AuditLog> {
             mvcc = null;
         }
         val unitId = rs.getString(6);
-        val operator = rs.getString(7);
-        val instance = rs.getString(8);
-        val project = rs.getString(9);
-        val diffFlag = rs.getBoolean(10);
+        val modelUuid = rs.getString(7);
+        val operator = rs.getString(8);
+        val instance = rs.getString(9);
+        val project = rs.getString(10);
+        val diffFlag = rs.getBoolean(11);
 
-        return new AuditLog(id, resPath, content == null ? null : 
ByteSource.wrap(content), ts, mvcc, unitId, operator,
-                instance, project, diffFlag);
+        return new AuditLog(id, resPath, content == null ? null : 
ByteSource.wrap(content), ts, mvcc, unitId,
+                modelUuid, operator, instance, project, diffFlag);
     }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
index 96f302e516..ed36d80473 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
@@ -20,12 +20,11 @@ package org.apache.kylin.common.persistence.metadata.mapper;
 import static 
org.apache.kylin.common.persistence.metadata.mapper.FusionModelDynamicSqlSupport.sqlTable;
 
 import java.util.List;
+import java.util.Optional;
 
-import org.apache.ibatis.annotations.InsertProvider;
 import org.apache.ibatis.annotations.Result;
 import org.apache.ibatis.annotations.ResultMap;
 import org.apache.ibatis.annotations.Results;
-import org.apache.ibatis.annotations.SelectKey;
 import org.apache.ibatis.annotations.SelectProvider;
 import org.apache.ibatis.type.JdbcType;
 import org.apache.kylin.common.persistence.metadata.jdbc.ContentTypeHandler;
@@ -40,6 +39,11 @@ import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
 
 public interface FusionModelMapper extends BasicMapper<FusionModelRawResource> 
{
 
+    @Override
+    @SelectProvider(type = SqlProviderAdapter.class, method = "select")
+    @ResultMap("FusionModelResult")
+    Optional<FusionModelRawResource> selectOne(SelectStatementProvider 
selectStatement);
+
     @Override
     @SelectProvider(type = SqlProviderAdapter.class, method = "select")
     @Results(id = "FusionModelResult", value = {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobDynamicSqlSupport.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobDynamicSqlSupport.java
index 78b1e04365..235de0d3a7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobDynamicSqlSupport.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobDynamicSqlSupport.java
@@ -1,5 +1,9 @@
 package org.apache.kylin.common.persistence.metadata.mapper;
 
+import org.mybatis.dynamic.sql.SqlColumn;
+
+import java.sql.JDBCType;
+
 public final class StreamingJobDynamicSqlSupport {
 
     public static final StreamingJob sqlTable = new StreamingJob();
@@ -8,6 +12,9 @@ public final class StreamingJobDynamicSqlSupport {
     }
 
     public static final class StreamingJob extends BasicSqlTable<StreamingJob> 
{
+
+        public final SqlColumn<String> modelUuid = column("model_uuid", 
JDBCType.CHAR);
+
         public StreamingJob() {
             super("streaming_job", StreamingJob::new);
         }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
index bf583364b0..406c84bc80 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
@@ -39,6 +39,7 @@ public interface StreamingJobMapper extends BasicMapper<StreamingJobRawResource>
             @Result(column = "meta_key", property = "metaKey", jdbcType = 
JdbcType.VARCHAR),
             @Result(column = "project", property = "project", jdbcType = 
JdbcType.VARCHAR),
             @Result(column = "uuid", property = "uuid", jdbcType = 
JdbcType.CHAR),
+            @Result(column = "model_uuid", property = "modelUuid", jdbcType = 
JdbcType.CHAR),
             @Result(column = "mvcc", property = "mvcc", jdbcType = 
JdbcType.BIGINT),
             @Result(column = "ts", property = "ts", jdbcType = 
JdbcType.BIGINT),
             @Result(column = "reserved_filed_1", property = "reservedFiled1", 
jdbcType = JdbcType.VARCHAR),
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/DataflowRawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/DataflowRawResource.java
index 389011760d..2271c30b76 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/DataflowRawResource.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/DataflowRawResource.java
@@ -31,4 +31,9 @@ import lombok.NoArgsConstructor;
 public class DataflowRawResource extends RawResource {
     @JsonProperty("project")
     private String project;
+
+    @Override
+    public String getModelUuid() {
+        return this.getUuid();
+    }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/IndexPlanRawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/IndexPlanRawResource.java
index 80f1facb48..d4b8a0454c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/IndexPlanRawResource.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/IndexPlanRawResource.java
@@ -32,4 +32,9 @@ public class IndexPlanRawResource extends RawResource {
     @JsonProperty("project")
     private String project;
 
+    @Override
+    public String getModelUuid() {
+        return this.getUuid();
+    }
+
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutDetailsRawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutDetailsRawResource.java
index 0910fb1d5f..335f387721 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutDetailsRawResource.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutDetailsRawResource.java
@@ -38,4 +38,10 @@ public class LayoutDetailsRawResource extends RawResource {
 
     @JsonProperty("layout")
     private long layoutId;
+
+    @Override
+    public String getModelUuid() {
+        return this.dataflowId;
+    }
+
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutRawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutRawResource.java
index 49133eee20..072843763f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutRawResource.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/LayoutRawResource.java
@@ -35,4 +35,8 @@ public class LayoutRawResource extends RawResource {
     @JsonProperty("dataflow")
     private String dataflowId;
 
+    @Override
+    public String getModelUuid() {
+        return this.dataflowId;
+    }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/ModelRawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/ModelRawResource.java
index 5e57e16a05..843df50448 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/ModelRawResource.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/ModelRawResource.java
@@ -34,4 +34,9 @@ public class ModelRawResource extends RawResource {
 
     @JsonProperty("alias")
     private String alias;
+
+    @Override
+    public String getModelUuid() {
+        return this.getUuid();
+    }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/StreamingJobRawResource.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/StreamingJobRawResource.java
index 959c6aabb7..33be049aff 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/StreamingJobRawResource.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/StreamingJobRawResource.java
@@ -29,4 +29,7 @@ import lombok.EqualsAndHashCode;
 public class StreamingJobRawResource extends RawResource {
     @JsonProperty("project")
     private String project;
+
+    @JsonProperty("model_id")
+    private String modelUuid;
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java
index 312b69ec2a..8863888412 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -64,7 +63,7 @@ public abstract class AbstractAuditLogReplayWorker {
     protected final long replayWaitMaxTimeoutMills;
 
     @Setter
-    protected Predicate<String> filterByResPath;
+    protected String modelUuid;
 
     protected AbstractAuditLogReplayWorker(KylinConfig config, AuditLogStore 
auditLogStore) {
         this.config = config;
@@ -102,7 +101,7 @@ public abstract class AbstractAuditLogReplayWorker {
         }
         Map<String, UnitMessages> messagesMap = Maps.newLinkedHashMap();
         for (AuditLog log : logs) {
-            if (filterByResPath != null && 
!filterByResPath.test(log.getResPath())) {
+            if (modelUuid != null && !modelUuid.equals(log.getModelUuid())) {
                 continue;
             }
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java
index d7df7a142e..32bf14be1f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java
@@ -280,7 +280,8 @@ public class AuditLogReplayWorker extends AbstractAuditLogReplayWorker {
                     correctedResource.getMvcc());
             String resPath = correctedResource.generateKeyWithType();
             val fixResource = new AuditLog(0L, resPath, 
correctedResource.getByteSource(), correctedResource.getTs(),
-                    originResource.getMvcc() + 1, null, null, null, 
correctedResource.getProject(), false);
+                    originResource.getMvcc() + 1, null, null, null,
+                    null, correctedResource.getProject(), false);
             replayer.replay(new 
UnitMessages(Lists.newArrayList(Event.fromLog(fixResource))));
 
             val currentAuditLog = 
resourceStore.getAuditLogStore().get(resPath, targetResource.getMvcc());
diff --git a/src/core-common/src/main/resources/metadata-jdbc-default.properties b/src/core-common/src/main/resources/metadata-jdbc-default.properties
index 6bbb03b2a0..5eecd4ce58 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-default.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-default.properties
@@ -21,6 +21,7 @@ create.auditlog.store.table=create table if not exists %s ( \
   %s bigint, \
   %s bigint, \
   unit_id varchar(50), \
+  model_uuid char(36), \
   operator varchar(200), \
   instance varchar(100), \
   %s varchar(255), \
diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
index a782d17047..1b5f558840 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
@@ -509,6 +509,7 @@ id               bigint AUTO_INCREMENT NOT NULL,\
 meta_key         varchar(255)          NOT NULL,\
 project          varchar(255)          NOT NULL,\
 uuid             CHAR(64)              NOT NULL,\
+model_uuid       CHAR(36)              NOT NULL,\
 mvcc             bigint                NOT NULL,\
 ts               bigint                NOT NULL,\
 content          bytea                 NOT NULL,\
diff --git a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
index f61640a013..3b7dd85199 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -566,6 +566,7 @@ CREATE TABLE IF NOT EXISTS `%s_streaming_job` \
 `meta_key`         varchar(255)    NOT NULL, \
 `project`          varchar(255)    NOT NULL, \
 `uuid`             CHAR(64)        NOT NULL COLLATE utf8_bin, \
+`model_uuid`       CHAR(36)        NOT NULL, \
 `mvcc`             bigint          NOT NULL, \
 `ts`               bigint          NOT NULL, \
 `content`          longblob        NOT NULL, \
@@ -604,6 +605,7 @@ create.auditlog.store.table=create table if not exists %s ( \
   %s bigint, \
   %s bigint, \
   unit_id varchar(50), \
+  model_uuid CHAR(36), \
   operator varchar(200), \
   instance varchar(100), \
   %s varchar(255), \
diff --git a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
index c81fd29676..734720d886 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
@@ -509,6 +509,7 @@ id               bigserial            NOT NULL,\
 meta_key         varchar(255)         NOT NULL,\
 project          varchar(255)         NOT NULL,\
 uuid             CHAR(64) COLLATE "C" NOT NULL,\
+model_uuid       CHAR(36) COLLATE "C" NOT NULL,\
 mvcc             bigint               NOT NULL,\
 ts               bigint               NOT NULL,\
 content          bytea                NOT NULL,\
@@ -543,6 +544,7 @@ create.auditlog.store.table=create table if not exists %s ( 
\
   %s bigint, \
   %s bigint, \
   unit_id varchar(50), \
+  model_uuid char(36), \
   operator varchar(200), \
   instance varchar(100), \
   %s varchar(255), \
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogReplayerTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogReplayerTest.java
index 5cc2f0dc82..4f72a6c333 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogReplayerTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogReplayerTest.java
@@ -53,7 +53,7 @@ public class JdbcAuditLogReplayerTest {
         auditLogStore.restore(0);
         Assert.assertEquals(2, 
workerStore.listResourcesRecursively(MetadataType.ALL.name()).size());
 
-        val auditLogTableName = info.getTableName() + "_audit_log";
+        val auditLogTableName = info.getTableName() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
 
         jdbcTemplate.batchUpdate("ALTER TABLE " + auditLogTableName + " RENAME 
TO TEST_AUDIT_LOG_TEST",
                 "ALTER TABLE " + info.getTableName() + "_project RENAME TO 
TEST_TEST");
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTest.java
index 67c588e82f..4a06284d95 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTest.java
@@ -111,7 +111,7 @@ class JdbcAuditLogStoreTest {
         prepare2Resource();
         val url = getTestConfig().getMetadataUrl();
         val jdbcTemplate = info.getJdbcTemplate();
-        val all = jdbcTemplate.query(SELECT_LIST_TERM + " from " + 
url.getIdentifier() + "_audit_log",
+        val all = jdbcTemplate.query(SELECT_LIST_TERM + " from " + 
url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX,
                 new AuditLogRowMapper());
 
         Assert.assertEquals(5, all.size());
@@ -141,7 +141,7 @@ class JdbcAuditLogStoreTest {
             return 0;
         }, "_global");
 
-        val allStep2 = jdbcTemplate.query(SELECT_LIST_TERM + " from " + 
url.getIdentifier() + "_audit_log",
+        val allStep2 = jdbcTemplate.query(SELECT_LIST_TERM + " from " + 
url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX,
                 new AuditLogRowMapper());
 
         Assert.assertEquals(7, allStep2.size());
@@ -261,7 +261,7 @@ class JdbcAuditLogStoreTest {
         String unitId = RandomUtil.randomUUIDStr();
         String sql = "insert into %s (id, 
meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance,project,"
                 + "diff_flag) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + "_audit_log"),
+        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX),
                 Arrays.asList(
                         new Object[] { 22, "PROJECT/abc",
                                 ("{ \"uuid\" : \"" + UUID.randomUUID()
@@ -287,7 +287,7 @@ class JdbcAuditLogStoreTest {
         String unitId = RandomUtil.randomUUIDStr();
         String sql = "insert into %s (id, 
meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance,project,"
                 + "diff_flag) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + "_audit_log"),
+        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX),
                 Arrays.asList(
                         new Object[] { 900, "PROJECT/abc",
                                 ("{ \"uuid\" : \"" + UUID.randomUUID()
@@ -301,7 +301,7 @@ class JdbcAuditLogStoreTest {
         // It will execute a 5s once scheduled thread
         workerStore.getAuditLogStore().restore(3);
         Assertions.assertEquals(3, 
workerStore.listResourcesRecursively(MetadataType.ALL.name()).size());
-        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + "_audit_log"),
+        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX),
                 Arrays.asList(
                         new Object[] { 800, "PROJECT/abc3",
                                 ("{ \"uuid\" : \"" + UUID.randomUUID()
@@ -355,7 +355,7 @@ class JdbcAuditLogStoreTest {
         String unitId = RandomUtil.randomUUIDStr();
         String sql = "insert into %s (id, 
meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance,project,"
                 + "diff_flag) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + "_audit_log"),
+        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, sql, 
url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX),
                 Arrays.asList(
                         new Object[] { 900, "PROJECT/abc",
                                 ("{ \"uuid\" : \"" + UUID.randomUUID()
@@ -473,7 +473,7 @@ class JdbcAuditLogStoreTest {
                 StringEntity.serializer);
         Assert.assertEquals(1, 
workerStore.listResourcesRecursively(MetadataType.ALL.name()).size());
         val jdbcTemplate = info.getJdbcTemplate();
-        val auditLogTableName = testConfig.getMetadataUrl().getIdentifier() + 
"_audit_log";
+        val auditLogTableName = testConfig.getMetadataUrl().getIdentifier() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
 
         val stopped = new AtomicBoolean(false);
         new Thread(() -> {
@@ -524,10 +524,10 @@ class JdbcAuditLogStoreTest {
         val dataSource = BasicDataSourceFactory.createDataSource(props);
         val transactionManager = new DataSourceTransactionManager(dataSource);
         val auditLogStore = new JdbcAuditLogStore(config, jdbcTemplate, 
transactionManager,
-                config.getMetadataUrl().getIdentifier() + "_audit_log");
+                config.getMetadataUrl().getIdentifier() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX);
         auditLogStore.createIfNotExist();
 
-        val auditLogTableName = config.getMetadataUrl().getIdentifier() + 
"_audit_log";
+        val auditLogTableName = config.getMetadataUrl().getIdentifier() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
         for (int i = 0; i < 1000; i++) {
             val projectName = "p" + (i + 1000);
             String unitId = RandomUtil.randomUUIDStr();
@@ -561,10 +561,10 @@ class JdbcAuditLogStoreTest {
         val dataSource = BasicDataSourceFactory.createDataSource(props);
         val transactionManager = new DataSourceTransactionManager(dataSource);
         val auditLogStore = new JdbcAuditLogStore(config, jdbcTemplate, 
transactionManager,
-                info.getTableName() + "_audit_log");
+                info.getTableName() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX);
         auditLogStore.createIfNotExist();
 
-        val auditLogTableName = info.getTableName() + "_audit_log";
+        val auditLogTableName = info.getTableName() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
         for (int i = 0; i < 1000; i++) {
             val projectNamePrefix = "p" + (i + 1000);
             String unitId = RandomUtil.randomUUIDStr();
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTool.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTool.java
index d17d25f36f..a81b0c3958 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTool.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTool.java
@@ -44,14 +44,20 @@ public class JdbcAuditLogStoreTool {
     public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
     private static final String LOCAL_INSTANCE = "127.0.0.1";
 
+    public static AuditLog createProjectAuditLog(String projectName, long 
mvcc, String uuid, String unitId
+            , boolean diffFlag) {
+        return createProjectAuditLog(projectName, mvcc, uuid, unitId, null, 
diffFlag);
+    }
+
     public static AuditLog createProjectAuditLog(String projectName, long 
mvcc, String uuid, String unitId,
-            boolean diffFlag) {
+                                                 String modelUuid, boolean 
diffFlag) {
         AuditLog resource = new AuditLog();
         resource.setResPath("PROJECT/" + projectName);
         resource.setDiffFlag(diffFlag);
         resource.setProject(uuid);
         resource.setInstance(LOCAL_INSTANCE);
         resource.setUnitId(unitId);
+        resource.setModelUuid(modelUuid);
         resource.setByteSource(ByteSource.wrap(("{ \"uuid\" : \"" + uuid + 
"\",\"meta_key\" : \"" + projectName
                 + "\",\"name\" : \"" + projectName + 
"\"}").getBytes(DEFAULT_CHARSET)));
         resource.setTimestamp(System.currentTimeMillis());
@@ -61,7 +67,12 @@ public class JdbcAuditLogStoreTool {
 
     public static AuditLog createProjectAuditLog(String projectName, long 
mvcc) {
         return createProjectAuditLog(projectName, mvcc, 
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
-                false);
+                null, false);
+    }
+
+    public static AuditLog createProjectAuditLog(String projectName, String 
modelUuid, long mvcc) {
+        return createProjectAuditLog(projectName, mvcc, 
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
+                modelUuid, false);
     }
 
     public static AuditLog createProjectAuditLog(String projectName, long 
mvcc, String uuid, String unitId,
@@ -100,7 +111,7 @@ public class JdbcAuditLogStoreTool {
 
     public static void prepareJdbcAuditLogStore(String projectPrefixName, 
JdbcTemplate jdbcTemplate, long logNum) {
         val url = getTestConfig().getMetadataUrl();
-        val table = url.getIdentifier() + "_audit_log";
+        val table = url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
         for (int i = 0; i < logNum; i++) {
             val projectName = (projectPrefixName != null ? projectPrefixName : 
"p") + i;
             String unitId = RandomUtil.randomUUIDStr();
@@ -108,7 +119,7 @@ public class JdbcAuditLogStoreTool {
                     "PROJECT/" + projectName,
                     ("{ \"uuid\" : \"" + RandomUtil.randomUUIDStr() + 
"\",\"meta_key\" : \"" + projectName
                             + "\",\"name\" : \"" + projectName + 
"\"}").getBytes(DEFAULT_CHARSET),
-                    System.currentTimeMillis(), 0, unitId, null, 
AddressUtil.getLocalInstance(), projectName, false);
+                    System.currentTimeMillis(), 0, unitId, null, null, 
AddressUtil.getLocalInstance(), projectName, false);
         }
     }
 
@@ -118,15 +129,15 @@ public class JdbcAuditLogStoreTool {
         val url = getTestConfig().getMetadataUrl();
 
         Object[] log = isDel
-                ? new Object[] { "PROJECT/" + project, null, 
System.currentTimeMillis(), mvcc, unitId, null,
+                ? new Object[] { "PROJECT/" + project, null, 
System.currentTimeMillis(), mvcc, unitId, null, null,
                         LOCAL_INSTANCE, null, false }
                 : new Object[] { "PROJECT/" + project,
                         ("{\"name\" : \"" + project + "\",\"uuid\" : \"" + 
uuid + "\"}").getBytes(DEFAULT_CHARSET),
-                        System.currentTimeMillis(), mvcc, unitId, null, 
LOCAL_INSTANCE, null, false };
+                        System.currentTimeMillis(), mvcc, unitId, null, null, 
LOCAL_INSTANCE, null, false };
         List<Object[]> logs = new ArrayList<>();
         logs.add(log);
         jdbcTemplate.batchUpdate(
-                String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, 
url.getIdentifier() + "_audit_log"), logs);
+                String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, 
url.getIdentifier() + JdbcAuditLogStore.AUDIT_LOG_SUFFIX), logs);
     }
 
     public static void mockAuditLogForProjectEntry(String project, JdbcInfo 
info, boolean isDel) {
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcMetadataStoreTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcMetadataStoreTest.java
index 6e093d9fd9..c3d50499ff 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcMetadataStoreTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/JdbcMetadataStoreTest.java
@@ -337,7 +337,7 @@ class JdbcMetadataStoreTest {
         Assertions.assertFalse(CompressionUtils.isCompressed(contents));
 
         byte[] auditLogContents = jdbcTemplate.queryForObject(
-                "select meta_content from " + identifier + "_audit_log where 
meta_key = 'PROJECT/prj1'",
+                "select meta_content from " + identifier + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX + " where meta_key = 'PROJECT/prj1'",
                 (rs, rowNum) -> rs.getBytes(1));
 
         
Assertions.assertFalse(CompressionUtils.isCompressed(auditLogContents));
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcPartialAuditLogStoreTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcPartialAuditLogStoreTest.java
index 68439c6dc8..35695e7d68 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcPartialAuditLogStoreTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcPartialAuditLogStoreTest.java
@@ -47,8 +47,7 @@ class JdbcPartialAuditLogStoreTest {
     @Test
     void testPartialAuditLogRestore() throws Exception {
         val workerStore = ResourceStore.getKylinMetaStore(getTestConfig());
-        val auditLogStore = new JdbcPartialAuditLogStore(getTestConfig(),
-                resPath -> resPath.startsWith("PROJECT/"));
+        val auditLogStore = new JdbcPartialAuditLogStore(getTestConfig(), 
null);
         workerStore.getMetadataStore().setAuditLogStore(auditLogStore);
         auditLogStore.restore(101);
         Assertions.assertEquals(101, auditLogStore.getLogOffset());
@@ -58,24 +57,22 @@ class JdbcPartialAuditLogStoreTest {
     @Test
     void testPartialFetchAuditLog() throws Exception {
         val workerStore = ResourceStore.getKylinMetaStore(getTestConfig());
-        var auditLogStore = new JdbcPartialAuditLogStore(getTestConfig(),
-                resPath -> resPath.startsWith("PROJECT/abc"));
+        var auditLogStore = new JdbcPartialAuditLogStore(getTestConfig(), 
"uuid1");
         workerStore.getMetadataStore().setAuditLogStore(auditLogStore);
 
         workerStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, new 
StringEntity(RandomUtil.randomUUIDStr()),
                 StringEntity.serializer);
         Assertions.assertEquals(1, 
workerStore.listResourcesRecursively(MetadataType.ALL.name()).size());
         auditLogStore.batchInsert(Arrays.asList(
-                JdbcAuditLogStoreTool.createProjectAuditLog("abc", 0),
-                JdbcAuditLogStoreTool.createProjectAuditLog("abc2", 0),
-                JdbcAuditLogStoreTool.createProjectAuditLog("abc3", 0),
-                JdbcAuditLogStoreTool.createProjectAuditLog("abc4", 0),
-                JdbcAuditLogStoreTool.createProjectAuditLog("t1", 0)
+                JdbcAuditLogStoreTool.createProjectAuditLog("abc", "uuid1", 0),
+                JdbcAuditLogStoreTool.createProjectAuditLog("abc", null, 0),
+                JdbcAuditLogStoreTool.createProjectAuditLog("abc2", "uuid1", 
0),
+                JdbcAuditLogStoreTool.createProjectAuditLog("abc2", "uuid2", 0)
                 )
         );
         auditLogStore.catchupWithMaxTimeout();
         var totalR = workerStore.listResourcesRecursively("PROJECT");
-        Assertions.assertEquals(4, totalR.size());
+        Assertions.assertEquals(2, totalR.size());
         auditLogStore.close();
     }
 
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/AuditReplayWorkerTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/AuditReplayWorkerTest.java
index 6337eb2d52..0d692c47df 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/AuditReplayWorkerTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/AuditReplayWorkerTest.java
@@ -285,7 +285,8 @@ public class AuditReplayWorkerTest {
         ReflectionTestUtils.invokeMethod(replayWorker, 
"recordStepAbsentIdList", stepWin,
                 Collections.singletonList(
                         new AuditLog(101L, "adaasd", 
RawResourceTool.createByteSource("adaasd"),
-                                1L, 1L, null, null, null, null, false)));
+                                1L, 1L, null, null, null, null,
+                                null, false)));
         Assertions.assertTrue(delayIdQueue.isEmpty());
         replayWorker.close(true);
     }
@@ -302,11 +303,14 @@ public class AuditReplayWorkerTest {
 
         val auditLogs = Arrays.asList(
                 new AuditLog(101L, "adaasd", 
RawResourceTool.createByteSource("adaasd"),
-                        System.currentTimeMillis() - timeout * 2, 1L, null, 
null, null, null, false),
+                        System.currentTimeMillis() - timeout * 2, 1L, null, 
null,
+                        null, null, null, false),
                 new AuditLog(102L, "adaasd", 
RawResourceTool.createByteSource("adaasd"),
-                        System.currentTimeMillis() - timeout * 2, 1L, null, 
null, null, null, false),
+                        System.currentTimeMillis() - timeout * 2, 1L, null, 
null,
+                        null, null, null, false),
                 new AuditLog(103L, "adaasd", 
RawResourceTool.createByteSource("adaasd"),
-                        System.currentTimeMillis() - timeout * 2, 1L, null, 
null, null, null, false));
+                        System.currentTimeMillis() - timeout * 2, 1L, null, 
null,
+                        null, null, null, false));
 
         ReflectionTestUtils.invokeMethod(replayWorker, 
"recordStepAbsentIdList", stepWin, auditLogs);
 
@@ -326,11 +330,14 @@ public class AuditReplayWorkerTest {
 
         val auditLogs = Arrays.asList(
                 new AuditLog(101L, "adaasd", 
RawResourceTool.createByteSource("adaasd"),
-                        System.currentTimeMillis(), 1L, null, null, null, 
null, false),
+                        System.currentTimeMillis(), 1L, null, null, null, null,
+                        null, false),
                 new AuditLog(102L, "adaasd", 
RawResourceTool.createByteSource("adaasd"),
-                        System.currentTimeMillis(), 1L, null, null, null, 
null, false),
+                        System.currentTimeMillis(), 1L, null, null, null, null,
+                        null, false),
                 new AuditLog(103L, "adaasd", 
RawResourceTool.createByteSource("adaasd"),
-                        System.currentTimeMillis(), 1L, null, null, null, 
null, false));
+                        System.currentTimeMillis(), 1L, null, null, null, null,
+                        null, false));
 
         ReflectionTestUtils.invokeMethod(replayWorker, 
"recordStepAbsentIdList", stepWin, auditLogs);
 
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/util/AuditLogCheckerTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/util/AuditLogCheckerTest.java
index f8422efda2..4341b2a138 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/util/AuditLogCheckerTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/util/AuditLogCheckerTest.java
@@ -36,7 +36,8 @@ class AuditLogCheckerTest {
     public void testVerify() {
         byte[] removePatch = 
"[{\"op\":\"remove\",\"path\":\"\"}]".getBytes(StandardCharsets.UTF_8);
         AuditLog auditLog = new AuditLog(1L, "PROJECT/abc", 
ByteSource.wrap(removePatch), System.currentTimeMillis(),
-                0L, UUID.randomUUID().toString(), null, LOCAL_INSTANCE, null, 
true);
+                0L, UUID.randomUUID().toString(), null, null, LOCAL_INSTANCE,
+                null, true);
         try {
             verify(auditLog, false);
             Assertions.fail("Not patch mode, but audit log is patch format!");
@@ -46,6 +47,7 @@ class AuditLogCheckerTest {
 
         verify(new AuditLog(1L, "PROJECT/abc",
                 ByteSource.wrap("[{\"op\": \"value1\", \"path\": 
\"\"}]".getBytes(StandardCharsets.UTF_8)),
-                System.currentTimeMillis(), 0L, UUID.randomUUID().toString(), 
null, LOCAL_INSTANCE, null, true), true);
+                System.currentTimeMillis(), 0L, UUID.randomUUID().toString(), 
null,
+                null, LOCAL_INSTANCE, null, true), true);
     }
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
index 507da7bf6a..776f8a5180 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
@@ -41,8 +41,6 @@ import lombok.EqualsAndHashCode;
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
 public class FusionModel extends RootPersistentEntity implements Serializable {
 
-    private String project;
-
     @EqualsAndHashCode.Include
     @JsonProperty("alias")
     private String alias;
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelJdbcMetadataTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelJdbcMetadataTest.java
new file mode 100644
index 0000000000..2b641a1dc0
--- /dev/null
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelJdbcMetadataTest.java
@@ -0,0 +1,27 @@
+package org.apache.kylin.metadata.streaming;
+
+import org.apache.kylin.junit.annotation.JdbcMetadataInfo;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.model.FusionModel;
+import org.apache.kylin.metadata.model.FusionModelManager;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import static org.apache.kylin.common.util.TestUtils.getTestConfig;
+
+@MetadataInfo
+@JdbcMetadataInfo
+public class FusionModelJdbcMetadataTest {
+
+    private static final String PROJECT = "streaming_test";
+
+    /** Creates a new FusionModel via EnhancedUnitOfWork and asserts a freshly obtained manager returns it by id. */
+    @Test
+    public void testNewFusionModel() {
+        FusionModel copy = new FusionModel();
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() ->
+                FusionModelManager.getInstance(getTestConfig(), PROJECT).createModel(copy), PROJECT);
+        FusionModelManager mgr = FusionModelManager.getInstance(getTestConfig(), PROJECT);
+        Assertions.assertNotNull(mgr.getFusionModel(copy.getId()));
+    }
+}
diff --git 
a/src/kylin-server-it/src/test/java/org/apache/kylin/event/HAMetadataTest.java 
b/src/kylin-server-it/src/test/java/org/apache/kylin/event/HAMetadataTest.java
index 05aed59705..1dc47a1c66 100644
--- 
a/src/kylin-server-it/src/test/java/org/apache/kylin/event/HAMetadataTest.java
+++ 
b/src/kylin-server-it/src/test/java/org/apache/kylin/event/HAMetadataTest.java
@@ -156,7 +156,7 @@ public class HAMetadataTest extends 
NLocalFileMetadataTestCase {
 
         await().atMost(3, TimeUnit.SECONDS)
                 .until(() -> 7 == 
queryResourceStore.listResourcesRecursively(MetadataType.ALL.name()).size());
-        String table = getTestConfig().getMetadataUrl().getIdentifier() + 
"_audit_log";
+        String table = getTestConfig().getMetadataUrl().getIdentifier() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
         val auditCount = 
getJdbcTemplate().queryForObject(String.format(Locale.ROOT, "select count(*) 
from %s", table),
                 Long.class);
         Assert.assertEquals(12L, auditCount.longValue());
@@ -188,7 +188,7 @@ public class HAMetadataTest extends 
NLocalFileMetadataTestCase {
             resourceStore.checkAndPutResource("PROJECT/path3", 
ByteSource.wrap("{ \"mvcc\": 3 }".getBytes(charset)), 2);
             return 0;
         }, "p0");
-        String table = getTestConfig().getMetadataUrl().getIdentifier() + 
"_audit_log";
+        String table = getTestConfig().getMetadataUrl().getIdentifier() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
         getJdbcTemplate().update(String.format(Locale.ROOT, "delete from %s 
where id=7", table));
         try {
             queryResourceStore.catchup();
diff --git 
a/src/streaming/src/main/java/org/apache/kylin/streaming/app/StreamingApplication.java
 
b/src/streaming/src/main/java/org/apache/kylin/streaming/app/StreamingApplication.java
index 3c9e2a9cdb..744df3e877 100644
--- 
a/src/streaming/src/main/java/org/apache/kylin/streaming/app/StreamingApplication.java
+++ 
b/src/streaming/src/main/java/org/apache/kylin/streaming/app/StreamingApplication.java
@@ -20,13 +20,11 @@ package org.apache.kylin.streaming.app;
 
 import static org.apache.kylin.common.persistence.MetadataType.STREAMING_JOB;
 import static 
org.apache.kylin.common.persistence.metadata.FileSystemMetadataStore.HDFS_SCHEME;
-import static 
org.apache.kylin.metadata.cube.model.NDataSegDetails.DATAFLOW_DETAILS_RESOURCE_ROOT;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -99,10 +97,7 @@ public abstract class StreamingApplication implements 
Application, GracefulStopI
         }
 
         //init audit log store
-        val auditLogStore = new JdbcPartialAuditLogStore(kylinConfig,
-                resPath -> resPath.startsWith(
-                        String.format(Locale.ROOT, "/%s%s/%s", project, 
DATAFLOW_DETAILS_RESOURCE_ROOT, dataflowId))
-                        || getMetaResPathSet().contains(resPath));
+        val auditLogStore = new JdbcPartialAuditLogStore(kylinConfig, 
dataflowId);
 
         kylinConfig.setMetadataUrl(distMetaUrl);
 
diff --git 
a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
 
b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
index c067ac8b7f..d3898c481e 100644
--- 
a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
+++ 
b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
@@ -196,7 +196,9 @@ public class StreamingJobLauncher extends 
AbstractSparkJobLauncher {
         val metaSet = NDataflowManager.getInstance(config, 
project).getDataflow(modelId)
                 .collectPrecalculationResource();
         metaSet.add(ResourceStore.METASTORE_IMAGE);
-        metaSet.add(MetadataType.mergeKeyWithType(jobId, STREAMING_JOB));
+        String uuid = jobId.substring(0, 36);
+        metaSet.add(MetadataType.mergeKeyWithType(uuid + "_build", 
STREAMING_JOB));
+        metaSet.add(MetadataType.mergeKeyWithType(uuid + "_merge", 
STREAMING_JOB));
         return metaSet;
     }
 
diff --git 
a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
 
b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
index bcb348d1e1..45875b0275 100644
--- 
a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
+++ 
b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
@@ -426,7 +426,7 @@ public class StreamingJobLauncherTest extends 
NLocalFileMetadataTestCase {
         launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
 
         val dumpSet = launcher.getMetadataDumpList();
-        Assert.assertEquals(17, dumpSet.size());
+        Assert.assertEquals(18, dumpSet.size());
 
         
Assert.assertTrue(dumpSet.contains("DATAFLOW/e78a89dd-847f-4574-8afa-8768b4228b72"));
         
Assert.assertTrue(dumpSet.contains("SEGMENT/c380dd2a-43b8-4268-b73d-2a5f76236631"));
@@ -445,6 +445,7 @@ public class StreamingJobLauncherTest extends 
NLocalFileMetadataTestCase {
         
Assert.assertTrue(dumpSet.contains("TABLE_INFO/streaming_test.SSB.PART"));
         Assert.assertTrue(dumpSet.contains(METASTORE_IMAGE));
         
Assert.assertTrue(dumpSet.contains("STREAMING_JOB/e78a89dd-847f-4574-8afa-8768b4228b72_build"));
+        
Assert.assertTrue(dumpSet.contains("STREAMING_JOB/e78a89dd-847f-4574-8afa-8768b4228b72_merge"));
 
     }
 
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/upgrade/UpdateAuditLogTableColumnLengthCLI.java
 
b/src/tool/src/main/java/org/apache/kylin/tool/upgrade/UpdateAuditLogTableColumnLengthCLI.java
index 2170c0a0fe..fdc1d9389c 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/upgrade/UpdateAuditLogTableColumnLengthCLI.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/upgrade/UpdateAuditLogTableColumnLengthCLI.java
@@ -26,6 +26,7 @@ import java.util.Locale;
 import java.util.Objects;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.util.Unsafe;
 import org.springframework.dao.EmptyResultDataAccessException;
@@ -38,7 +39,7 @@ import lombok.extern.slf4j.Slf4j;
 public class UpdateAuditLogTableColumnLengthCLI {
     private static final String SHOW_TABLE = "SHOW TABLES LIKE '%s'";
     private static final String UPDATE_COL_TO_TABLE_SQL = "alter table %s 
modify column %s %s";
-    private static final String TABLE_SUFFIX = "_audit_log";
+    private static final String TABLE_SUFFIX = 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX;
     private static final String AUDIT_LOG_TABLE_OPERATOR = "operator";
     private static final int COLUMN_LENGTH = 200;
 
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java
index f32c2697ef..59ad06650d 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java
@@ -244,7 +244,7 @@ public class AuditLogToolTest extends 
NLocalFileMetadataTestCase {
 
         val jdbcTemplate = getJdbcTemplate();
         long before = jdbcTemplate.queryForObject(String.format(Locale.ROOT,
-                "select count(1) from test_audit_Log where meta_ts between %d 
and %d", start, end), Long.class);
+                "select count(1) from test_audit_Log_v2 where meta_ts between 
%d and %d", start, end), Long.class);
         long after = fileLines(jsonl);
         Assertions.assertThat(after).isEqualTo(before);
     }
@@ -288,7 +288,8 @@ public class AuditLogToolTest extends 
NLocalFileMetadataTestCase {
                         return new AuditLog(x.get("id").asLong(), 
x.get("meta_key").asText(),
                                 
ByteSource.wrap(JsonUtil.writeValueAsBytes(x.get("meta_content"))),
                                 x.get("meta_ts").asLong(), 
x.get("meta_mvcc").asLong(), x.get("unit_id").asText(),
-                                x.get("operator").asText(), "", null, false);
+                                x.get("model_uuid").asText(), 
x.get("operator").asText(), "", null,
+                                false);
                     } catch (IOException e) {
                         throw Throwables.propagate(e);
                     }
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogWorkerTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogWorkerTest.java
index 44e47a3a28..65dfec459c 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogWorkerTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogWorkerTest.java
@@ -164,7 +164,8 @@ public class AuditLogWorkerTest extends 
NLocalFileMetadataTestCase {
                     ps.setString(4, path);
                 });
             }
-            jdbcTemplate.update(String.format(Locale.ROOT, 
INSERT_AUDIT_LOG_SQL, table + "_audit_log"), ps -> {
+            jdbcTemplate.update(String.format(Locale.ROOT, 
INSERT_AUDIT_LOG_SQL,
+                    table + JdbcAuditLogStore.AUDIT_LOG_SUFFIX), ps -> {
                 ps.setString(1, path);
                 ps.setBytes(2, path.getBytes(Charset.defaultCharset()));
                 ps.setLong(3, ts);
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java
 
b/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java
index 28593b4f2a..25ee34a554 100644
--- 
a/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java
+++ 
b/src/tool/src/test/java/org/apache/kylin/tool/security/KylinPasswordResetCLITest.java
@@ -27,6 +27,7 @@ import java.nio.charset.Charset;
 import org.apache.commons.dbcp2.BasicDataSourceFactory;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore;
 import org.apache.kylin.common.persistence.metadata.jdbc.AuditLogRowMapper;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.LogOutputTestCase;
@@ -109,7 +110,7 @@ public class KylinPasswordResetCLITest extends 
LogOutputTestCase {
 
         val url = getTestConfig().getMetadataUrl();
         val jdbcTemplate = getJdbcTemplate();
-        val all = jdbcTemplate.query("select * from " + url.getIdentifier() + 
"_audit_log", new AuditLogRowMapper());
+        val all = jdbcTemplate.query("select * from " + url.getIdentifier() + 
JdbcAuditLogStore.AUDIT_LOG_SUFFIX, new AuditLogRowMapper());
         Assert.assertTrue(all.stream().anyMatch(auditLog -> 
auditLog.getResPath().equals("USER_INFO/ADMIN")));
 
         System.setOut(System.out);
diff --git 
a/src/tool/src/test/resources/ut_audit_log/ke_metadata_test_audit_log.json 
b/src/tool/src/test/resources/ut_audit_log/ke_metadata_test_audit_log.json
index b8d3b27c70..d6ae57fd70 100644
--- a/src/tool/src/test/resources/ut_audit_log/ke_metadata_test_audit_log.json
+++ b/src/tool/src/test/resources/ut_audit_log/ke_metadata_test_audit_log.json
@@ -36,6 +36,7 @@
   "meta_ts" : 1562845984668,
   "meta_mvcc" : 0,
   "unit_id" : "f9d74202-4099-44f5-ba4b-b57697021594",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 14,
@@ -87,6 +88,7 @@
   "meta_ts" : 1562845984768,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 16,
@@ -142,6 +144,7 @@
   "meta_ts" : 1562845984768,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 20,
@@ -167,6 +170,7 @@
   "meta_ts" : 1562845984768,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 24,
@@ -258,6 +262,7 @@
   "meta_ts" : 1562845984768,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 25,
@@ -294,6 +299,7 @@
   "meta_ts" : 1562845984768,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 26,
@@ -330,6 +336,7 @@
   "meta_ts" : 1562845984768,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 27,
@@ -367,6 +374,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 28,
@@ -426,6 +434,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 29,
@@ -517,6 +526,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 31,
@@ -553,6 +563,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 32,
@@ -584,6 +595,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 33,
@@ -821,6 +833,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 34,
@@ -850,6 +863,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 36,
@@ -886,6 +900,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 38,
@@ -922,6 +937,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 40,
@@ -982,6 +998,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 42,
@@ -1077,6 +1094,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 43,
@@ -1113,6 +1131,7 @@
   "meta_ts" : 1562845984769,
   "meta_mvcc" : 0,
   "unit_id" : "8cb1152a-839d-4884-9e6f-0ba9e1315d94",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 759,
@@ -1140,6 +1159,7 @@
   "meta_ts" : 1562933689094,
   "meta_mvcc" : 1,
   "unit_id" : "8726b1be-e7ad-4557-b41e-8e8394c87c0f",
+  "model_uuid": null,
   "operator" : "ADMIN"
 }, {
   "id" : 760,
@@ -1165,6 +1185,7 @@
   "meta_ts" : 1562933689095,
   "meta_mvcc" : 1,
   "unit_id" : "8726b1be-e7ad-4557-b41e-8e8394c87c0f",
+  "model_uuid": null,
   "operator" : "ADMIN"
 }, {
   "id" : 761,
@@ -1207,6 +1228,7 @@
   "meta_ts" : 1562933689096,
   "meta_mvcc" : 1,
   "unit_id" : "8726b1be-e7ad-4557-b41e-8e8394c87c0f",
+  "model_uuid": null,
   "operator" : "ADMIN"
 }, {
   "id" : 762,
@@ -1249,6 +1271,7 @@
   "meta_ts" : 1562933689103,
   "meta_mvcc" : 2,
   "unit_id" : "8726b1be-e7ad-4557-b41e-8e8394c87c0f",
+  "model_uuid": null,
   "operator" : "ADMIN"
 }, {
   "id" : 764,
@@ -1309,6 +1332,7 @@
   "meta_ts" : 1562933713249,
   "meta_mvcc" : 3,
   "unit_id" : "b0f17828-61fb-4de9-8059-9fc770f2ddd8",
+  "model_uuid": null,
   "operator" : "ADMIN"
 }, {
   "id" : 789,
@@ -1338,6 +1362,7 @@
   "meta_ts" : 1562934105034,
   "meta_mvcc" : 0,
   "unit_id" : "2e61bf8b-1648-4ae3-b970-628cb88b8aa8",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 787,
@@ -1375,6 +1400,7 @@
   "meta_ts" : 1562934105028,
   "meta_mvcc" : 2,
   "unit_id" : "2e61bf8b-1648-4ae3-b970-628cb88b8aa8",
+  "model_uuid": null,
   "operator" : null
 }, {
   "id" : 788,
@@ -1400,5 +1426,6 @@
   "meta_ts" : 1562934105028,
   "meta_mvcc" : 2,
   "unit_id" : "2e61bf8b-1648-4ae3-b970-628cb88b8aa8",
+  "model_uuid": null,
   "operator" : null
-} ]
+} ]
\ No newline at end of file

Reply via email to