This is an automated email from the ASF dual-hosted git repository. duanzhengqiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 5df6349a71b Optimize ShardingStatisticsTableCollector to support aggregated datasource (#34083) 5df6349a71b is described below commit 5df6349a71b9dcd460100e62d6b6544215fb2a7a Author: jiangML <1060319...@qq.com> AuthorDate: Tue Dec 17 11:43:53 2024 +0800 Optimize ShardingStatisticsTableCollector to support aggregated datasource (#34083) --- .../data/ShardingStatisticsTableCollector.java | 28 ++++++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java index e0735b75a0c..2165b336f63 100644 --- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java +++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.sharding.metadata.data; +import org.apache.shardingsphere.infra.database.DatabaseTypeEngine; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -29,10 +30,12 @@ import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData; import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData; import org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector; +import org.apache.shardingsphere.infra.rule.attribute.datasource.aggregate.AggregatedDataSourceRuleAttribute; import org.apache.shardingsphere.sharding.metadata.data.dialect.DialectShardingStatisticsTableCollector; import org.apache.shardingsphere.sharding.rule.ShardingRule; import org.apache.shardingsphere.sharding.rule.ShardingTable; +import javax.sql.DataSource; import java.math.BigDecimal; import java.sql.Connection; import java.sql.SQLException; @@ -81,19 +84,34 @@ public final class ShardingStatisticsTableCollector implements ShardingSphereSta row.add(each.getLogicTable()); row.add(dataNode.getDataSourceName()); row.add(dataNode.getTableName()); - addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), dataNode, row); + addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), dataNode, row, rule); tableData.getRows().add(new ShardingSphereRowData(row)); } } } - private void addTableRowsAndDataLength(final Map<String, StorageUnit> storageUnits, final DataNode dataNode, final List<Object> row) throws SQLException { + private void addTableRowsAndDataLength(final Map<String, StorageUnit> storageUnits, final DataNode dataNode, final List<Object> row, final ShardingRule rule) throws SQLException { + DataSource dataSource; + DatabaseType databaseType; StorageUnit storageUnit = storageUnits.get(dataNode.getDataSourceName()); - DatabaseType databaseType = storageUnit.getStorageType(); - Optional<DialectShardingStatisticsTableCollector> dialectCollector = DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class, databaseType); + if (null != storageUnit) { + dataSource = storageUnit.getDataSource(); + databaseType = storageUnit.getStorageType(); + } else { + Optional<AggregatedDataSourceRuleAttribute> aggregatedDataSourceRuleAttribute = rule.getAttributes().findAttribute(AggregatedDataSourceRuleAttribute.class); + dataSource = aggregatedDataSourceRuleAttribute.map(optional -> optional.getAggregatedDataSources().get(dataNode.getDataSourceName())).orElse(null); + databaseType = null != dataSource ? DatabaseTypeEngine.getStorageType(dataSource) : null; + } + if (null != dataSource && null != databaseType) { + addTableRowsAndDataLength(databaseType, dataSource, dataNode, row); + } + } + + private void addTableRowsAndDataLength(final DatabaseType databaseType, final DataSource dataSource, final DataNode dataNode, final List<Object> row) throws SQLException { boolean isAppended = false; + Optional<DialectShardingStatisticsTableCollector> dialectCollector = DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class, databaseType); if (dialectCollector.isPresent()) { - try (Connection connection = storageUnit.getDataSource().getConnection()) { + try (Connection connection = dataSource.getConnection()) { isAppended = dialectCollector.get().appendRow(connection, dataNode, row); } }