[ https://issues.apache.org/jira/browse/IGNITE-17369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17609543#comment-17609543 ]
Vladimir Steshin edited comment on IGNITE-17369 at 9/26/22 3:34 PM: -------------------------------------------------------------------- Snapshot process should warn user about possileble data corruption by streamer. Snapshot task should finish with this message and negative status. was (Author: vladsz83): Snapshot should warn user about possileble data corruption by streamer. Snapshot task should finish with this message and negative status. > Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'. > ----------------------------------------------------------------------------- > > Key: IGNITE-17369 > URL: https://issues.apache.org/jira/browse/IGNITE-17369 > Project: Ignite > Issue Type: Bug > Reporter: Vladimir Steshin > Assignee: Vladimir Steshin > Priority: Major > Labels: ise, ise.lts > Attachments: IgniteClusterShanpshotStreamerTest.java > > > Ignite fails to restore snapshot created under streamed load: > {code:java} > Conflict partition: PartitionKeyV2 [grpId=109386747, > grpName=SQL_PUBLIC_TEST_TBL1, partId=148] > Partition instances: [PartitionHashRecordV2 [isPrimary=false, > consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, > partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 > [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, > updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]] > Conflict partition: PartitionKeyV2 [grpId=109386747, > grpName=SQL_PUBLIC_TEST_TBL1, partId=146] > Partition instances: [PartitionHashRecordV2 [isPrimary=false, > consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, > partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 > [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, > updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]] > {code} > Test (attached): > {code:java} > public void testClusterSnapshotConsistencyWithStreamer() throws Exception > { > int grids = 2; > CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000); > AtomicBoolean stopLoading = new AtomicBoolean(false); > dfltCacheCfg = null; > Class.forName("org.apache.ignite.IgniteJdbcDriver"); > String tableName = "TEST_TBL1"; > startGrids(grids); > grid(0).cluster().state(ACTIVE); > IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, > stopLoading, loadNumberBeforeSnapshot); > loadNumberBeforeSnapshot.await(); > grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(); > stopLoading.set(true); > load1.get(); > grid(0).cache("SQL_PUBLIC_" + tableName).destroy(); > grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, > F.asList("SQL_PUBLIC_TEST_TBL1")).get(); > } > /** */ > private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, > int backups, boolean streaming, AtomicBoolean stop, > CountDownLatch startSnp) { > return GridTestUtils.runMultiThreadedAsync(() -> { > if(useCache) { > String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase(); > IgniteCache<Integer, Object> cache = grid(0) > .createCache(new CacheConfiguration<Integer, > Object>(cacheName).setBackups(backups) > .setCacheMode(CacheMode.REPLICATED)); > try (IgniteDataStreamer<Integer, Object> ds = > grid(0).dataStreamer(cacheName)) { > for (int i = 0; !stop.get(); ++i) { > if (streaming) > ds.addData(i, new Account(i, i - 1)); > else > cache.put(i, new Account(i, i - 1)); > if (startSnp.getCount() > 0) > startSnp.countDown(); > Thread.yield(); > } > } > } else { > try (Connection conn = > DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) { > createTable(conn, tblName, backups); > try (PreparedStatement stmt = > conn.prepareStatement("INSERT INTO " + tblName + > "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) { > if (streaming) > conn.prepareStatement("SET STREAMING > ON;").execute(); > int leftLimit = 97; // letter 'a' > int rightLimit = 122; // letter'z' > int targetStringLength = 15; > Random rand = new Random(); > // > for (int i = 0; !stop.get(); ++i) { > int orgid = rand.ints(1, 0, > 5).findFirst().getAsInt(); > String val = rand.ints(leftLimit, rightLimit + > 1).limit(targetStringLength) > .collect(StringBuilder::new, > StringBuilder::appendCodePoint, StringBuilder::append) > .toString(); > stmt.setInt(1, i); > stmt.setString(2, val); > stmt.setInt(3, orgid); > stmt.setInt(4, 0); > stmt.executeUpdate(); > if (startSnp.getCount() > 0) > startSnp.countDown(); > Thread.yield(); > } > } > } > catch (Exception e) { > while (startSnp.getCount() > 0) > startSnp.countDown(); > throw new IgniteException("Unable to load.", e); > } > } > }, 1, "load-thread-" + tblName); > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)