This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f46ce8bbee6 Add OpenGaussIncrementalDumperTest (#37420)
f46ce8bbee6 is described below
commit f46ce8bbee61c43f594becf87951cf665fb0913f
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Dec 18 12:37:52 2025 +0800
Add OpenGaussIncrementalDumperTest (#37420)
---
.../dumper/OpenGaussIncrementalDumperTest.java | 314 ++++++++++++++++++++-
1 file changed, 304 insertions(+), 10 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/dumper/OpenGaussIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/dumper/OpenGaussIncrementalDumperTest.java
index 600cd5ccf88..4b14b256362 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/dumper/OpenGaussIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/dumper/OpenGaussIncrementalDumperTest.java
@@ -17,25 +17,319 @@
package
org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.dumper;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.wal.OpenGaussLogicalReplication;
+import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.wal.decode.MppdbDecodingPlugin;
+import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.wal.decode.OpenGaussLogSequenceNumber;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALEventConverter;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALPosition;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.AbstractRowEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.AbstractWALEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.BeginTXEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.CommitTXEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.WriteRowEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.position.slot.PostgreSQLSlotNameGenerator;
+import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
import org.mockito.internal.configuration.plugins.Plugins;
+import org.opengauss.jdbc.PgConnection;
+import org.opengauss.replication.LogSequenceNumber;
+import org.opengauss.replication.PGReplicationStream;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
class OpenGaussIncrementalDumperTest {
@Test
- void assertGetVersion() throws ReflectiveOperationException {
- OpenGaussIncrementalDumper dumper =
mock(OpenGaussIncrementalDumper.class);
- int version = (int)
Plugins.getMemberAccessor().invoke(OpenGaussIncrementalDumper.class.getDeclaredMethod("parseMajorVersion",
String.class), dumper,
- "(openGauss 3.1.0 build ) compiled at 2023-02-17 16:13:51
commit 0 last mr on x86_64-unknown-linux-gnu, compiled by g++ (GCC) 7.3.0,
64-bit");
- assertThat(version, is(3));
- OpenGaussIncrementalDumper mock =
mock(OpenGaussIncrementalDumper.class);
- version = (int)
Plugins.getMemberAccessor().invoke(OpenGaussIncrementalDumper.class.getDeclaredMethod("parseMajorVersion",
String.class), mock, "(openGauss 5.0.1 build )");
- assertThat(version, is(5));
- version = (int)
Plugins.getMemberAccessor().invoke(OpenGaussIncrementalDumper.class.getDeclaredMethod("parseMajorVersion",
String.class), mock, "not match");
- assertThat(version, is(2));
+ void assertRunBlockingRetriesFiveTimesThenThrowsIngestException() { // Scenario: every mocked DriverManager.getConnection call throws SQLException; start() must throw IngestException after exactly 5 attempts.
+ AtomicInteger runningIndex = new AtomicInteger(); // number of isRunning() calls answered so far
+ boolean[] runningSequence = {true, false, true, false, true, false, // scripted isRunning() answers, consumed one per call
true, false, true, false};
+ OpenGaussIncrementalDumper dumper =
mock(OpenGaussIncrementalDumper.class, withSettings()
+ .useConstructor(createDumperContext(false), new
WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(1L))), new
MemoryPipelineChannel(10, records -> {
+ }), mock(PipelineTableMetaDataLoader.class))
+ .defaultAnswer(invocation -> {
+ if ("isRunning".equals(invocation.getMethod().getName())) { // only isRunning() is scripted
+ int index = runningIndex.getAndIncrement();
+ return index < runningSequence.length && // once the script is exhausted the answer is always false
runningSequence[index];
+ }
+ return invocation.callRealMethod(); // every other method runs the real implementation
+ }));
+ AtomicInteger attempts = new AtomicInteger();
+ try (MockedStatic<DriverManager> driverManagerMock =
mockStatic(DriverManager.class)) {
+ driverManagerMock.when(() ->
DriverManager.getConnection(anyString(), anyString(),
anyString())).thenAnswer(invocation -> {
+ attempts.incrementAndGet(); // count the connection attempt before failing it
+ throw new SQLException("failed");
+ });
+ assertThrows(IngestException.class, dumper::start);
+ }
+ assertThat(attempts.get(), is(5)); // one count per mocked getConnection invocation
+ }
+
+ @Test // Scenario: createReplicationStream clears the running flag and then throws SQLException; start() is expected to return normally.
+ void assertRunBlockingSkipSleepWhenStopped() throws
ReflectiveOperationException, SQLException {
+ IngestPosition position = new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(1L)));
+ PipelineChannel channel = new MemoryPipelineChannel(10, records -> {
+ });
+ OpenGaussIncrementalDumper dumper = new
OpenGaussIncrementalDumper(createDumperContext(false), position, channel,
mock());
+ AtomicReference<Boolean> running = getRunningState(dumper); // reflective handle on the dumper's running flag
+ OpenGaussLogicalReplication logicalReplication =
mock(OpenGaussLogicalReplication.class);
+
Plugins.getMemberAccessor().set(OpenGaussIncrementalDumper.class.getDeclaredField("logicalReplication"),
dumper, logicalReplication); // replace the dumper's logicalReplication field with the mock
+ IncrementalDumperContext dumperContext = getDumperContext(dumper);
+ PgConnection pgConnection = mock(PgConnection.class);
+
when(logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(pgConnection);
+ when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+ try (MockedStatic<PostgreSQLSlotNameGenerator> slotNameMock =
mockStatic(PostgreSQLSlotNameGenerator.class)) {
+ slotNameMock.when(() ->
PostgreSQLSlotNameGenerator.getUniqueSlotName(pgConnection,
dumperContext.getJobId())).thenReturn("slot-1");
+ when(logicalReplication.createReplicationStream(eq(pgConnection),
any(), eq("slot-1"), eq(3))).thenAnswer(invocation -> {
+ running.set(false); // clear the running flag first, then fail stream creation
+ throw new SQLException("stopped");
+ });
+ try (MockedStatic<DriverManager> driverManagerMock =
mockStatic(DriverManager.class)) {
+ mockVersionQuery(driverManagerMock, "(openGauss 3.0 build )"); // presumably parsed to major version 3, matching eq(3) above - confirm against parseMajorVersion
+ dumper.start(); // no assertThrows: the test passes only if start() does not throw
+ }
+ }
+ assertFalse(running.get()); // the flag cleared inside the stub must still be false
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test // Scenario: decodeWithTX enabled and the version query answers (openGauss 3.1.0 build ); five scripted decode results are fed through start().
+ void assertDumpDecodeWithTXWhenMajorVersionAtLeastThree() throws
ReflectiveOperationException, SQLException {
+ IngestPosition position = new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(1L)));
+ MemoryPipelineChannel channel = new MemoryPipelineChannel(10, records
-> {
+ });
+ OpenGaussIncrementalDumper dumper = new
OpenGaussIncrementalDumper(createDumperContext(true), position, channel,
mock());
+ OpenGaussLogicalReplication logicalReplication =
mock(OpenGaussLogicalReplication.class);
+
Plugins.getMemberAccessor().set(OpenGaussIncrementalDumper.class.getDeclaredField("logicalReplication"),
dumper, logicalReplication); // replace the dumper's logicalReplication field with the mock
+ PgConnection pgConnection = mock(PgConnection.class);
+ PGReplicationStream stream = mock(PGReplicationStream.class);
+ IncrementalDumperContext dumperContext = getDumperContext(dumper);
+
when(logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(pgConnection);
+ when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+
when(stream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(200L));
+ when(stream.readPending()).thenReturn(null, // first poll returns null, then five one-character payloads follow
ByteBuffer.wrap("1".getBytes()), ByteBuffer.wrap("2".getBytes()),
+ ByteBuffer.wrap("3".getBytes()),
ByteBuffer.wrap("4".getBytes()), ByteBuffer.wrap("5".getBytes()));
+
Plugins.getMemberAccessor().set(OpenGaussIncrementalDumper.class.getDeclaredField("walEventConverter"),
dumper, mock(WALEventConverter.class)); // replace the walEventConverter field with a plain mock
+ AtomicReference<Boolean> running = getRunningState(dumper);
+ try (
+ MockedStatic<DriverManager> driverManagerMock =
mockStatic(DriverManager.class);
+ MockedStatic<PostgreSQLSlotNameGenerator> slotNameMock =
mockStatic(PostgreSQLSlotNameGenerator.class);
+ MockedConstruction<MppdbDecodingPlugin> ignoredConstruction =
mockConstruction(MppdbDecodingPlugin.class, (mockedPlugin, context) -> {
+ AtomicInteger counter = new AtomicInteger(); // 1-based decode call index for this constructed plugin
+ when(mockedPlugin.decode(any(ByteBuffer.class),
any(OpenGaussLogSequenceNumber.class)))
+ .thenAnswer(invocation ->
createWALEventWithDecodeSequence(running, counter.incrementAndGet()));
+ })) {
+ slotNameMock.when(() ->
PostgreSQLSlotNameGenerator.getUniqueSlotName(pgConnection,
dumperContext.getJobId())).thenReturn("slot-1");
+ when(logicalReplication.createReplicationStream(eq(pgConnection),
any(), eq("slot-1"), eq(3))).thenReturn(stream);
+ mockVersionQuery(driverManagerMock, "(openGauss 3.1.0 build )");
+ dumper.start();
+ }
+ assertThat(channel.fetch(10, 0L).size(), is(3)); // 3 records expected; which events produce them is decided by dumper code not shown here
+ AtomicReference<WALPosition> walPosition =
(AtomicReference<WALPosition>)
Plugins.getMemberAccessor().get(OpenGaussIncrementalDumper.class.getDeclaredField("walPosition"),
dumper);
+ assertThat(walPosition.get().getLogSequenceNumber().asString(),
is(LogSequenceNumber.valueOf(5L).asString())); // 5 is the LSN given to the fifth scripted event
+ verify(stream).close(); // the replication stream must have been closed once
+ }
+
+ private AbstractWALEvent createWALEventWithDecodeSequence(final
AtomicReference<Boolean> running, final int index) { // Scripted decode result for call number index: 1 begin, 2 row, 3 begin, 4 row, anything else commit.
+ AbstractWALEvent result;
+ switch (index) {
+ case 1:
+ result = new BeginTXEvent(1L, 10L);
+ break;
+ case 2:
+ WriteRowEvent firstRowEvent = new WriteRowEvent();
+ firstRowEvent.setTableName("t_order");
+ firstRowEvent.setSchemaName("public");
+ result = firstRowEvent;
+ break;
+ case 3:
+ result = new BeginTXEvent(2L, 20L); // a second begin arrives with no commit in between
+ break;
+ case 4:
+ WriteRowEvent secondRowEvent = new WriteRowEvent();
+ secondRowEvent.setTableName("t_order");
+ secondRowEvent.setSchemaName("public");
+ result = secondRowEvent;
+ break;
+ default:
+ result = new CommitTXEvent(3L, 30L);
+ running.set(false); // the commit branch also clears the caller-supplied running flag
+ }
+ result.setLogSequenceNumber(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(index))); // the event LSN mirrors the call index
+ return result;
+ }
+
+ @Test // Scenario: decodeWithTX enabled and the version query answers (openGauss 2.0 build ); stream.close() is stubbed to throw and start() must still return.
+ void assertDumpDecodeWithTXBeforeVersionThree() throws
ReflectiveOperationException, SQLException {
+ IngestPosition position = new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(1L)));
+ MemoryPipelineChannel channel = new MemoryPipelineChannel(10, records
-> {
+ });
+ OpenGaussIncrementalDumper dumper = new
OpenGaussIncrementalDumper(createDumperContext(true), position, channel,
mock());
+ OpenGaussLogicalReplication logicalReplication =
mock(OpenGaussLogicalReplication.class);
+
Plugins.getMemberAccessor().set(OpenGaussIncrementalDumper.class.getDeclaredField("logicalReplication"),
dumper, logicalReplication); // replace the dumper's logicalReplication field with the mock
+ PgConnection pgConnection = mock(PgConnection.class);
+ PGReplicationStream stream = mock(PGReplicationStream.class);
+ IncrementalDumperContext dumperContext = getDumperContext(dumper);
+
when(logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(pgConnection);
+ when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+
when(stream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(20L));
+ when(stream.readPending()).thenReturn(ByteBuffer.wrap("1".getBytes()), // three payloads, one per scripted decode result
ByteBuffer.wrap("2".getBytes()), ByteBuffer.wrap("3".getBytes()));
+ doThrow(new SQLException("close error")).when(stream).close(); // if the dumper closes the stream, that failure must not escape start()
+ WALEventConverter walEventConverter = mock(WALEventConverter.class);
+
Plugins.getMemberAccessor().set(OpenGaussIncrementalDumper.class.getDeclaredField("walEventConverter"),
dumper, walEventConverter); // keep a reference to the mock so convert() calls can be verified below
+ ArgumentCaptor<AbstractWALEvent> eventCaptor =
ArgumentCaptor.forClass(AbstractWALEvent.class);
+ AtomicReference<Boolean> running = getRunningState(dumper);
+ try (
+ MockedStatic<DriverManager> driverManagerMock =
mockStatic(DriverManager.class);
+ MockedStatic<PostgreSQLSlotNameGenerator> slotNameMock =
mockStatic(PostgreSQLSlotNameGenerator.class);
+ MockedConstruction<MppdbDecodingPlugin> ignoredConstruction =
mockConstruction(MppdbDecodingPlugin.class, (mockedPlugin, context) -> {
+ AtomicInteger counter = new AtomicInteger(); // 1-based decode call index for this constructed plugin
+ when(mockedPlugin.decode(any(ByteBuffer.class),
any(OpenGaussLogSequenceNumber.class)))
+ .thenAnswer(invocation ->
createWALEventWithDecodeForOldVersion(running, counter.incrementAndGet()));
+ })) {
+ slotNameMock.when(() ->
PostgreSQLSlotNameGenerator.getUniqueSlotName(pgConnection,
dumperContext.getJobId())).thenReturn("slot-1");
+ when(logicalReplication.createReplicationStream(eq(pgConnection),
any(), eq("slot-1"), eq(2))).thenReturn(stream); // the stream is only returned when the major version argument is 2
+ mockVersionQuery(driverManagerMock, "(openGauss 2.0 build )");
+ dumper.start();
+ }
+ verify(walEventConverter, times(2)).convert(eventCaptor.capture()); // exactly two events are expected to reach the converter
+ AbstractRowEvent rowEvent = (AbstractRowEvent)
eventCaptor.getAllValues().get(0); // the first converted event must be a row event
+ assertThat(rowEvent.getCsn(), is(40L)); // 40L is the second argument of the scripted CommitTXEvent - presumably its csn, copied onto the row by the dumper
+ }
+
+ private AbstractWALEvent createWALEventWithDecodeForOldVersion(final
AtomicReference<Boolean> running, final int index) { // Scripted decode result for call number index: 1 begin, 2 row, anything else commit.
+ AbstractWALEvent result;
+ if (1 == index) {
+ result = new BeginTXEvent(1L, 10L);
+ } else if (2 == index) {
+ WriteRowEvent rowEvent = new WriteRowEvent();
+ rowEvent.setTableName("t_order");
+ rowEvent.setSchemaName("public");
+ result = rowEvent;
+ } else {
+ result = new CommitTXEvent(3L, 40L);
+ running.set(false); // the commit branch also clears the caller-supplied running flag
+ }
+ result.setLogSequenceNumber(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(index))); // the event LSN mirrors the call index
+ return result;
+ }
+
+ @Test // Scenario: decodeWithTX disabled and the version query answers (openGauss 5.0 build ); a begin event followed by a row event must leave one record in the channel.
+ void assertDumpIgnoreTransactionProcessEvents() throws
ReflectiveOperationException, SQLException {
+ MemoryPipelineChannel channel = new MemoryPipelineChannel(10, records
-> {
+ });
+ IngestPosition position = new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(1L)));
+ OpenGaussIncrementalDumper dumper = new
OpenGaussIncrementalDumper(createDumperContext(false), position, channel,
mock());
+ OpenGaussLogicalReplication logicalReplication =
mock(OpenGaussLogicalReplication.class);
+
Plugins.getMemberAccessor().set(OpenGaussIncrementalDumper.class.getDeclaredField("logicalReplication"),
dumper, logicalReplication); // replace the dumper's logicalReplication field with the mock
+ PgConnection pgConnection = mock(PgConnection.class);
+ PGReplicationStream stream = mock(PGReplicationStream.class);
+ IncrementalDumperContext dumperContext = getDumperContext(dumper);
+
when(logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(pgConnection);
+ when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+
when(stream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(50L));
+ when(stream.readPending()).thenReturn(ByteBuffer.wrap("1".getBytes()), // two payloads, one per scripted decode result
ByteBuffer.wrap("2".getBytes()));
+
Plugins.getMemberAccessor().set(OpenGaussIncrementalDumper.class.getDeclaredField("walEventConverter"),
dumper, mock(WALEventConverter.class)); // replace the walEventConverter field with a plain mock
+ AtomicReference<Boolean> running = getRunningState(dumper);
+ try (
+ MockedStatic<DriverManager> driverManagerMock =
mockStatic(DriverManager.class);
+ MockedStatic<PostgreSQLSlotNameGenerator> slotNameMock =
mockStatic(PostgreSQLSlotNameGenerator.class);
+ MockedConstruction<MppdbDecodingPlugin> ignoredConstruction =
mockConstruction(MppdbDecodingPlugin.class, (mockedPlugin, context) -> {
+ AtomicInteger counter = new AtomicInteger(); // 1-based decode call index for this constructed plugin
+ when(mockedPlugin.decode(any(ByteBuffer.class),
any(OpenGaussLogSequenceNumber.class)))
+ .thenAnswer(invocation ->
createWALEventWithDecodeIgnoreTX(running, counter.incrementAndGet()));
+ })) {
+ slotNameMock.when(() ->
PostgreSQLSlotNameGenerator.getUniqueSlotName(pgConnection,
dumperContext.getJobId())).thenReturn("slot-1");
+ when(logicalReplication.createReplicationStream(eq(pgConnection),
any(), eq("slot-1"), eq(5))).thenReturn(stream); // the stream is only returned when the major version argument is 5
+ mockVersionQuery(driverManagerMock, "(openGauss 5.0 build )");
+ dumper.start();
+ }
+ assertThat(channel.fetch(10, 0L).size(), is(1)); // of the two scripted events only one record is expected in the channel
+ verify(stream).close(); // the replication stream must have been closed once
+ }
+
+ private AbstractWALEvent createWALEventWithDecodeIgnoreTX(final
AtomicReference<Boolean> running, final int index) { // Scripted decode result for call number index: 1 begin, anything else a row event.
+ AbstractWALEvent result;
+ if (1 == index) {
+ result = new BeginTXEvent(1L, 10L);
+ } else {
+ WriteRowEvent rowEvent = new WriteRowEvent();
+ rowEvent.setTableName("t_order");
+ rowEvent.setSchemaName("public");
+ result = rowEvent;
+ running.set(false); // the row branch also clears the caller-supplied running flag
+ }
+ result.setLogSequenceNumber(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(index))); // the event LSN mirrors the call index
+ return result;
+ }
+
+ private IncrementalDumperContext createDumperContext(final boolean
decodeWithTX) { // Builds the dumper context shared by all tests; decodeWithTX is forwarded unchanged to the IncrementalDumperContext constructor.
+ Map<String, Object> poolProps = new HashMap<>(3, 1F); // capacity 3 with load factor 1: sized for exactly the three entries below
+ poolProps.put("url", "jdbc:postgresql://localhost:5432/test_db");
+ poolProps.put("username", "root");
+ poolProps.put("password", "root");
+ DumperCommonContext commonContext = new DumperCommonContext(null, new // NOTE(review): first argument is passed as null - confirm which field that is
StandardPipelineDataSourceConfiguration(poolProps),
+ new ActualAndLogicTableNameMapper(Collections.singletonMap(new // single mapping from t_order to t_order
ShardingSphereIdentifier("t_order"), new ShardingSphereIdentifier("t_order"))),
+ new TableAndSchemaNameMapper(Collections.emptyMap()));
+ return new IncrementalDumperContext(commonContext, "0101", // presumably the job id later read through getJobId() - confirm against the constructor
decodeWithTX);
+ }
+
+ @SuppressWarnings("unchecked")
+ private AtomicReference<Boolean> getRunningState(final
OpenGaussIncrementalDumper dumper) throws ReflectiveOperationException { // Reads the running field declared on AbstractPipelineLifecycleRunnable from the given dumper via Mockito's member accessor.
+ return (AtomicReference<Boolean>) // unchecked cast: the accessor returns Object
Plugins.getMemberAccessor().get(AbstractPipelineLifecycleRunnable.class.getDeclaredField("running"),
dumper);
+ }
+
+ private IncrementalDumperContext getDumperContext(final
OpenGaussIncrementalDumper dumper) throws ReflectiveOperationException { // Reads the dumperContext field declared on OpenGaussIncrementalDumper from the given dumper via Mockito's member accessor.
+ return (IncrementalDumperContext)
Plugins.getMemberAccessor().get(OpenGaussIncrementalDumper.class.getDeclaredField("dumperContext"),
dumper);
+ }
+
+ private void mockVersionQuery(final MockedStatic<DriverManager>
driverManagerMock, final String versionText) throws SQLException { // Stubs the JDBC chain so that the SELECT version() query yields versionText.
+ Connection connection = mock(Connection.class);
+ Statement statement = mock(Statement.class);
+ ResultSet resultSet = mock(ResultSet.class);
+ driverManagerMock.when(() -> DriverManager.getConnection(anyString(), // any three string arguments obtain the mocked connection
anyString(), anyString())).thenReturn(connection);
+ when(connection.createStatement()).thenReturn(statement);
+ when(statement.executeQuery("SELECT version()")).thenReturn(resultSet); // only this exact SQL text is stubbed
+ when(resultSet.next()).thenReturn(true); // next() answers true on every call
+ when(resultSet.getString(1)).thenReturn(versionText); // column 1 carries the version text
}
}