This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

The following commit(s) were added to refs/heads/dev by this push:
     new 00f550e3d0 [Fix][MongoDB] The Long type cannot handle string values in scientific notation (#8783)
00f550e3d0 is described below

commit 00f550e3d09a10b419651725cec88ca3016c8eb5
Author: lee <904850...@qq.com>
AuthorDate: Mon Apr 7 22:06:12 2025 +0800

    [Fix][MongoDB] The Long type cannot handle string values in scientific notation (#8783)
---
 .../source/split/SamplingSplitStrategy.java        |  7 +-
 .../source/split/SamplingSplitStrategyTest.java    | 74 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
index 4ef08c5398..be37d38eda 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.source.split;
 
+import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
@@ -33,6 +34,7 @@ import com.mongodb.client.model.Projections;
 import com.mongodb.client.model.Sorts;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -103,7 +105,8 @@ public class SamplingSplitStrategy implements MongoSplitStrategy, Serializable {
         return createSplits(splitKey, rightBoundaries);
     }
 
-    private ImmutablePair<Long, Long> getDocumentNumAndAvgSize() {
+    @VisibleForTesting
+    protected ImmutablePair<Long, Long> getDocumentNumAndAvgSize() {
         String collectionName =
                 clientProvider.getDefaultCollection().getNamespace().getCollectionName();
         BsonDocument statsCmd = new BsonDocument("collStats", new BsonString(collectionName));
@@ -112,7 +115,7 @@ public class SamplingSplitStrategy implements MongoSplitStrategy, Serializable {
         // fix issue https://github.com/apache/seatunnel/issues/7575
         long total =
                 Optional.ofNullable(count)
-                        .map(v -> Long.parseLong(String.valueOf(count)))
+                        .map(v -> new BigDecimal(String.valueOf(count)).longValue())
                         .orElse(0L);
         Object avgDocumentBytes = res.get("avgObjSize");
         long avgObjSize =
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategyTest.java b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategyTest.java
new file mode 100644
index 0000000000..ec5b05844f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategyTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source.split;
+
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+public class SamplingSplitStrategyTest {
+
+    @Mock private MongodbClientProvider clientProvider;
+
+    @Mock private MongoCollection<BsonDocument> collection;
+
+    @Mock private MongoDatabase database;
+
+    private SamplingSplitStrategy strategy;
+
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+        strategy = new SamplingSplitStrategy(clientProvider, "splitKey", null, null, 100L, 1000L);
+        when(clientProvider.getDefaultCollection()).thenReturn(collection);
+        when(clientProvider.getDefaultDatabase()).thenReturn(database);
+
+        MongoNamespace namespace = new MongoNamespace("databaseName", "collectionName");
+        when(collection.getNamespace()).thenReturn(namespace);
+    }
+
+    @Test
+    public void testGetDocumentNumAndAvgSize() {
+        BsonDocument statsCmd = new BsonDocument("collStats", new BsonString("collectionName"));
+        Document res = new Document();
+        res.put("count", "1.3360484963E10");
+        res.put("avgObjSize", 200.0);
+
+        when(database.runCommand(statsCmd)).thenReturn(res);
+
+        ImmutablePair<Long, Long> result = strategy.getDocumentNumAndAvgSize();
+
+        assertEquals(Long.valueOf(13360484963L), result.getLeft());
+        assertEquals(Long.valueOf(200), result.getRight());
+    }
+}