[ https://issues.apache.org/jira/browse/IGNITE-17369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612278#comment-17612278 ]
Vladimir Steshin edited comment on IGNITE-17369 at 10/26/22 12:37 PM:
----------------------------------------------------------------------

A snapshot can begin working with different states of kin partitions. The snapshot process waits for the datastreamer futures (_GridCacheMvccManager.addDataStreamerFuture()_). The problem is that these futures are created separately and concurrently on primary and backup nodes by _IsolatedUpdater_. As a result, at the checkpoint some backups might be written without the primaries, and vice versa. An additional problem is that the streamer could fail, be canceled, or be lost long before the snapshot: the data is already corrupted, but the streamer is gone and cannot be seen.

Solutions:

4) V4 (PR 10299). We could watch whether a streamer is already registered before the snapshot and concurrently with it. Cons: Clients become involved in the snapshot process: each client answers whether it has a streamer registered and is required not to leave for a short moment. Doesn't solve the canceled/failed-before problem.

5) V5 (PR 10330). We could quickly check partition counters at the start stage. That would cover the case when the DataStreamer failed or was canceled before the snapshot. Cons: Rare, but equal counters do not mean equal data.

6) V6 (PR 10346). We could simply do a full check of the snapshot after creating it, plus an option on the snapshot manager to disable the default snapshot verification on the creation phase (see the first sketch after this list). Cons: The validation duration is comparable to creation or restoration; it could take half as long, about as long, or even longer.

7) V7. We could try to use the Batched receiver as the default. It is slower, probably by 15-30%; the slowdown seems to depend significantly on the persistence, WAL performance, page rotation, etc. The default Isolated receiver was introduced before persistence. It could stay as an option, documented and emphasized as faster, but requiring a successful finish, meaning no cancelation, no node failure, etc. (see the second sketch after this list). Cons: Changes the default behavior of 'allowOverwrite' to true (the receiver uses rewriting putAll()), so 'allowOverwrite == false' will be slow. Slower performance. Might need to take care of data order with transactions (there is the BatchedOrdered receiver).

Others:

1) V1 (PR 10285). Watch _DataStreamer_ futures in the snapshot process. The futures are created before a streamer batch is written on any node. Cons: In theory, the solution is not resilient. One streamer batch, including its future creation, could have been entirely written before the snapshot and a second batch after it. The first batch writes a partition on primaries or backups, the second writes the rest, and the snapshot is inconsistent. Doesn't solve the canceled/failed-before problem.

2) V2 (PR 10286). _IsolatedUpdater_ could just notify the snapshot process that concurrent, inconsistent updates are going on. Cons: Same as V1.

3) V3 (PR 10284). Mark that a _DataStreamer_ is active on the first streamer batch received, and unmark it somehow later. If a _DataStreamer_ is marked as active, the snapshot process could check this mark. Cons: How do we clear such a mark? When the streaming node leaves? A node can live forever. Send a special closing request? The streamer node may never close the streamer at all, meaning no _close()_ is invoked. Moreover, _DataStreamer_ works through _CommunicationSPI_, which doesn't guarantee delivery, so we can't be sure the closing request is delivered and the streamer is unmarked on the accepting node. Do we need to set this mark with a timeout and reset it with the next datastreamer batch? Which timeout? Bound to what? A rebalance can happen while closing requests are in flight and should be handled too. It looks like we need a discovery closing message.
That would be much simpler and more reliable. Still doesn't solve the canceled/failed-before problem.

8) V8. We could try waiting on PME, flushing the rest of the batches on the streamer node before the snapshot to keep consistency. Cons: Doesn't solve the canceled/failed-before problem. Also, there might be nothing to wait for on PME even though the data is already corrupted, because the streamer node left or failed, or the streamer was canceled while snapshotting: some batches were already sent without their kin backup or primary batches, which will never be sent. A better option is to send only primary-holding batches to the receiver, form the kin backup-holding batches on the receiver, and bind all these batches to a single PME-waiting update future on the receiver. But that is close to the Batched updater, except for the allowOverwrite==true ability. It doesn't solve the canceled/failed-before problem, and client PME / PME-free exchange is affected: we would wait for a node, even a client. In general, this solution is close to V4 (PR 10299), where we ask and wait for every node (including clients) before the snapshot. Almost the same.
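Regarding V6, here is a minimal sketch of the resulting user-side flow, assuming only the existing public snapshot API (the class name, snapshot name and configuration are illustrative, and the post-creation check via the control script is the usual standalone verification, not the PR 10346 change itself):

{code:java}
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class SnapshotThenVerify {
    public static void main(String[] args) {
        // Snapshots require persistence, hence the persistent default data region.
        IgniteConfiguration cfg = new IgniteConfiguration().setDataStorageConfiguration(
            new DataStorageConfiguration().setDefaultDataRegionConfiguration(
                new DataRegionConfiguration().setPersistenceEnabled(true)));

        try (Ignite ignite = Ignition.start(cfg)) {
            ignite.cluster().state(ClusterState.ACTIVE);

            // Phase 1: create the snapshot. Under concurrent streaming with the isolated
            // receiver it may silently capture kin partitions in different states.
            ignite.snapshot().createSnapshot("backup_under_streaming").get();

            // Phase 2 (the essence of V6): fully verify the snapshot afterwards instead of
            // trying to detect an active streamer during creation, e.g. with the standalone
            // check command:
            //
            //   control.(sh|bat) --snapshot check backup_under_streaming
            //
            // The check compares partition hashes/counters across the copies, so its duration
            // is comparable to creating or restoring the snapshot, which is the stated drawback.
        }
    }
}
{code}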
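For V7, the user-visible difference boils down to the streamer configuration. A minimal sketch of the two modes using only the public _IgniteDataStreamer_ API (cache name, types and counts are illustrative; the IsolatedUpdater/Batched receiver names above are internal and not part of this API):

{code:java}
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;

public class StreamerModes {
    static void load(Ignite ignite, String cacheName, boolean snapshotFriendly) {
        // Make sure the cache exists before opening a streamer for it.
        ignite.getOrCreateCache(cacheName);

        try (IgniteDataStreamer<Integer, String> ds = ignite.dataStreamer(cacheName)) {
            // allowOverwrite == false (the current default) maps to the fast isolated receiver
            // described above: primaries and backups are written by independent batches, which
            // is what lets a concurrent snapshot capture kin partitions in different states.
            //
            // allowOverwrite == true switches to cache-style (putAll-like) updates, the behavior
            // V7 proposes to make the default: snapshot-consistent, but slower (roughly 15-30%
            // per the estimate above) and with overwrite semantics.
            ds.allowOverwrite(snapshotFriendly);

            for (int i = 0; i < 100_000; i++)
                ds.addData(i, "value-" + i);
        } // close() flushes the remaining batches.
    }
}
{code}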
Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
-----------------------------------------------------------------------------

                Key: IGNITE-17369
                URL: https://issues.apache.org/jira/browse/IGNITE-17369
            Project: Ignite
         Issue Type: Bug
           Reporter: Vladimir Steshin
           Assignee: Vladimir Steshin
           Priority: Major
             Labels: ise, ise.lts
        Attachments: IgniteClusterShanpshotStreamerTest.java
         Time Spent: 10m
 Remaining Estimate: 0h

Ignite fails to restore snapshot created under streamed load:

{code:java}
Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
{code}

Test (attached):

{code:java}
public void testClusterSnapshotConsistencyWithStreamer() throws Exception {
    int grids = 2;
    CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
    AtomicBoolean stopLoading = new AtomicBoolean(false);

    dfltCacheCfg = null;

    Class.forName("org.apache.ignite.IgniteJdbcDriver");

    String tableName = "TEST_TBL1";

    startGrids(grids);

    grid(0).cluster().state(ACTIVE);

    IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, stopLoading, loadNumberBeforeSnapshot);

    loadNumberBeforeSnapshot.await();

    grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();

    stopLoading.set(true);

    load1.get();

    grid(0).cache("SQL_PUBLIC_" + tableName).destroy();

    grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, F.asList("SQL_PUBLIC_TEST_TBL1")).get();
}

/** */
private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, int backups, boolean streaming,
    AtomicBoolean stop, CountDownLatch startSnp) {
    return GridTestUtils.runMultiThreadedAsync(() -> {
        if (useCache) {
            String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();

            IgniteCache<Integer, Object> cache = grid(0)
                .createCache(new CacheConfiguration<Integer, Object>(cacheName).setBackups(backups)
                    .setCacheMode(CacheMode.REPLICATED));

            try (IgniteDataStreamer<Integer, Object> ds = grid(0).dataStreamer(cacheName)) {
                for (int i = 0; !stop.get(); ++i) {
                    if (streaming)
                        ds.addData(i, new Account(i, i - 1));
                    else
                        cache.put(i, new Account(i, i - 1));

                    if (startSnp.getCount() > 0)
                        startSnp.countDown();

                    Thread.yield();
                }
            }
        }
        else {
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
                createTable(conn, tblName, backups);

                try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + tblName +
                    "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
                    if (streaming)
                        conn.prepareStatement("SET STREAMING ON;").execute();

                    int leftLimit = 97; // Letter 'a'.
                    int rightLimit = 122; // Letter 'z'.
                    int targetStringLength = 15;
                    Random rand = new Random();

                    for (int i = 0; !stop.get(); ++i) {
                        int orgid = rand.ints(1, 0, 5).findFirst().getAsInt();

                        String val = rand.ints(leftLimit, rightLimit + 1).limit(targetStringLength)
                            .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                            .toString();

                        stmt.setInt(1, i);
                        stmt.setString(2, val);
                        stmt.setInt(3, orgid);
                        stmt.setInt(4, 0);

                        stmt.executeUpdate();

                        if (startSnp.getCount() > 0)
                            startSnp.countDown();

                        Thread.yield();
                    }
                }
            }
            catch (Exception e) {
                while (startSnp.getCount() > 0)
                    startSnp.countDown();

                throw new IgniteException("Unable to load.", e);
            }
        }
    }, 1, "load-thread-" + tblName);
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)