Vladimir Steshin created IGNITE-17369:
-----------------------------------------
Summary: Snapshot is inconsistent under streamed loading.
Key: IGNITE-17369
URL: https://issues.apache.org/jira/browse/IGNITE-17369
Project: Ignite
Issue Type: Improvement
Reporter: Vladimir Steshin
Attachments: IgniteClusterShanpshotStreamerTest.java
Ignite fails to restore snapshot created under streamed load:
{code:java}
Conflict partition: PartitionKeyV2 [grpId=109386747,
grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
Partition instances: [PartitionHashRecordV2 [isPrimary=false,
consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29,
partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2
[isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1,
updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
Conflict partition: PartitionKeyV2 [grpId=109386747,
grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
Partition instances: [PartitionHashRecordV2 [isPrimary=false,
consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28,
partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2
[isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1,
updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
{code}
Test (attached):
{code:java}
public void testClusterSnapshotConsistencyWithStreamer() throws Exception {
int grids = 2;
CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
AtomicBoolean stopLoading = new AtomicBoolean(false);
dfltCacheCfg = null;
Class.forName("org.apache.ignite.IgniteJdbcDriver");
String tableName = "TEST_TBL1";
startGrids(grids);
grid(0).cluster().state(ACTIVE);
IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true,
stopLoading, loadNumberBeforeSnapshot);
loadNumberBeforeSnapshot.await();
grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
stopLoading.set(true);
load1.get();
grid(0).cache("SQL_PUBLIC_" + tableName).destroy();
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME,
F.asList("SQL_PUBLIC_TEST_TBL1")).get();
}
/** */
private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache,
int backups, boolean streaming, AtomicBoolean stop,
CountDownLatch startSnp) {
return GridTestUtils.runMultiThreadedAsync(() -> {
if(useCache) {
String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();
IgniteCache<Integer, Object> cache = grid(0)
.createCache(new CacheConfiguration<Integer,
Object>(cacheName).setBackups(backups)
.setCacheMode(CacheMode.REPLICATED));
try (IgniteDataStreamer<Integer, Object> ds =
grid(0).dataStreamer(cacheName)) {
for (int i = 0; !stop.get(); ++i) {
if (streaming)
ds.addData(i, new Account(i, i - 1));
else
cache.put(i, new Account(i, i - 1));
if (startSnp.getCount() > 0)
startSnp.countDown();
Thread.yield();
}
}
} else {
try (Connection conn =
DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
createTable(conn, tblName, backups);
try (PreparedStatement stmt = conn.prepareStatement("INSERT
INTO " + tblName +
"(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
if (streaming)
conn.prepareStatement("SET STREAMING
ON;").execute();
int leftLimit = 97; // letter 'a'
int rightLimit = 122; // letter'z'
int targetStringLength = 15;
Random rand = new Random();
//
for (int i = 0; !stop.get(); ++i) {
int orgid = rand.ints(1, 0,
5).findFirst().getAsInt();
String val = rand.ints(leftLimit, rightLimit +
1).limit(targetStringLength)
.collect(StringBuilder::new,
StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
stmt.setInt(1, i);
stmt.setString(2, val);
stmt.setInt(3, orgid);
stmt.setInt(4, 0);
stmt.executeUpdate();
if (startSnp.getCount() > 0)
startSnp.countDown();
Thread.yield();
}
}
}
catch (Exception e) {
while (startSnp.getCount() > 0)
startSnp.countDown();
throw new IgniteException("Unable to load.", e);
}
}
}, 1, "load-thread-" + tblName);
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)