This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6871caeff7e Refactor statistics data collect (#34568)
6871caeff7e is described below

commit 6871caeff7e2f7c8c9240b55f7b3fdcbb32e27cf
Author: jiangML <1060319...@qq.com>
AuthorDate: Fri Feb 7 11:06:20 2025 +0800

    Refactor statistics data collect (#34568)
    
    * Refactor statistics data collect
    
    * Refactor statistics data collect
    
    * Fix checkstyle error
    
    * Fix e2e error
    
    * Fix test error
---
 .../data/ShardingTableStatisticsCollector.java     |  68 +++++----
 .../DialectShardingStatisticsTableCollector.java   |   6 +-
 .../MySQLShardingStatisticsTableCollector.java     |   8 +-
 .../OpenGaussShardingStatisticsTableCollector.java |   8 +-
 ...PostgreSQLShardingStatisticsTableCollector.java |   8 +-
 ...gsphere.ShardingSphereTableStatisticsCollector} |   0
 .../data/ShardingTableStatisticsCollectorTest.java |  59 +++++---
 .../MySQLShardingTableStatisticsCollectorTest.java |  60 +++++---
 ...nGaussShardingTableStatisticsCollectorTest.java |  69 ++++++---
 ...greSQLShardingTableStatisticsCollectorTest.java |  56 +++++---
 .../metadata/statistics/DatabaseStatistics.java    |   3 +-
 .../metadata/statistics/SchemaStatistics.java      |   3 +-
 .../statistics/ShardingSphereStatistics.java       |   3 +-
 ...ava => DialectDatabaseStatisticsCollector.java} |  30 ++--
 ...r.java => DialectTableStatisticsCollector.java} |  36 +++--
 .../postgresql/PostgreSQLStatisticsCollector.java  |  64 +++++++++
 .../PostgreSQLTableStatisticsCollector.java}       |  25 +---
 ...PostgreSQLPgClassTableStatisticsCollector.java} |  52 ++++---
 ...greSQLPgNamespaceTableStatisticsCollector.java} |  44 +++---
 .../ShardingSphereStatisticsCollector.java         |  65 +++++++++
 .../ShardingSphereTableStatisticsCollector.java}   |  25 +---
 .../row => utils}/RowStatisticsCollectorUtils.java |   2 +-
 ...cs.collector.DialectDatabaseStatisticsCollector |   3 +-
 ....postgresql.PostgreSQLTableStatisticsCollector} |   4 +-
 .../statistics/StatisticsRefreshEngine.java        | 154 +++++----------------
 .../statistics/StatisticsStorageEngine.java        | 103 ++++++++++++++
 .../fixture/TableStatisticsCollectorFixture.java   |  46 ------
 ...eTest.java => StatisticsStorageEngineTest.java} |  44 ++----
 28 files changed, 609 insertions(+), 439 deletions(-)

diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollector.java
index 7654d7c446b..02c8bb5318b 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.sharding.metadata.data;
 
+import com.cedarsoftware.util.CaseInsensitiveMap;
 import org.apache.shardingsphere.infra.database.DatabaseTypeEngine;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
@@ -26,10 +27,7 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector;
 import 
org.apache.shardingsphere.infra.rule.attribute.datasource.aggregate.AggregatedDataSourceRuleAttribute;
 import 
org.apache.shardingsphere.sharding.metadata.data.dialect.DialectShardingStatisticsTableCollector;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -39,23 +37,24 @@ import javax.sql.DataSource;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 /**
  * Sharding table statistics collector.
  */
-public final class ShardingTableStatisticsCollector implements 
TableStatisticsCollector {
+public final class ShardingTableStatisticsCollector implements 
ShardingSphereTableStatisticsCollector {
     
-    private static final String SHARDING_TABLE_STATISTICS = 
"sharding_table_statistics";
+    private long currentId = 1;
     
     @Override
-    public Optional<TableStatistics> collect(final String databaseName, final 
ShardingSphereTable table, final ShardingSphereMetaData metaData) throws 
SQLException {
-        TableStatistics result = new 
TableStatistics(SHARDING_TABLE_STATISTICS);
+    public Collection<Map<String, Object>> collect(final String databaseName, 
final String schemaName, final String tableName, final ShardingSphereMetaData 
metaData) throws SQLException {
+        Collection<Map<String, Object>> result = new LinkedList<>();
         DatabaseType protocolType = 
metaData.getAllDatabases().iterator().next().getProtocolType();
         DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(protocolType).getDialectDatabaseMetaData();
+        currentId = 1;
         if (dialectDatabaseMetaData.getDefaultSchema().isPresent()) {
             collectFromDatabase(metaData.getDatabase(databaseName), result);
         } else {
@@ -63,34 +62,34 @@ public final class ShardingTableStatisticsCollector 
implements TableStatisticsCo
                 collectFromDatabase(each, result);
             }
         }
-        return result.getRows().isEmpty() ? Optional.empty() : 
Optional.of(result);
+        return result;
     }
     
-    private void collectFromDatabase(final ShardingSphereDatabase database, 
final TableStatistics tableStatistics) throws SQLException {
+    private void collectFromDatabase(final ShardingSphereDatabase database, 
final Collection<Map<String, Object>> rows) throws SQLException {
         Optional<ShardingRule> rule = 
database.getRuleMetaData().findSingleRule(ShardingRule.class);
         if (!rule.isPresent()) {
             return;
         }
-        collectForShardingStatisticTable(database, rule.get(), 
tableStatistics);
+        collectForShardingStatisticTable(database, rule.get(), rows);
     }
     
-    private void collectForShardingStatisticTable(final ShardingSphereDatabase 
database, final ShardingRule rule, final TableStatistics tableStatistics) 
throws SQLException {
-        int count = 1;
+    private void collectForShardingStatisticTable(final ShardingSphereDatabase 
database, final ShardingRule rule, final Collection<Map<String, Object>> rows) 
throws SQLException {
         for (ShardingTable each : rule.getShardingTables().values()) {
             for (DataNode dataNode : each.getActualDataNodes()) {
-                List<Object> row = new LinkedList<>();
-                row.add(count++);
-                row.add(database.getName());
-                row.add(each.getLogicTable());
-                row.add(dataNode.getDataSourceName());
-                row.add(dataNode.getTableName());
-                
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), 
dataNode, row, rule);
-                tableStatistics.getRows().add(new RowStatistics(row));
+                Map<String, Object> rowColumnValues = new 
CaseInsensitiveMap<>();
+                rowColumnValues.put("id", currentId++);
+                rowColumnValues.put("logic_database_name", database.getName());
+                rowColumnValues.put("logic_table_name", each.getLogicTable());
+                rowColumnValues.put("actual_database_name", 
dataNode.getDataSourceName());
+                rowColumnValues.put("actual_table_name", 
dataNode.getTableName());
+                
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), 
dataNode, rowColumnValues, rule);
+                rows.add(rowColumnValues);
             }
         }
     }
     
-    private void addTableRowsAndDataLength(final Map<String, StorageUnit> 
storageUnits, final DataNode dataNode, final List<Object> row, final 
ShardingRule rule) throws SQLException {
+    private void addTableRowsAndDataLength(final Map<String, StorageUnit> 
storageUnits, final DataNode dataNode, final Map<String, Object> 
rowColumnValues,
+                                           final ShardingRule rule) throws 
SQLException {
         DataSource dataSource;
         DatabaseType databaseType;
         StorageUnit storageUnit = 
storageUnits.get(dataNode.getDataSourceName());
@@ -103,26 +102,37 @@ public final class ShardingTableStatisticsCollector 
implements TableStatisticsCo
             databaseType = null != dataSource ? 
DatabaseTypeEngine.getStorageType(dataSource) : null;
         }
         if (null != dataSource && null != databaseType) {
-            addTableRowsAndDataLength(databaseType, dataSource, dataNode, row);
+            addTableRowsAndDataLength(databaseType, dataSource, dataNode, 
rowColumnValues);
         }
     }
     
-    private void addTableRowsAndDataLength(final DatabaseType databaseType, 
final DataSource dataSource, final DataNode dataNode, final List<Object> row) 
throws SQLException {
+    private void addTableRowsAndDataLength(final DatabaseType databaseType, 
final DataSource dataSource, final DataNode dataNode,
+                                           final Map<String, Object> 
rowColumnValues) throws SQLException {
         boolean isAppended = false;
         Optional<DialectShardingStatisticsTableCollector> dialectCollector = 
DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class,
 databaseType);
         if (dialectCollector.isPresent()) {
             try (Connection connection = dataSource.getConnection()) {
-                isAppended = dialectCollector.get().appendRow(connection, 
dataNode, row);
+                isAppended = dialectCollector.get().appendRow(connection, 
dataNode, rowColumnValues);
             }
         }
         if (!isAppended) {
-            row.add(BigDecimal.ZERO);
-            row.add(BigDecimal.ZERO);
+            rowColumnValues.put("row_count", BigDecimal.ZERO);
+            rowColumnValues.put("size", BigDecimal.ZERO);
         }
     }
     
+    @Override
+    public String getSchemaName() {
+        return "shardingsphere";
+    }
+    
+    @Override
+    public String getTableName() {
+        return "sharding_table_statistics";
+    }
+    
     @Override
     public String getType() {
-        return SHARDING_TABLE_STATISTICS;
+        return "shardingsphere.sharding_table_statistics";
     }
 }
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/DialectShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/DialectShardingStatisticsTableCollector.java
index 34a277aebcd..0cbc4240bc9 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/DialectShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/DialectShardingStatisticsTableCollector.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.List;
+import java.util.Map;
 
 /**
  * Dialect sharding statistics table data collector.
@@ -40,9 +40,9 @@ public interface DialectShardingStatisticsTableCollector 
extends DatabaseTypedSP
      *
      * @param connection connection
      * @param dataNode data node
-     * @param row row to be appended
+     * @param rowColumnValues row column values
      * @return is appended or not
      * @throws SQLException SQL exception
      */
-    boolean appendRow(Connection connection, DataNode dataNode, List<Object> 
row) throws SQLException;
+    boolean appendRow(Connection connection, DataNode dataNode, Map<String, 
Object> rowColumnValues) throws SQLException;
 }
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingStatisticsTableCollector.java
index 890b5cd56dd..db4e55db34a 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingStatisticsTableCollector.java
@@ -24,7 +24,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.List;
+import java.util.Map;
 
 /**
  * Sharding statistics table data collector of MySQL.
@@ -34,14 +34,14 @@ public final class MySQLShardingStatisticsTableCollector 
implements DialectShard
     private static final String FETCH_TABLE_ROWS_AND_DATA_LENGTH_SQL = "SELECT 
TABLE_ROWS, DATA_LENGTH FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? 
AND TABLE_NAME = ?";
     
     @Override
-    public boolean appendRow(final Connection connection, final DataNode 
dataNode, final List<Object> row) throws SQLException {
+    public boolean appendRow(final Connection connection, final DataNode 
dataNode, final Map<String, Object> rowColumnValues) throws SQLException {
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(FETCH_TABLE_ROWS_AND_DATA_LENGTH_SQL)) {
             preparedStatement.setString(1, connection.getCatalog());
             preparedStatement.setString(2, dataNode.getTableName());
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 if (resultSet.next()) {
-                    row.add(resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
-                    row.add(resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
+                    rowColumnValues.put("row_count", 
resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
+                    rowColumnValues.put("size", 
resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
                     return true;
                 }
             }
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingStatisticsTableCollector.java
index 0fb7127bd56..95f7dcbf5cd 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingStatisticsTableCollector.java
@@ -24,7 +24,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.List;
+import java.util.Map;
 
 /**
  * Sharding statistics table data collector of openGauss.
@@ -34,7 +34,7 @@ public final class OpenGaussShardingStatisticsTableCollector 
implements DialectS
     private static final String FETCH_TABLE_ROWS_AND_DATA_LENGTH_SQL = "SELECT 
RELTUPLES AS TABLE_ROWS, PG_TABLE_SIZE(?) AS DATA_LENGTH FROM PG_CLASS WHERE 
RELNAME = ?";
     
     @Override
-    public boolean appendRow(final Connection connection, final DataNode 
dataNode, final List<Object> row) throws SQLException {
+    public boolean appendRow(final Connection connection, final DataNode 
dataNode, final Map<String, Object> rowColumnValues) throws SQLException {
         if (!isTableExist(connection, dataNode.getTableName())) {
             return false;
         }
@@ -43,8 +43,8 @@ public final class OpenGaussShardingStatisticsTableCollector 
implements DialectS
             preparedStatement.setString(2, dataNode.getTableName());
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 if (resultSet.next()) {
-                    row.add(resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
-                    row.add(resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
+                    rowColumnValues.put("row_count", 
resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
+                    rowColumnValues.put("size", 
resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
                     return true;
                 }
             }
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingStatisticsTableCollector.java
index 0e8c6fec01d..a4ac7f28d56 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingStatisticsTableCollector.java
@@ -25,7 +25,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -38,9 +38,9 @@ public final class PostgreSQLShardingStatisticsTableCollector 
implements Dialect
     private static final String FETCH_TABLE_DATA_LENGTH_SQL = "SELECT 
PG_RELATION_SIZE(RELID) as DATA_LENGTH FROM PG_STAT_ALL_TABLES T WHERE 
SCHEMANAME= ? AND RELNAME = ?";
     
     @Override
-    public boolean appendRow(final Connection connection, final DataNode 
dataNode, final List<Object> row) throws SQLException {
-        row.add(getRowValue(connection, dataNode, FETCH_TABLE_ROWS_LENGTH_SQL, 
TABLE_ROWS_COLUMN_NAME).orElse(BigDecimal.ZERO));
-        row.add(getRowValue(connection, dataNode, FETCH_TABLE_DATA_LENGTH_SQL, 
DATA_LENGTH_COLUMN_NAME).orElse(BigDecimal.ZERO));
+    public boolean appendRow(final Connection connection, final DataNode 
dataNode, final Map<String, Object> rowColumnValues) throws SQLException {
+        rowColumnValues.put("row_count", getRowValue(connection, dataNode, 
FETCH_TABLE_ROWS_LENGTH_SQL, TABLE_ROWS_COLUMN_NAME).orElse(BigDecimal.ZERO));
+        rowColumnValues.put("size", getRowValue(connection, dataNode, 
FETCH_TABLE_DATA_LENGTH_SQL, DATA_LENGTH_COLUMN_NAME).orElse(BigDecimal.ZERO));
         return true;
     }
     
diff --git a/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector
similarity index 100%
rename from features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector
rename to features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector
diff --git a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollectorTest.java b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollectorTest.java
index 28152b3374f..dd485396abf 100644
--- a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollectorTest.java
+++ b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/ShardingTableStatisticsCollectorTest.java
@@ -24,10 +24,8 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectTableStatisticsCollector;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.ShardingTable;
@@ -36,13 +34,14 @@ import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.Optional;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -57,11 +56,11 @@ class ShardingTableStatisticsCollectorTest {
     
     private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
     
-    private TableStatisticsCollector statisticsCollector;
+    private DialectTableStatisticsCollector statisticsCollector;
     
     @BeforeEach
     void setUp() {
-        statisticsCollector = 
TypedSPILoader.getService(TableStatisticsCollector.class, 
"sharding_table_statistics");
+        statisticsCollector = 
TypedSPILoader.getService(ShardingSphereTableStatisticsCollector.class, 
"shardingsphere.sharding_table_statistics");
     }
     
     @Test
@@ -71,8 +70,8 @@ class ShardingTableStatisticsCollectorTest {
         when(database.getProtocolType()).thenReturn(databaseType);
         ShardingSphereMetaData metaData = new ShardingSphereMetaData(
                 Collections.singleton(database), mock(ResourceMetaData.class), 
mock(RuleMetaData.class), new ConfigurationProperties(new Properties()));
-        Optional<TableStatistics> actual = 
statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), 
metaData);
-        assertFalse(actual.isPresent());
+        Collection<Map<String, Object>> actualRows = 
statisticsCollector.collect("foo_db", "shardingsphere", 
"sharding_table_statistics", metaData);
+        assertTrue(actualRows.isEmpty());
     }
     
     @Test
@@ -85,12 +84,36 @@ class ShardingTableStatisticsCollectorTest {
         ShardingSphereDatabase database = new ShardingSphereDatabase(
                 "foo_db", databaseType, new 
ResourceMetaData(Collections.emptyMap(), storageUnits), new 
RuleMetaData(Collections.singleton(rule)), Collections.emptyList());
         ShardingSphereMetaData metaData = new 
ShardingSphereMetaData(Collections.singleton(database), mock(), mock(), new 
ConfigurationProperties(new Properties()));
-        Optional<TableStatistics> actual = 
statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), 
metaData);
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getName(), is("sharding_table_statistics"));
-        List<RowStatistics> actualRows = new 
ArrayList<>(actual.get().getRows());
-        assertThat(actualRows.size(), is(2));
-        assertThat(actualRows.get(0).getRows(), is(Arrays.asList(1, "foo_db", 
"foo_tbl", "ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
-        assertThat(actualRows.get(1).getRows(), is(Arrays.asList(2, "foo_db", 
"foo_tbl", "ds_1", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
+        Collection<Map<String, Object>> actualRows = 
statisticsCollector.collect("foo_db", "shardingsphere", 
"sharding_table_statistics", metaData);
+        assertFalse(actualRows.isEmpty());
+        Collection<Map<String, Object>> expectedRows = new LinkedList<>();
+        expectedRows.add(createRowColumnValues(1L, "foo_db", "foo_tbl", 
"ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
+        expectedRows.add(createRowColumnValues(2L, "foo_db", "foo_tbl", 
"ds_1", "foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
+        assertRowsValue(expectedRows, actualRows);
+    }
+    
+    private Map<String, Object> createRowColumnValues(final long id, final 
String logicDatabaseName, final String logicTableName, final String 
actualDatabaseName,
+                                                      final String 
actualTableName, final BigDecimal rowCount, final BigDecimal size) {
+        Map<String, Object> result = new HashMap<>();
+        result.put("id", id);
+        result.put("logic_database_name", logicDatabaseName);
+        result.put("logic_table_name", logicTableName);
+        result.put("actual_database_name", actualDatabaseName);
+        result.put("actual_table_name", actualTableName);
+        result.put("row_count", rowCount);
+        result.put("size", size);
+        return result;
+    }
+    
+    private void assertRowsValue(final Collection<Map<String, Object>> 
expectedRows, final Collection<Map<String, Object>> actualRows) {
+        assertThat(actualRows.size(), is(expectedRows.size()));
+        Iterator<Map<String, Object>> actualRowsIterator = 
actualRows.iterator();
+        for (Map<String, Object> each : expectedRows) {
+            Map<String, Object> actualRow = actualRowsIterator.next();
+            for (Entry<String, Object> entry : each.entrySet()) {
+                assertTrue(actualRow.containsKey(entry.getKey()));
+                assertThat(actualRow.get(entry.getKey()), 
is(entry.getValue()));
+            }
+        }
     }
 }
diff --git a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingTableStatisticsCollectorTest.java b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingTableStatisticsCollectorTest.java
index 339d8134c18..a713693c920 100644
--- a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingTableStatisticsCollectorTest.java
+++ b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/MySQLShardingTableStatisticsCollectorTest.java
@@ -24,10 +24,8 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectTableStatisticsCollector;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.ShardingTable;
@@ -39,17 +37,19 @@ import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.Optional;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -59,11 +59,11 @@ class MySQLShardingTableStatisticsCollectorTest {
     
     private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "MySQL");
     
-    private TableStatisticsCollector statisticsCollector;
+    private DialectTableStatisticsCollector statisticsCollector;
     
     @BeforeEach
     void setUp() {
-        statisticsCollector = 
TypedSPILoader.getService(TableStatisticsCollector.class, 
"sharding_table_statistics");
+        statisticsCollector = 
TypedSPILoader.getService(ShardingSphereTableStatisticsCollector.class, 
"shardingsphere.sharding_table_statistics");
     }
     
     @Test
@@ -73,16 +73,15 @@ class MySQLShardingTableStatisticsCollectorTest {
         Map<String, StorageUnit> storageUnits = new HashMap<>(2, 1F);
         storageUnits.put("ds_0", mockStorageUnit(mock(ResultSet.class)));
         storageUnits.put("ds_1", mockStorageUnit(mockResultSet()));
-        ShardingSphereDatabase database = new ShardingSphereDatabase(
-                "foo_db", databaseType, new 
ResourceMetaData(Collections.emptyMap(), storageUnits), new 
RuleMetaData(Collections.singleton(rule)), Collections.emptyList());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db", 
databaseType, new ResourceMetaData(Collections.emptyMap(), storageUnits),
+                new RuleMetaData(Collections.singleton(rule)), 
Collections.emptyList());
         ShardingSphereMetaData metaData = new 
ShardingSphereMetaData(Collections.singleton(database), mock(), mock(), new 
ConfigurationProperties(new Properties()));
-        Optional<TableStatistics> actual = 
statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), 
metaData);
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getName(), is("sharding_table_statistics"));
-        List<RowStatistics> actualRows = new 
ArrayList<>(actual.get().getRows());
-        assertThat(actualRows.size(), is(2));
-        assertThat(actualRows.get(0).getRows(), is(Arrays.asList(2, "foo_db", 
"foo_tbl", "ds_1", "foo_tbl", new BigDecimal("10"), new BigDecimal("100"))));
-        assertThat(actualRows.get(1).getRows(), is(Arrays.asList(1, "foo_db", 
"foo_tbl", "ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
+        Collection<Map<String, Object>> actualRows = 
statisticsCollector.collect("foo_db", "shardingsphere", 
"sharding_table_statistics", metaData);
+        assertFalse(actualRows.isEmpty());
+        Collection<Map<String, Object>> expectedRows = new LinkedList<>();
+        expectedRows.add(createRowColumnValues(1L, "foo_db", "foo_tbl", 
"ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
+        expectedRows.add(createRowColumnValues(2L, "foo_db", "foo_tbl", 
"ds_1", "foo_tbl", new BigDecimal("10"), new BigDecimal("100")));
+        assertRowsValue(expectedRows, actualRows);
     }
     
     private StorageUnit mockStorageUnit(final ResultSet resultSet) throws 
SQLException {
@@ -101,4 +100,29 @@ class MySQLShardingTableStatisticsCollectorTest {
         when(result.getBigDecimal("DATA_LENGTH")).thenReturn(new 
BigDecimal("100"));
         return result;
     }
+    
+    private Map<String, Object> createRowColumnValues(final long id, final 
String logicDatabaseName, final String logicTableName, final String 
actualDatabaseName,
+                                                      final String 
actualTableName, final BigDecimal rowCount, final BigDecimal size) {
+        Map<String, Object> result = new HashMap<>();
+        result.put("id", id);
+        result.put("logic_database_name", logicDatabaseName);
+        result.put("logic_table_name", logicTableName);
+        result.put("actual_database_name", actualDatabaseName);
+        result.put("actual_table_name", actualTableName);
+        result.put("row_count", rowCount);
+        result.put("size", size);
+        return result;
+    }
+    
+    private void assertRowsValue(final Collection<Map<String, Object>> 
expectedRows, final Collection<Map<String, Object>> actualRows) {
+        assertThat(actualRows.size(), is(expectedRows.size()));
+        Iterator<Map<String, Object>> actualRowsIterator = 
actualRows.iterator();
+        for (Map<String, Object> each : expectedRows) {
+            Map<String, Object> actualRow = actualRowsIterator.next();
+            for (Entry<String, Object> entry : each.entrySet()) {
+                assertTrue(actualRow.containsKey(entry.getKey()));
+                assertThat(actualRow.get(entry.getKey()), 
is(entry.getValue()));
+            }
+        }
+    }
 }
diff --git 
a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingTableStatisticsCollectorTest.java
 
b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingTableStatisticsCollectorTest.java
index 2d0d8ce4f0d..0fb726d7bdc 100644
--- 
a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingTableStatisticsCollectorTest.java
+++ 
b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/OpenGaussShardingTableStatisticsCollectorTest.java
@@ -24,10 +24,8 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectTableStatisticsCollector;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.ShardingTable;
@@ -39,17 +37,19 @@ import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.Optional;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -60,11 +60,11 @@ class OpenGaussShardingTableStatisticsCollectorTest {
     
     private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "openGauss");
     
-    private TableStatisticsCollector statisticsCollector;
+    private DialectTableStatisticsCollector statisticsCollector;
     
     @BeforeEach
     void setUp() {
-        statisticsCollector = 
TypedSPILoader.getService(TableStatisticsCollector.class, 
"sharding_table_statistics");
+        statisticsCollector = 
TypedSPILoader.getService(ShardingSphereTableStatisticsCollector.class, 
"shardingsphere.sharding_table_statistics");
     }
     
     @Test
@@ -77,13 +77,12 @@ class OpenGaussShardingTableStatisticsCollectorTest {
         ShardingSphereDatabase database = new ShardingSphereDatabase(
                 "foo_db", databaseType, new 
ResourceMetaData(Collections.emptyMap(), storageUnits), new 
RuleMetaData(Collections.singleton(rule)), Collections.emptyList());
         ShardingSphereMetaData metaData = new 
ShardingSphereMetaData(Collections.singleton(database), mock(), mock(), new 
ConfigurationProperties(new Properties()));
-        Optional<TableStatistics> actual = 
statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), 
metaData);
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getName(), is("sharding_table_statistics"));
-        List<RowStatistics> actualRows = new 
ArrayList<>(actual.get().getRows());
-        assertThat(actualRows.size(), is(2));
-        assertThat(actualRows.get(0).getRows(), is(Arrays.asList(1, "foo_db", 
"foo_tbl", "ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
-        assertThat(actualRows.get(1).getRows(), is(Arrays.asList(2, "foo_db", 
"foo_tbl", "ds_1", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
+        Collection<Map<String, Object>> actualRows = 
statisticsCollector.collect("foo_db", "shardingsphere", 
"sharding_table_statistics", metaData);
+        assertFalse(actualRows.isEmpty());
+        Collection<Map<String, Object>> expectedRows = new LinkedList<>();
+        expectedRows.add(createRowColumnValues(1, "foo_db", "foo_tbl", "ds_0", 
"foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
+        expectedRows.add(createRowColumnValues(2, "foo_db", "foo_tbl", "ds_1", 
"foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
+        assertRowsValue(expectedRows, actualRows);
     }
     
     @Test
@@ -96,13 +95,12 @@ class OpenGaussShardingTableStatisticsCollectorTest {
         ShardingSphereDatabase database = new ShardingSphereDatabase(
                 "foo_db", databaseType, new 
ResourceMetaData(Collections.emptyMap(), storageUnits), new 
RuleMetaData(Collections.singleton(rule)), Collections.emptyList());
         ShardingSphereMetaData metaData = new 
ShardingSphereMetaData(Collections.singleton(database), mock(), mock(), new 
ConfigurationProperties(new Properties()));
-        Optional<TableStatistics> actual = 
statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), 
metaData);
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getName(), is("sharding_table_statistics"));
-        List<RowStatistics> actualRows = new 
ArrayList<>(actual.get().getRows());
-        assertThat(actualRows.size(), is(2));
-        assertThat(actualRows.get(0).getRows(), is(Arrays.asList(2, "foo_db", 
"foo_tbl", "ds_1", "foo_tbl", new BigDecimal("10"), new BigDecimal("100"))));
-        assertThat(actualRows.get(1).getRows(), is(Arrays.asList(1, "foo_db", 
"foo_tbl", "ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
+        Collection<Map<String, Object>> actualRows = 
statisticsCollector.collect("foo_db", "shardingsphere", 
"sharding_table_statistics", metaData);
+        assertFalse(actualRows.isEmpty());
+        Collection<Map<String, Object>> expectedRows = new LinkedList<>();
+        expectedRows.add(createRowColumnValues(1L, "foo_db", "foo_tbl", 
"ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
+        expectedRows.add(createRowColumnValues(2L, "foo_db", "foo_tbl", 
"ds_1", "foo_tbl", new BigDecimal("10"), new BigDecimal("100")));
+        assertRowsValue(expectedRows, actualRows);
     }
     
     private StorageUnit mockStorageUnit(final ResultSet resultSet, final 
boolean tableExisted) throws SQLException {
@@ -124,4 +122,29 @@ class OpenGaussShardingTableStatisticsCollectorTest {
         when(result.getBigDecimal("DATA_LENGTH")).thenReturn(new 
BigDecimal("100"));
         return result;
     }
+    
+    private Map<String, Object> createRowColumnValues(final long id, final 
String logicDatabaseName, final String logicTableName, final String 
actualDatabaseName,
+                                                      final String 
actualTableName, final BigDecimal rowCount, final BigDecimal size) {
+        Map<String, Object> result = new HashMap<>();
+        result.put("id", id);
+        result.put("logic_database_name", logicDatabaseName);
+        result.put("logic_table_name", logicTableName);
+        result.put("actual_database_name", actualDatabaseName);
+        result.put("actual_table_name", actualTableName);
+        result.put("row_count", rowCount);
+        result.put("size", size);
+        return result;
+    }
+    
+    private void assertRowsValue(final Collection<Map<String, Object>> 
expectedRows, final Collection<Map<String, Object>> actualRows) {
+        assertThat(actualRows.size(), is(expectedRows.size()));
+        Iterator<Map<String, Object>> actualRowsIterator = 
actualRows.iterator();
+        for (Map<String, Object> each : expectedRows) {
+            Map<String, Object> actualRow = actualRowsIterator.next();
+            for (Entry<String, Object> entry : each.entrySet()) {
+                assertTrue(actualRow.containsKey(entry.getKey()));
+                assertThat(actualRow.get(entry.getKey()), 
is(entry.getValue()));
+            }
+        }
+    }
 }
diff --git 
a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingTableStatisticsCollectorTest.java
 
b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingTableStatisticsCollectorTest.java
index 3a28d7a076d..d8662ea74ff 100644
--- 
a/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingTableStatisticsCollectorTest.java
+++ 
b/features/sharding/core/src/test/java/org/apache/shardingsphere/sharding/metadata/data/dialect/type/PostgreSQLShardingTableStatisticsCollectorTest.java
@@ -24,10 +24,8 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectTableStatisticsCollector;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.ShardingTable;
@@ -39,17 +37,19 @@ import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.Optional;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -59,11 +59,11 @@ class PostgreSQLShardingTableStatisticsCollectorTest {
     
     private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "PostgreSQL");
     
-    private TableStatisticsCollector statisticsCollector;
+    private DialectTableStatisticsCollector statisticsCollector;
     
     @BeforeEach
     void setUp() {
-        statisticsCollector = 
TypedSPILoader.getService(TableStatisticsCollector.class, 
"sharding_table_statistics");
+        statisticsCollector = 
TypedSPILoader.getService(ShardingSphereTableStatisticsCollector.class, 
"shardingsphere.sharding_table_statistics");
     }
     
     @Test
@@ -76,13 +76,12 @@ class PostgreSQLShardingTableStatisticsCollectorTest {
         ShardingSphereDatabase database = new ShardingSphereDatabase(
                 "foo_db", databaseType, new 
ResourceMetaData(Collections.emptyMap(), storageUnits), new 
RuleMetaData(Collections.singleton(rule)), Collections.emptyList());
         ShardingSphereMetaData metaData = new 
ShardingSphereMetaData(Collections.singleton(database), mock(), mock(), new 
ConfigurationProperties(new Properties()));
-        Optional<TableStatistics> actual = 
statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), 
metaData);
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getName(), is("sharding_table_statistics"));
-        List<RowStatistics> actualRows = new 
ArrayList<>(actual.get().getRows());
-        assertThat(actualRows.size(), is(2));
-        assertThat(actualRows.get(0).getRows(), is(Arrays.asList(2, "foo_db", 
"foo_tbl", "ds_1", "foo_tbl", new BigDecimal("10"), new BigDecimal("100"))));
-        assertThat(actualRows.get(1).getRows(), is(Arrays.asList(1, "foo_db", 
"foo_tbl", "ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
+        Collection<Map<String, Object>> actualRows = 
statisticsCollector.collect("foo_db", "shardingsphere", 
"sharding_table_statistics", metaData);
+        assertFalse(actualRows.isEmpty());
+        Collection<Map<String, Object>> expectedRows = new LinkedList<>();
+        expectedRows.add(createRowColumnValues(1L, "foo_db", "foo_tbl", 
"ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
+        expectedRows.add(createRowColumnValues(2L, "foo_db", "foo_tbl", 
"ds_1", "foo_tbl", new BigDecimal("10"), new BigDecimal("100")));
+        assertRowsValue(expectedRows, actualRows);
     }
     
     private StorageUnit mockStorageUnit(final ResultSet resultSet) throws 
SQLException {
@@ -102,4 +101,29 @@ class PostgreSQLShardingTableStatisticsCollectorTest {
         when(result.getBigDecimal("DATA_LENGTH")).thenReturn(new 
BigDecimal("100"));
         return result;
     }
+    
+    private Map<String, Object> createRowColumnValues(final long id, final 
String logicDatabaseName, final String logicTableName, final String 
actualDatabaseName,
+                                                      final String 
actualTableName, final BigDecimal rowCount, final BigDecimal size) {
+        Map<String, Object> result = new HashMap<>();
+        result.put("id", id);
+        result.put("logic_database_name", logicDatabaseName);
+        result.put("logic_table_name", logicTableName);
+        result.put("actual_database_name", actualDatabaseName);
+        result.put("actual_table_name", actualTableName);
+        result.put("row_count", rowCount);
+        result.put("size", size);
+        return result;
+    }
+    
+    private void assertRowsValue(final Collection<Map<String, Object>> 
expectedRows, final Collection<Map<String, Object>> actualRows) {
+        assertThat(actualRows.size(), is(expectedRows.size()));
+        Iterator<Map<String, Object>> actualRowsIterator = 
actualRows.iterator();
+        for (Map<String, Object> each : expectedRows) {
+            Map<String, Object> actualRow = actualRowsIterator.next();
+            for (Entry<String, Object> entry : each.entrySet()) {
+                assertTrue(actualRow.containsKey(entry.getKey()));
+                assertThat(actualRow.get(entry.getKey()), 
is(entry.getValue()));
+            }
+        }
+    }
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/DatabaseStatistics.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/DatabaseStatistics.java
index 06cb5c9267b..e9ad16f1482 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/DatabaseStatistics.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/DatabaseStatistics.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.metadata.statistics;
 import com.cedarsoftware.util.CaseInsensitiveMap;
 import lombok.Getter;
 
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -28,7 +29,7 @@ import java.util.Map;
 @Getter
 public final class DatabaseStatistics {
     
-    private final Map<String, SchemaStatistics> schemaStatisticsMap = new 
CaseInsensitiveMap<>();
+    private final Map<String, SchemaStatistics> schemaStatisticsMap = 
Collections.synchronizedMap(new CaseInsensitiveMap<>());
     
     /**
      * Judge whether to contains schema statistics.
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/SchemaStatistics.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/SchemaStatistics.java
index 689911550a7..817d28dc8bf 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/SchemaStatistics.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/SchemaStatistics.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.metadata.statistics;
 import com.cedarsoftware.util.CaseInsensitiveMap;
 import lombok.Getter;
 
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -28,7 +29,7 @@ import java.util.Map;
 @Getter
 public final class SchemaStatistics {
     
-    private final Map<String, TableStatistics> tableStatisticsMap = new 
CaseInsensitiveMap<>();
+    private final Map<String, TableStatistics> tableStatisticsMap = 
Collections.synchronizedMap(new CaseInsensitiveMap<>());
     
     /**
      * Get table statistics.
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/ShardingSphereStatistics.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/ShardingSphereStatistics.java
index 6ed2de4cfe6..50536d450ca 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/ShardingSphereStatistics.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/ShardingSphereStatistics.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.metadata.statistics;
 import com.cedarsoftware.util.CaseInsensitiveMap;
 import lombok.Getter;
 
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -28,7 +29,7 @@ import java.util.Map;
 @Getter
 public final class ShardingSphereStatistics {
     
-    private final Map<String, DatabaseStatistics> databaseStatisticsMap = new 
CaseInsensitiveMap<>();
+    private final Map<String, DatabaseStatistics> databaseStatisticsMap = 
Collections.synchronizedMap(new CaseInsensitiveMap<>());
     
     /**
      * Get database statistics.
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/DialectDatabaseStatisticsCollector.java
similarity index 60%
copy from 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
copy to 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/DialectDatabaseStatisticsCollector.java
index 6fc1303502c..997d8ecda35 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/DialectDatabaseStatisticsCollector.java
@@ -15,31 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.metadata.statistics.collector.table;
+package org.apache.shardingsphere.infra.metadata.statistics.collector;
 
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 
 /**
- * Table statistics collector.
+ * Dialect database statistics collector.
  */
 @SingletonSPI
-public interface TableStatisticsCollector extends TypedSPI {
+public interface DialectDatabaseStatisticsCollector extends DatabaseTypedSPI {
     
     /**
-     * Collect table statistics.
+     * Get statistics schema tables.
+     *
+     * @return schema names mapped to the names of their statistics tables
+     */
+    Map<String, Collection<String>> getStatisticsSchemaTables();
+    
+    /**
+     * Collect row column values.
      *
      * @param databaseName database name
-     * @param table table
-     * @param metaData meta data
-     * @return table statistics
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param metaData shardingsphere meta data
+     * @return row column values
      * @throws SQLException SQL exception
      */
-    Optional<TableStatistics> collect(String databaseName, ShardingSphereTable 
table, ShardingSphereMetaData metaData) throws SQLException;
+    Optional<Collection<Map<String, Object>>> collectRowColumnValues(String 
databaseName, String schemaName, String tableName, ShardingSphereMetaData 
metaData) throws SQLException;
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/DialectTableStatisticsCollector.java
similarity index 64%
copy from 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
copy to 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/DialectTableStatisticsCollector.java
index 6fc1303502c..9ee108987ee 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/DialectTableStatisticsCollector.java
@@ -15,31 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.metadata.statistics.collector.table;
+package org.apache.shardingsphere.infra.metadata.statistics.collector;
 
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
 import java.sql.SQLException;
-import java.util.Optional;
+import java.util.Collection;
+import java.util.Map;
 
 /**
  * Table statistics collector.
  */
-@SingletonSPI
-public interface TableStatisticsCollector extends TypedSPI {
+public interface DialectTableStatisticsCollector extends TypedSPI {
     
     /**
-     * Collect table statistics.
+     * Get schema name.
+     *
+     * @return schema name
+     */
+    String getSchemaName();
+    
+    /**
+     * Get table name.
+     *
+     * @return table name
+     */
+    String getTableName();
+    
+    /**
+     * Collect row column values.
      *
      * @param databaseName database name
-     * @param table table
-     * @param metaData meta data
-     * @return table statistics
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param metaData shardingsphere meta data
+     * @return row column values
      * @throws SQLException SQL exception
      */
-    Optional<TableStatistics> collect(String databaseName, ShardingSphereTable 
table, ShardingSphereMetaData metaData) throws SQLException;
+    Collection<Map<String, Object>> collect(String databaseName, String 
schemaName, String tableName, ShardingSphereMetaData metaData) throws 
SQLException;
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/PostgreSQLStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/PostgreSQLStatisticsCollector.java
new file mode 100644
index 00000000000..a2eb37cfe6f
--- /dev/null
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/PostgreSQLStatisticsCollector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql;
+
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectDatabaseStatisticsCollector;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Statistics collector for PostgreSQL.
+ */
+public final class PostgreSQLStatisticsCollector implements 
DialectDatabaseStatisticsCollector {
+    
+    private static final Map<String, Collection<String>> 
STATISTICS_SCHEMA_TABLES = new ConcurrentHashMap<>();
+    
+    static {
+        for (PostgreSQLTableStatisticsCollector each : 
ShardingSphereServiceLoader.getServiceInstances(PostgreSQLTableStatisticsCollector.class))
 {
+            if (!STATISTICS_SCHEMA_TABLES.containsKey(each.getSchemaName())) {
+                STATISTICS_SCHEMA_TABLES.put(each.getSchemaName(), new 
LinkedList<>());
+            }
+            
STATISTICS_SCHEMA_TABLES.get(each.getSchemaName()).add(each.getTableName());
+        }
+    }
+    
+    @Override
+    public Map<String, Collection<String>> getStatisticsSchemaTables() {
+        return STATISTICS_SCHEMA_TABLES;
+    }
+    
+    @Override
+    public Optional<Collection<Map<String, Object>>> 
collectRowColumnValues(final String databaseName, final String schemaName, 
final String tableName,
+                                                                            
final ShardingSphereMetaData metaData) throws SQLException {
+        Optional<PostgreSQLTableStatisticsCollector> tableStatisticsCollector 
= TypedSPILoader.findService(PostgreSQLTableStatisticsCollector.class, 
String.format("%s.%s", schemaName, tableName));
+        return tableStatisticsCollector.isPresent() ? 
Optional.of(tableStatisticsCollector.get().collect(databaseName, schemaName, 
tableName, metaData)) : Optional.empty();
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "PostgreSQL";
+    }
+}
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/PostgreSQLTableStatisticsCollector.java
similarity index 53%
copy from 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
copy to 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/PostgreSQLTableStatisticsCollector.java
index 6fc1303502c..d2e50f3631d 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/PostgreSQLTableStatisticsCollector.java
@@ -15,31 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.metadata.statistics.collector.table;
+package 
org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql;
 
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectTableStatisticsCollector;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
-
-import java.sql.SQLException;
-import java.util.Optional;
 
 /**
- * Table statistics collector.
+ * Statistics collector for PostgreSQL table.
  */
 @SingletonSPI
-public interface TableStatisticsCollector extends TypedSPI {
-    
-    /**
-     * Collect table statistics.
-     *
-     * @param databaseName database name
-     * @param table table
-     * @param metaData meta data
-     * @return table statistics
-     * @throws SQLException SQL exception
-     */
-    Optional<TableStatistics> collect(String databaseName, ShardingSphereTable 
table, ShardingSphereMetaData metaData) throws SQLException;
+public interface PostgreSQLTableStatisticsCollector extends 
DialectTableStatisticsCollector {
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/type/PgClassTableStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/table/PostgreSQLPgClassTableStatisticsCollector.java
similarity index 50%
rename from 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/type/PgClassTableStatisticsCollector.java
rename to 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/table/PostgreSQLPgClassTableStatisticsCollector.java
index ddcaa2057b0..74320109095 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/type/PgClassTableStatisticsCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/table/PostgreSQLPgClassTableStatisticsCollector.java
@@ -15,59 +15,55 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.type;
+package 
org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.table;
 
 import com.cedarsoftware.util.CaseInsensitiveMap;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.row.RowStatisticsCollectorUtils;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.PostgreSQLTableStatisticsCollector;
 
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Optional;
 
 /**
- * Table statistics collector for pg_class.
+ * Table statistics collector for pg_catalog.pg_class of PostgreSQL.
  */
-public final class PgClassTableStatisticsCollector implements 
TableStatisticsCollector {
-    
-    private static final String PG_CLASS = "pg_class";
+public final class PostgreSQLPgClassTableStatisticsCollector implements 
PostgreSQLTableStatisticsCollector {
     
     private static final String PUBLIC_SCHEMA = "public";
     
-    private static final Long PUBLIC_SCHEMA_OID = 0L;
-    
     @Override
-    public Optional<TableStatistics> collect(final String databaseName, final 
ShardingSphereTable table, final ShardingSphereMetaData metaData) throws 
SQLException {
-        TableStatistics result = new TableStatistics(PG_CLASS);
+    public Collection<Map<String, Object>> collect(final String databaseName, final String schemaName, final String tableName, final ShardingSphereMetaData metaData) throws SQLException {
+        Collection<Map<String, Object>> result = new LinkedList<>();
         ShardingSphereSchema publicSchema = 
metaData.getDatabase(databaseName).getSchema(PUBLIC_SCHEMA);
         if (null != publicSchema) {
-            result.getRows().addAll(collectForSchema(0L, PUBLIC_SCHEMA_OID, 
publicSchema, table));
+            for (ShardingSphereTable each : publicSchema.getAllTables()) {
+                Map<String, Object> columnValues = new CaseInsensitiveMap<>(4, 
1F);
+                columnValues.put("oid", 0L);
+                columnValues.put("relnamespace", 0L);
+                columnValues.put("relname", each.getName());
+                columnValues.put("relkind", "r");
+                result.add(columnValues);
+            }
         }
-        return Optional.of(result);
+        return result;
     }
     
-    private Collection<RowStatistics> collectForSchema(final Long oid, final 
Long relNamespace, final ShardingSphereSchema schema, final ShardingSphereTable 
table) {
-        Collection<RowStatistics> result = new LinkedList<>();
-        for (ShardingSphereTable each : schema.getAllTables()) {
-            Map<String, Object> columnValues = new CaseInsensitiveMap<>(4, 1F);
-            columnValues.put("oid", oid);
-            columnValues.put("relnamespace", relNamespace);
-            columnValues.put("relname", each.getName());
-            columnValues.put("relkind", "r");
-            result.add(new 
RowStatistics(RowStatisticsCollectorUtils.createRowValues(columnValues, 
table)));
-        }
-        return result;
+    @Override
+    public String getSchemaName() {
+        return "pg_catalog";
+    }
+    
+    @Override
+    public String getTableName() {
+        return "pg_class";
     }
     
     @Override
     public String getType() {
-        return PG_CLASS;
+        return "pg_catalog.pg_class";
     }
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/type/PgNamespaceTableStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/table/PostgreSQLPgNamespaceTableStatisticsCollector.java
similarity index 51%
rename from 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/type/PgNamespaceTableStatisticsCollector.java
rename to 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/table/PostgreSQLPgNamespaceTableStatisticsCollector.java
index 02e86253e29..cdf348f7f1f 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/type/PgNamespaceTableStatisticsCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/postgresql/table/PostgreSQLPgNamespaceTableStatisticsCollector.java
@@ -15,52 +15,52 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.type;
+package 
org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.table;
 
 import com.cedarsoftware.util.CaseInsensitiveMap;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.row.RowStatisticsCollectorUtils;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.PostgreSQLTableStatisticsCollector;
 
 import java.sql.SQLException;
-import java.util.List;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.Optional;
 
 /**
- * Table statistics collector for pg_namespace.
+ * Table statistics collector for pg_catalog.pg_namespace of PostgreSQL.
  */
-public final class PgNamespaceTableStatisticsCollector implements 
TableStatisticsCollector {
-    
-    private static final String PG_NAMESPACE = "pg_namespace";
+public final class PostgreSQLPgNamespaceTableStatisticsCollector implements 
PostgreSQLTableStatisticsCollector {
     
     private static final String PUBLIC_SCHEMA = "public";
     
     private static final Long PUBLIC_SCHEMA_OID = 0L;
     
     @Override
-    public Optional<TableStatistics> collect(final String databaseName, final 
ShardingSphereTable table, final ShardingSphereMetaData metaData) throws 
SQLException {
-        TableStatistics result = new TableStatistics(PG_NAMESPACE);
+    public Collection<Map<String, Object>> collect(final String databaseName, 
final String schemaName, final String tableName, final ShardingSphereMetaData 
metaData) throws SQLException {
+        Collection<Map<String, Object>> result = new LinkedList<>();
         long oid = 1L;
         for (ShardingSphereSchema each : 
metaData.getDatabase(databaseName).getAllSchemas()) {
-            result.getRows().add(new 
RowStatistics(getRow(PUBLIC_SCHEMA.equalsIgnoreCase(each.getName()) ? 
PUBLIC_SCHEMA_OID : oid++, each.getName(), table)));
+            Map<String, Object> columnValues = new CaseInsensitiveMap<>(2, 1F);
+            columnValues.put("oid", 
PUBLIC_SCHEMA.equalsIgnoreCase(each.getName()) ? PUBLIC_SCHEMA_OID : oid++);
+            columnValues.put("nspname", each.getName());
+            result.add(columnValues);
         }
-        return Optional.of(result);
+        return result;
     }
     
-    private List<Object> getRow(final Long oid, final String schemaName, final 
ShardingSphereTable table) {
-        Map<String, Object> columnValues = new CaseInsensitiveMap<>(2, 1F);
-        columnValues.put("oid", oid);
-        columnValues.put("nspname", schemaName);
-        return RowStatisticsCollectorUtils.createRowValues(columnValues, 
table);
+    @Override
+    public String getSchemaName() {
+        return "pg_catalog";
+    }
+    
+    @Override
+    public String getTableName() {
+        return "pg_namespace";
     }
     
     @Override
     public String getType() {
-        return PG_NAMESPACE;
+        return "pg_catalog.pg_namespace";
     }
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/shardingsphere/ShardingSphereStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/shardingsphere/ShardingSphereStatisticsCollector.java
new file mode 100644
index 00000000000..08bc41963cb
--- /dev/null
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/shardingsphere/ShardingSphereStatisticsCollector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere;
+
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.statistics.collector.DialectDatabaseStatisticsCollector;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Statistics collector for ShardingSphere.
+ */
+public final class ShardingSphereStatisticsCollector implements DialectDatabaseStatisticsCollector {
+    
+    private static final Map<String, Collection<String>> STATISTICS_SCHEMA_TABLES = new ConcurrentHashMap<>();
+    
+    static {
+        for (ShardingSphereTableStatisticsCollector each : ShardingSphereServiceLoader.getServiceInstances(ShardingSphereTableStatisticsCollector.class)) {
+            if (!STATISTICS_SCHEMA_TABLES.containsKey(each.getSchemaName())) {
+                STATISTICS_SCHEMA_TABLES.put(each.getSchemaName(), new LinkedList<>());
+            }
+            STATISTICS_SCHEMA_TABLES.get(each.getSchemaName()).add(each.getTableName());
+        }
+    }
+    
+    @Override
+    public Map<String, Collection<String>> getStatisticsSchemaTables() {
+        return STATISTICS_SCHEMA_TABLES;
+    }
+    
+    @Override
+    public Optional<Collection<Map<String, Object>>> collectRowColumnValues(final String databaseName, final String schemaName, final String tableName,
+                                                                            final ShardingSphereMetaData metaData) throws SQLException {
+        Optional<ShardingSphereTableStatisticsCollector> tableStatisticsCollector = TypedSPILoader.findService(ShardingSphereTableStatisticsCollector.class,
+                String.format("%s.%s", schemaName, tableName));
+        return tableStatisticsCollector.isPresent() ? Optional.of(tableStatisticsCollector.get().collect(databaseName, schemaName, tableName, metaData)) : Optional.empty();
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "ShardingSphere";
+    }
+}
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/shardingsphere/ShardingSphereTableStatisticsCollector.java
similarity index 53%
rename from 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
rename to 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/shardingsphere/ShardingSphereTableStatisticsCollector.java
index 6fc1303502c..12f1ac851fb 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/table/TableStatisticsCollector.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/shardingsphere/ShardingSphereTableStatisticsCollector.java
@@ -15,31 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.metadata.statistics.collector.table;
+package org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere;
 
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
+import org.apache.shardingsphere.infra.metadata.statistics.collector.DialectTableStatisticsCollector;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
-
-import java.sql.SQLException;
-import java.util.Optional;
 
 /**
- * Table statistics collector.
+ * Statistics collector for shardingsphere table.
  */
 @SingletonSPI
-public interface TableStatisticsCollector extends TypedSPI {
-    
-    /**
-     * Collect table statistics.
-     *
-     * @param databaseName database name
-     * @param table table
-     * @param metaData meta data
-     * @return table statistics
-     * @throws SQLException SQL exception
-     */
-    Optional<TableStatistics> collect(String databaseName, ShardingSphereTable 
table, ShardingSphereMetaData metaData) throws SQLException;
+public interface ShardingSphereTableStatisticsCollector extends DialectTableStatisticsCollector {
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/row/RowStatisticsCollectorUtils.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/utils/RowStatisticsCollectorUtils.java
similarity index 96%
rename from 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/row/RowStatisticsCollectorUtils.java
rename to 
infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/utils/RowStatisticsCollectorUtils.java
index f263f9b9e03..a8a9d485183 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/collector/row/RowStatisticsCollectorUtils.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/statistics/utils/RowStatisticsCollectorUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.metadata.statistics.collector.row;
+package org.apache.shardingsphere.infra.metadata.statistics.utils;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
diff --git 
a/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector
 
b/infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.DialectDatabaseStatisticsCollector
similarity index 78%
rename from 
mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector
rename to 
infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.DialectDatabaseStatisticsCollector
index 77485be7eb9..c1ee220d54d 100644
--- 
a/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector
+++ 
b/infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.DialectDatabaseStatisticsCollector
@@ -15,4 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.mode.fixture.TableStatisticsCollectorFixture
+org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereStatisticsCollector
+org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.PostgreSQLStatisticsCollector
diff --git 
a/infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector
 
b/infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.PostgreSQLTableStatisticsCollector
similarity index 76%
rename from 
infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector
rename to 
infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.PostgreSQLTableStatisticsCollector
index 6a311866ebf..30b8604326d 100644
--- 
a/infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector
+++ 
b/infra/common/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.PostgreSQLTableStatisticsCollector
@@ -15,5 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.infra.metadata.statistics.collector.table.type.PgNamespaceTableStatisticsCollector
-org.apache.shardingsphere.infra.metadata.statistics.collector.table.type.PgClassTableStatisticsCollector
+org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.table.PostgreSQLPgNamespaceTableStatisticsCollector
+org.apache.shardingsphere.infra.metadata.statistics.collector.postgresql.table.PostgreSQLPgClassTableStatisticsCollector
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
index 90705424427..00f7f478577 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.mode.metadata.refresher.statistics;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
+import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.lock.LockContext;
@@ -27,26 +28,16 @@ import 
org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.DatabaseStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.SchemaStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import 
org.apache.shardingsphere.infra.yaml.data.swapper.YamlRowStatisticsSwapper;
-import 
org.apache.shardingsphere.mode.metadata.persist.statistics.AlteredDatabaseStatistics;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectDatabaseStatisticsCollector;
+import 
org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereStatisticsCollector;
 import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * Statistics refresh engine.
@@ -74,128 +65,55 @@ public final class StatisticsRefreshEngine {
     public void refresh() {
         try {
             if 
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
 {
-                collectAndRefresh();
+                LockContext lockContext = 
contextManager.getComputeNodeInstanceContext().getLockContext();
+                GlobalLockDefinition lockDefinition = new 
GlobalLockDefinition(new StatisticsLock());
+                if (lockContext.tryLock(lockDefinition, 5000L)) {
+                    try {
+                        refreshStatistics();
+                    } finally {
+                        lockContext.unlock(lockDefinition);
+                    }
+                }
             }
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
-            log.error("Collect statistics error.", ex);
-        }
-    }
-    
-    private void collectAndRefresh() {
-        LockContext lockContext = 
contextManager.getComputeNodeInstanceContext().getLockContext();
-        GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new 
StatisticsLock());
-        if (lockContext.tryLock(lockDefinition, 5000L)) {
-            try {
-                ShardingSphereStatistics currentStatistics = 
contextManager.getMetaDataContexts().getStatistics();
-                ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
-                ShardingSphereStatistics changedStatistics = new 
ShardingSphereStatistics();
-                for (Entry<String, DatabaseStatistics> entry : 
currentStatistics.getDatabaseStatisticsMap().entrySet()) {
-                    if (metaData.containsDatabase(entry.getKey())) {
-                        collectForDatabase(entry.getKey(), entry.getValue(), 
metaData, changedStatistics);
-                    }
-                }
-                compareAndUpdate(currentStatistics, changedStatistics, 
metaData);
-            } finally {
-                lockContext.unlock(lockDefinition);
-            }
+            log.error("Refresh statistics error.", ex);
         }
     }
     
-    private void collectForDatabase(final String databaseName, final 
DatabaseStatistics databaseStatistics, final ShardingSphereMetaData metaData, 
final ShardingSphereStatistics statistics) {
-        for (Entry<String, SchemaStatistics> entry : 
databaseStatistics.getSchemaStatisticsMap().entrySet()) {
-            if 
(metaData.getDatabase(databaseName).containsSchema(entry.getKey())) {
-                collectForSchema(databaseName, entry.getKey(), 
entry.getValue(), metaData, statistics);
-            }
-        }
-    }
-    
-    private void collectForSchema(final String databaseName, final String 
schemaName, final SchemaStatistics schemaStatistics,
-                                  final ShardingSphereMetaData metaData, final 
ShardingSphereStatistics statistics) {
-        for (Entry<String, TableStatistics> entry : 
schemaStatistics.getTableStatisticsMap().entrySet()) {
-            if 
(metaData.getDatabase(databaseName).getSchema(schemaName).containsTable(entry.getKey()))
 {
-                collectForTable(databaseName, schemaName, 
metaData.getDatabase(databaseName).getSchema(schemaName).getTable(entry.getKey()),
 metaData, statistics);
-            }
+    private void refreshStatistics() {
+        ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
+        for (ShardingSphereDatabase each : metaData.getAllDatabases()) {
+            refreshForDatabase(metaData, each);
         }
     }
     
-    private void collectForTable(final String databaseName, final String 
schemaName, final ShardingSphereTable table,
-                                 final ShardingSphereMetaData metaData, final 
ShardingSphereStatistics statistics) {
-        Optional<TableStatisticsCollector> tableStatisticsCollector = 
TypedSPILoader.findService(TableStatisticsCollector.class, table.getName());
-        Optional<TableStatistics> tableStatistics;
-        if (tableStatisticsCollector.isPresent()) {
-            try {
-                tableStatistics = 
tableStatisticsCollector.get().collect(databaseName, table, metaData);
-                // CHECKSTYLE:OFF
-            } catch (final Exception ex) {
-                // CHECKSTYLE:ON
-                log.error("Collect {}.{}.{} statistics failed.", databaseName, 
schemaName, table.getName(), ex);
-                tableStatistics = Optional.empty();
-            }
-        } else {
-            tableStatistics = Optional.empty();
+    private void refreshForDatabase(final ShardingSphereMetaData metaData, 
final ShardingSphereDatabase database) {
+        for (ShardingSphereSchema each : database.getAllSchemas()) {
+            refreshForSchema(metaData, database.getName(), each);
         }
-        DatabaseStatistics databaseStatistics = 
statistics.containsDatabaseStatistics(databaseName) ? 
statistics.getDatabaseStatistics(databaseName) : new DatabaseStatistics();
-        SchemaStatistics schemaStatistics = 
databaseStatistics.containsSchemaStatistics(schemaName) ? 
databaseStatistics.getSchemaStatistics(schemaName) : new SchemaStatistics();
-        tableStatistics.ifPresent(optional -> 
schemaStatistics.putTableStatistics(table.getName(), optional));
-        databaseStatistics.putSchemaStatistics(schemaName, schemaStatistics);
-        statistics.putDatabaseStatistics(databaseName, databaseStatistics);
     }
     
-    private void compareAndUpdate(final ShardingSphereStatistics 
currentStatistics, final ShardingSphereStatistics changedStatistics, final 
ShardingSphereMetaData metaData) {
-        for (Entry<String, DatabaseStatistics> entry : 
changedStatistics.getDatabaseStatisticsMap().entrySet()) {
-            compareAndUpdateForDatabase(metaData.getDatabase(entry.getKey()), 
currentStatistics, currentStatistics.getDatabaseStatistics(entry.getKey()), 
entry.getValue());
-        }
-        for (Entry<String, DatabaseStatistics> entry : 
currentStatistics.getDatabaseStatisticsMap().entrySet()) {
-            if (!changedStatistics.containsDatabaseStatistics(entry.getKey())) 
{
-                currentStatistics.dropDatabaseStatistics(entry.getKey());
-                
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getStatisticsService().delete(entry.getKey());
-            }
+    private void refreshForSchema(final ShardingSphereMetaData metaData, final 
String databaseName, final ShardingSphereSchema schema) {
+        for (ShardingSphereTable each : schema.getAllTables()) {
+            refreshForTable(metaData, databaseName, schema.getName(), each);
         }
     }
     
-    private void compareAndUpdateForDatabase(final ShardingSphereDatabase 
database, final ShardingSphereStatistics currentStatistics,
-                                             final DatabaseStatistics 
currentDatabaseStatistics, final DatabaseStatistics changedDatabaseStatistics) {
-        for (Entry<String, SchemaStatistics> entry : 
changedDatabaseStatistics.getSchemaStatisticsMap().entrySet()) {
-            compareAndUpdateForSchema(database.getName(), 
database.getSchema(entry.getKey()), currentStatistics, 
currentDatabaseStatistics.getSchemaStatistics(entry.getKey()), 
entry.getValue());
-        }
-    }
-    
-    private void compareAndUpdateForSchema(final String databaseName, final 
ShardingSphereSchema schema, final ShardingSphereStatistics currentStatistics,
-                                           final SchemaStatistics 
currentSchemaStatistics, final SchemaStatistics changedSchemaStatistics) {
-        for (Entry<String, TableStatistics> entry : 
changedSchemaStatistics.getTableStatisticsMap().entrySet()) {
-            compareAndUpdateForTable(databaseName, schema.getName(), 
schema.getTable(entry.getKey()), currentStatistics, 
currentSchemaStatistics.getTableStatistics(entry.getKey()), entry.getValue());
-        }
-    }
-    
-    private void compareAndUpdateForTable(final String databaseName, final 
String schemaName, final ShardingSphereTable table,
-                                          final ShardingSphereStatistics 
currentStatistics, final TableStatistics currentTableStatistics, final 
TableStatistics changedTableStatistics) {
-        if (!currentTableStatistics.equals(changedTableStatistics)) {
-            
currentStatistics.getDatabaseStatistics(databaseName).getSchemaStatistics(schemaName).putTableStatistics(changedTableStatistics.getName(),
 changedTableStatistics);
-            AlteredDatabaseStatistics alteredDatabaseStatistics = 
createAlteredDatabaseStatistics(databaseName, schemaName, table, 
currentTableStatistics, changedTableStatistics);
-            
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getStatisticsService().update(alteredDatabaseStatistics);
-        }
-    }
-    
-    private AlteredDatabaseStatistics createAlteredDatabaseStatistics(final 
String databaseName, final String schemaName, final ShardingSphereTable table,
-                                                                      final 
TableStatistics currentTableStatistics, final TableStatistics 
changedTableStatistics) {
-        AlteredDatabaseStatistics result = new 
AlteredDatabaseStatistics(databaseName, schemaName, 
currentTableStatistics.getName());
-        Map<String, RowStatistics> tableStatisticsMap = 
currentTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
 Function.identity()));
-        Map<String, RowStatistics> changedTableStatisticsMap = 
changedTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
 Function.identity()));
-        YamlRowStatisticsSwapper swapper = new YamlRowStatisticsSwapper(new 
ArrayList<>(table.getAllColumns()));
-        for (Entry<String, RowStatistics> entry : 
changedTableStatisticsMap.entrySet()) {
-            if (!tableStatisticsMap.containsKey(entry.getKey())) {
-                
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-            } else if 
(!tableStatisticsMap.get(entry.getKey()).equals(entry.getValue())) {
-                
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
-            }
-        }
-        for (Entry<String, RowStatistics> entry : 
tableStatisticsMap.entrySet()) {
-            if (!changedTableStatisticsMap.containsKey(entry.getKey())) {
-                
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+    private void refreshForTable(final ShardingSphereMetaData metaData, final String databaseName, final String schemaName, final ShardingSphereTable table) {
+        try {
+            Optional<DialectDatabaseStatisticsCollector> dialectStatisticsCollector = "shardingsphere".equalsIgnoreCase(schemaName)
+                    ? Optional.of(new ShardingSphereStatisticsCollector())
+                    : DatabaseTypedSPILoader.findService(DialectDatabaseStatisticsCollector.class, metaData.getDatabase(databaseName).getProtocolType());
+            if (dialectStatisticsCollector.isPresent()) {
+                Optional<Collection<Map<String, Object>>> rowColumnValues = dialectStatisticsCollector.get().collectRowColumnValues(databaseName, schemaName, table.getName(), metaData);
+                rowColumnValues.ifPresent(optional -> new StatisticsStorageEngine(contextManager, databaseName, schemaName, table.getName(), optional).storage());
             }
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            log.error("Refresh {}.{}.{} statistics failed.", databaseName, schemaName, table.getName(), ex);
         }
-        return result;
     }
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsStorageEngine.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsStorageEngine.java
new file mode 100644
index 00000000000..56b8c99a8be
--- /dev/null
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsStorageEngine.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.refresher.statistics;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.statistics.DatabaseStatistics;
+import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
+import org.apache.shardingsphere.infra.metadata.statistics.SchemaStatistics;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
+import 
org.apache.shardingsphere.infra.metadata.statistics.utils.RowStatisticsCollectorUtils;
+import 
org.apache.shardingsphere.infra.yaml.data.swapper.YamlRowStatisticsSwapper;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.metadata.persist.statistics.AlteredDatabaseStatistics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Statistics storage engine.
+ *
+ * <p>Turns the collected row column values of one table into row statistics, compares them by unique key
+ * with the table statistics obtained from the context manager's meta data contexts, and hands the
+ * resulting added, updated and deleted rows to the statistics service for update.</p>
+ *
+ * <p>Rows whose unique key is new are reported as added, rows whose key exists but whose content is not
+ * equal are reported as updated, and current rows whose key is absent from the collected rows are
+ * reported as deleted.</p>
+ *
+ * <p>NOTE(review): both row sets are indexed with {@code Collectors.toMap} without a merge function, which
+ * throws {@code IllegalStateException} on duplicate keys; this presumably relies on row unique keys being
+ * distinct within a table - confirm.</p>
+ */
+@RequiredArgsConstructor
+public final class StatisticsStorageEngine {
+    
+    private final ContextManager contextManager;
+    
+    private final String databaseName;
+    
+    private final String schemaName;
+    
+    private final String tableName;
+    
+    private final Collection<Map<String, Object>> rowColumnValues;
+    
+    /**
+     * Storage.
+     *
+     * <p>Looks up the table by database, schema and table name, builds one row statistics entry per
+     * collected row, diffs the result against the current table statistics and passes the altered rows
+     * to the statistics service. An empty table statistics is used as the baseline when the current
+     * statistics contain no entry for the database, the schema or the table.</p>
+     */
+    public void storage() {
+        ShardingSphereTable table = 
contextManager.getDatabase(databaseName).getSchema(schemaName).getTable(tableName);
+        TableStatistics changedTableStatistics = new 
TableStatistics(table.getName());
+        for (Map<String, Object> each : rowColumnValues) {
+            changedTableStatistics.getRows().add(new 
RowStatistics(RowStatisticsCollectorUtils.createRowValues(each, table)));
+        }
+        AlteredDatabaseStatistics alteredDatabaseStatistics = 
createAlteredDatabaseStatistics(table, getCurrentTableStatistics(), 
changedTableStatistics);
+        
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getStatisticsService().update(alteredDatabaseStatistics);
+    }
+    
+    private TableStatistics getCurrentTableStatistics() {
+        ShardingSphereStatistics currentStatistics = 
contextManager.getMetaDataContexts().getStatistics();
+        if (!currentStatistics.containsDatabaseStatistics(databaseName)) {
+            return new TableStatistics(tableName);
+        }
+        DatabaseStatistics databaseStatistics = 
currentStatistics.getDatabaseStatistics(databaseName);
+        if (!databaseStatistics.containsSchemaStatistics(schemaName)) {
+            return new TableStatistics(tableName);
+        }
+        SchemaStatistics schemaStatistics = 
databaseStatistics.getSchemaStatistics(schemaName);
+        if (!schemaStatistics.containsTableStatistics(tableName)) {
+            return new TableStatistics(tableName);
+        }
+        return schemaStatistics.getTableStatistics(tableName);
+    }
+    
+    private AlteredDatabaseStatistics createAlteredDatabaseStatistics(final 
ShardingSphereTable table, final TableStatistics currentTableStatistics, final 
TableStatistics changedTableStatistics) {
+        AlteredDatabaseStatistics result = new 
AlteredDatabaseStatistics(databaseName, schemaName, tableName);
+        Map<String, RowStatistics> tableStatisticsMap = 
currentTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
 Function.identity()));
+        Map<String, RowStatistics> changedTableStatisticsMap = 
changedTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
 Function.identity()));
+        YamlRowStatisticsSwapper swapper = new YamlRowStatisticsSwapper(new 
ArrayList<>(table.getAllColumns()));
+        for (Entry<String, RowStatistics> entry : 
changedTableStatisticsMap.entrySet()) {
+            if (!tableStatisticsMap.containsKey(entry.getKey())) {
+                
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+            } else if 
(!tableStatisticsMap.get(entry.getKey()).equals(entry.getValue())) {
+                
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+            }
+        }
+        for (Entry<String, RowStatistics> entry : 
tableStatisticsMap.entrySet()) {
+            if (!changedTableStatisticsMap.containsKey(entry.getKey())) {
+                
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/TableStatisticsCollectorFixture.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/TableStatisticsCollectorFixture.java
deleted file mode 100644
index fedce3430d2..00000000000
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/TableStatisticsCollectorFixture.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.fixture;
-
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import 
org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
-
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Optional;
-
-/**
- * Table statistics collector fixture.
- */
-public final class TableStatisticsCollectorFixture implements 
TableStatisticsCollector {
-    
-    @Override
-    public Optional<TableStatistics> collect(final String databaseName, final 
ShardingSphereTable table, final ShardingSphereMetaData metaData) throws 
SQLException {
-        TableStatistics tableStatistics = new TableStatistics("test_table");
-        tableStatistics.getRows().add(new RowStatistics(Arrays.asList("1", 
"2")));
-        return Optional.of(tableStatistics);
-    }
-    
-    @Override
-    public String getType() {
-        return "test_table";
-    }
-}
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngineTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsStorageEngineTest.java
similarity index 56%
rename from 
mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngineTest.java
rename to 
mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsStorageEngineTest.java
index b01303946c6..021b271a039 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngineTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsStorageEngineTest.java
@@ -17,76 +17,52 @@
 
 package org.apache.shardingsphere.mode.metadata.refresher.statistics;
 
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationProperties;
-import 
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.metadata.statistics.DatabaseStatistics;
 import org.apache.shardingsphere.infra.metadata.statistics.SchemaStatistics;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.test.util.PropertiesBuilder;
-import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Properties;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-class StatisticsRefreshEngineTest {
+class StatisticsStorageEngineTest {
     
     @Test
     void assertRefresh() {
         ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
         ShardingSphereStatistics statistics = mockStatistics();
         
when(contextManager.getMetaDataContexts().getStatistics()).thenReturn(statistics);
-        ShardingSphereMetaData metaData = mockMetaData();
-        
when(contextManager.getMetaDataContexts().getMetaData()).thenReturn(metaData);
-        
when(contextManager.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
 ConfigurationProperties(new Properties()));
-        
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps()).thenReturn(new
 TemporaryConfigurationProperties(
-                PropertiesBuilder.build(new 
Property(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED.getKey(),
 Boolean.TRUE.toString()))));
-        
when(contextManager.getComputeNodeInstanceContext().getLockContext().tryLock(any(),
 anyLong())).thenReturn(true);
-        new StatisticsRefreshEngine(contextManager).refresh();
+        ShardingSphereTable table = mockTable(); // table returned by the stubbed foo_db / foo_schema / foo_table lookup below
+        
when(contextManager.getDatabase("foo_db").getSchema("foo_schema").getTable("foo_table")).thenReturn(table);
+        new StatisticsStorageEngine(contextManager, "foo_db", "foo_schema", 
"foo_table", Collections.emptyList()).storage(); // no collected rows are supplied; the test only verifies that update(...) is invoked
         
verify(contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getStatisticsService()).update(any());
     }
     
     private ShardingSphereStatistics mockStatistics() {
-        ShardingSphereStatistics result = new ShardingSphereStatistics();
-        DatabaseStatistics databaseStatistics = new DatabaseStatistics();
+        TableStatistics tableStatistics = new TableStatistics("foo_table");
         SchemaStatistics schemaStatistics = new SchemaStatistics();
+        schemaStatistics.putTableStatistics("foo_table", tableStatistics); // same table name that assertRefresh passes to StatisticsStorageEngine
+        DatabaseStatistics databaseStatistics = new DatabaseStatistics();
         databaseStatistics.putSchemaStatistics("foo_schema", schemaStatistics);
-        TableStatistics tableStatistics = new TableStatistics("test_table");
-        schemaStatistics.putTableStatistics("test_table", tableStatistics);
+        ShardingSphereStatistics result = new ShardingSphereStatistics();
         result.getDatabaseStatisticsMap().put("foo_db", databaseStatistics);
         return result;
     }
     
-    private ShardingSphereMetaData mockMetaData() {
-        ShardingSphereMetaData result = mock(ShardingSphereMetaData.class);
-        ShardingSphereSchema schema = new ShardingSphereSchema("foo_schema", 
Collections.singleton(mockTable()), Collections.emptyList());
-        ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db", 
mock(), mock(), mock(), Collections.singleton(schema));
-        
when(result.getAllDatabases()).thenReturn(Collections.singleton(database));
-        when(result.getDatabase("foo_db")).thenReturn(database);
-        when(result.containsDatabase("foo_db")).thenReturn(true);
-        return result;
-    }
-    
     private static ShardingSphereTable mockTable() {
-        ShardingSphereTable result = mock(ShardingSphereTable.class);
-        when(result.getName()).thenReturn("test_table");
+        ShardingSphereTable result = mock(ShardingSphereTable.class, 
RETURNS_DEEP_STUBS);
+        when(result.getName()).thenReturn("foo_table");
         ShardingSphereColumn column1 = new ShardingSphereColumn("col_1", 
Types.INTEGER, false, false, false, true, false, false);
         ShardingSphereColumn column2 = new ShardingSphereColumn("col_2", 
Types.INTEGER, false, false, false, true, false, false);
         when(result.getAllColumns()).thenReturn(Arrays.asList(column1, 
column2));

Reply via email to