This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5df6349a71b Optimize ShardingStatisticsTableCollector to support aggregated datasource (#34083)
5df6349a71b is described below

commit 5df6349a71b9dcd460100e62d6b6544215fb2a7a
Author: jiangML <1060319...@qq.com>
AuthorDate: Tue Dec 17 11:43:53 2024 +0800

    Optimize ShardingStatisticsTableCollector to support aggregated datasource (#34083)
---
 .../data/ShardingStatisticsTableCollector.java     | 28 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 5 deletions(-)

diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index e0735b75a0c..2165b336f63 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.sharding.metadata.data;
 
+import org.apache.shardingsphere.infra.database.DatabaseTypeEngine;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -29,10 +30,12 @@ import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
+import 
org.apache.shardingsphere.infra.rule.attribute.datasource.aggregate.AggregatedDataSourceRuleAttribute;
 import 
org.apache.shardingsphere.sharding.metadata.data.dialect.DialectShardingStatisticsTableCollector;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.ShardingTable;
 
+import javax.sql.DataSource;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -81,19 +84,34 @@ public final class ShardingStatisticsTableCollector 
implements ShardingSphereSta
                 row.add(each.getLogicTable());
                 row.add(dataNode.getDataSourceName());
                 row.add(dataNode.getTableName());
-                
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), 
dataNode, row);
+                
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), 
dataNode, row, rule);
                 tableData.getRows().add(new ShardingSphereRowData(row));
             }
         }
     }
     
-    private void addTableRowsAndDataLength(final Map<String, StorageUnit> 
storageUnits, final DataNode dataNode, final List<Object> row) throws 
SQLException {
+    private void addTableRowsAndDataLength(final Map<String, StorageUnit> 
storageUnits, final DataNode dataNode, final List<Object> row, final 
ShardingRule rule) throws SQLException {
+        DataSource dataSource;
+        DatabaseType databaseType;
         StorageUnit storageUnit = 
storageUnits.get(dataNode.getDataSourceName());
-        DatabaseType databaseType = storageUnit.getStorageType();
-        Optional<DialectShardingStatisticsTableCollector> dialectCollector = 
DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class,
 databaseType);
+        if (null != storageUnit) {
+            dataSource = storageUnit.getDataSource();
+            databaseType = storageUnit.getStorageType();
+        } else {
+            Optional<AggregatedDataSourceRuleAttribute> 
aggregatedDataSourceRuleAttribute = 
rule.getAttributes().findAttribute(AggregatedDataSourceRuleAttribute.class);
+            dataSource = aggregatedDataSourceRuleAttribute.map(optional -> 
optional.getAggregatedDataSources().get(dataNode.getDataSourceName())).orElse(null);
+            databaseType = null != dataSource ? 
DatabaseTypeEngine.getStorageType(dataSource) : null;
+        }
+        if (null != dataSource && null != databaseType) {
+            addTableRowsAndDataLength(databaseType, dataSource, dataNode, row);
+        }
+    }
+    
+    private void addTableRowsAndDataLength(final DatabaseType databaseType, 
final DataSource dataSource, final DataNode dataNode, final List<Object> row) 
throws SQLException {
         boolean isAppended = false;
+        Optional<DialectShardingStatisticsTableCollector> dialectCollector = 
DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class,
 databaseType);
         if (dialectCollector.isPresent()) {
-            try (Connection connection = 
storageUnit.getDataSource().getConnection()) {
+            try (Connection connection = dataSource.getConnection()) {
                 isAppended = dialectCollector.get().appendRow(connection, 
dataNode, row);
             }
         }

Reply via email to