[ https://issues.apache.org/jira/browse/IGNITE-17369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612278#comment-17612278 ]

Vladimir Steshin edited comment on IGNITE-17369 at 10/26/22 12:41 PM:
----------------------------------------------------------------------

A snapshot can start with kin partitions in different states. The snapshot 
process waits for the datastreamer futures 
(_GridCacheMvccManager.addDataStreamerFuture()_). The problem is that these 
futures are created separately and concurrently on primary and backup nodes by 
_IsolatedUpdater_. As a result, at the checkpoint some backups might be written 
without their primaries, and vice versa.
An additional problem is that the streamer could have failed, been canceled, or 
been lost long before the snapshot. The data is already corrupted, and the 
streamer is gone and no longer visible.
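The race above can be shown with a toy model (this is not Ignite code; the class and method names are made up for illustration): primary and backup copies of a partition are written by independent batches, so a checkpoint taken between the two writes captures divergent replicas.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the described race: primary and backup copies of a partition
 * are written by independent streamer batches, so a checkpoint taken between
 * the two writes captures divergent state.
 */
public class KinPartitionRace {
    /** Entries applied to the primary copy of the partition. */
    static List<Integer> primary = new ArrayList<>();

    /** Entries applied to the backup copy of the partition. */
    static List<Integer> backup = new ArrayList<>();

    /** Applies one streamer batch to a single replica only. */
    static void applyBatch(List<Integer> replica, int from, int to) {
        for (int k = from; k < to; k++)
            replica.add(k);
    }

    /** True iff both replicas hold the same entries (consistent snapshot). */
    static boolean consistent() {
        return primary.equals(backup);
    }

    public static void main(String[] args) {
        applyBatch(primary, 0, 10); // batch lands on the primary first...
        // <-- checkpoint happens here: the backup has not seen the batch yet
        boolean snapshotConsistent = consistent();

        applyBatch(backup, 0, 10);  // ...and only later on the backup

        System.out.println("snapshot consistent:    " + snapshotConsistent); // false
        System.out.println("final state consistent: " + consistent());       // true
    }
}
```
Eventually both replicas converge, which is exactly why the inconsistency is visible only in the snapshot, not in the live cluster.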

Solutions:

4) V4 (PR 10299).
We could check, before and during the snapshot, whether a streamer is already 
registered.
Cons: 
Clients become involved in the snapshot process: a client answers whether it 
has a streamer registered and must not leave the cluster for a short moment.
Doesn't solve the canceled/failed-before problem. 

5) V5 (PR 10330).
We could quickly check partition counters at the start stage. That would cover 
the case when the DataStreamer failed or was canceled before the snapshot. 
Cons: 
Rarely, but equal counters do not guarantee equal data.
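The V5 caveat can be illustrated with a minimal sketch (not Ignite code; `Replica` and its fields are invented for the example): two partition copies can end up with the same update counter yet different content, so comparing counters alone can miss corruption that a content comparison would catch.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustration of the V5 caveat: same update counter, different data.
 */
public class CounterCheck {
    /** A replica: its entries plus the number of updates applied to it. */
    static class Replica {
        final Map<Integer, String> entries = new HashMap<>();
        long updateCntr;

        void put(int key, String val) {
            entries.put(key, val);
            updateCntr++; // counter grows per update, regardless of the key
        }
    }

    static boolean countersMatch(Replica a, Replica b) {
        return a.updateCntr == b.updateCntr;
    }

    static boolean contentMatches(Replica a, Replica b) {
        return a.entries.equals(b.entries);
    }

    public static void main(String[] args) {
        Replica primary = new Replica();
        Replica backup = new Replica();

        // Three updates on each side, but one backup update went to a
        // different key (e.g. a lost or reordered streamer batch).
        primary.put(1, "a"); primary.put(2, "b"); primary.put(3, "c");
        backup.put(1, "a");  backup.put(2, "b");  backup.put(4, "c");

        System.out.println("counters match:  " + countersMatch(primary, backup)); // true
        System.out.println("content matches: " + contentMatches(primary, backup)); // false
    }
}
```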

6) V6 (PR 10346).
We could simply fully verify the snapshot after creation, plus an option on the 
snapshot manager to disable the default snapshot verification at the creation 
phase.
Cons: 
The validation duration is comparable to creation or restoration: it could take 
half as long, about the same, or even longer.

7) V7
We could make the Batched receiver the default. It is slower, probably by 
15-30%; the slowdown seems to depend significantly on persistence, WAL 
performance, page rotation, etc. The default Isolated receiver predates 
persistence. It could remain as a documented option, emphasized as faster but 
requiring a successful finish, meaning no cancelation, no node failure, etc.
Cons: 
Changes the default behavior of 'allowOverwrite' to true (the receiver uses 
the rewriting _putAll()_). 'allowOverwrite == false' will be slow. Slower 
performance overall. Might need to take care of data ordering with 
transactions (there is a _BatchedOrdered_ receiver). 
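The behavioral difference behind the 'allowOverwrite' con can be sketched with plain maps (a toy comparison, not the actual Ignite receivers; method names are invented): with allowOverwrite == false an existing key is left untouched, initial-load style, while a putAll-style batched receiver rewrites it.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy comparison of the two receiver semantics discussed above.
 */
public class ReceiverSemantics {
    /** allowOverwrite == false: only absent keys are written. */
    static void isolatedReceive(Map<Integer, String> cache, Map<Integer, String> batch) {
        for (Map.Entry<Integer, String> e : batch.entrySet())
            cache.putIfAbsent(e.getKey(), e.getValue());
    }

    /** allowOverwrite == true: putAll-style, existing keys are rewritten. */
    static void batchedReceive(Map<Integer, String> cache, Map<Integer, String> batch) {
        cache.putAll(batch);
    }

    public static void main(String[] args) {
        Map<Integer, String> batch = Map.of(1, "new", 2, "new");

        Map<Integer, String> c1 = new HashMap<>(Map.of(1, "old"));
        isolatedReceive(c1, batch);
        System.out.println("isolated: " + c1.get(1)); // prints "isolated: old"

        Map<Integer, String> c2 = new HashMap<>(Map.of(1, "old"));
        batchedReceive(c2, batch);
        System.out.println("batched:  " + c2.get(1)); // prints "batched:  new"
    }
}
```
This is why flipping the default silently changes what loading pre-existing keys means to applications that relied on the old semantics.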


Others:

1) V1 (PR 10285). 
Watch _DataStreamer_ futures in the snapshot process. The futures are created 
before a streamer batch is written on any node. 
Cons: 
In theory, the solution is not resilient. One streamer batch could have been 
entirely written before the snapshot and a second one after, including the 
future creation. The first batch writes a partition on the primaries or 
backups; the second writes the rest. The snapshot is inconsistent.
Doesn't solve the canceled/failed-before problem.

2) V2 (PR 10286).
_IsolatedUpdater_ could simply notify the snapshot process that concurrent 
inconsistent updates are in progress. 
Cons: 
Same as V1.

3) V3 (PR 10284).
Mark the _DataStreamer_ as active on the first streamer batch received, and 
unmark it somehow later. The snapshot process could then check this mark. 
Cons: 
How to clear such a mark? When the streaming node leaves? A node can live 
forever. Send a special closing request? The streamer node might never close 
the streamer at all, meaning no _close()_ is invoked. Moreover, _DataStreamer_ 
works through _CommunicationSPI_, which doesn't guarantee delivery. We can't be 
sure that the closing request is delivered and the streamer is unmarked on the 
accepting node. Do we need to set this mark with a timeout and reset it with 
the next datastreamer batch? Which timeout? Bound to what? 
A rebalance can happen while closing requests are in flight and would have to 
be handled too. It looks like we need a discovery closing message: much simpler 
and more reliable. 
Doesn't solve the canceled/failed-before problem.

8) V8
We could try waiting on PME, flushing the remaining batches on the streamer 
node before the snapshot to keep consistency.
Cons: 
Doesn't solve the canceled/failed-before problem. 
Also, there might be nothing to wait for on PME while the data is already 
corrupted, because the streamer node left or failed, or the streamer was 
canceled during snapshotting: some batches were already sent without the 
required kin backup or primary batches, which will never be sent.
A better option is to send only primary-holding batches to the receiver, form 
the kin backup-holding batches on the receiver, and bind all these batches to 
a single PME-waiting update future on the receiver.
But that is close to the _Batched_ updater except for the 
_allowOverwrite==false_ ability.
Doesn't solve the canceled/failed-before problem. Client PME/PME-free 
operation is affected: we would wait for every node, including clients. In 
general, this solution is close to V4 (PR 10299), where we ask and wait for 
every node (including clients) before the snapshot. Almost the same.



> Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
> -----------------------------------------------------------------------------
>
>                 Key: IGNITE-17369
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17369
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Vladimir Steshin
>            Assignee: Vladimir Steshin
>            Priority: Major
>              Labels: ise, ise.lts
>         Attachments: IgniteClusterShanpshotStreamerTest.java
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Ignite fails to restore snapshot created under streamed load:
> {code:java}
> Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
> Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
> {code}
> Test (attached):
> {code:java}
>     public void testClusterSnapshotConsistencyWithStreamer() throws Exception {
>         int grids = 2;
>         CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
>         AtomicBoolean stopLoading = new AtomicBoolean(false);
>         dfltCacheCfg = null;
>         Class.forName("org.apache.ignite.IgniteJdbcDriver");
>         String tableName = "TEST_TBL1";
>         startGrids(grids);
>         grid(0).cluster().state(ACTIVE);
>         IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, stopLoading, loadNumberBeforeSnapshot);
>         loadNumberBeforeSnapshot.await();
>         grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
>         stopLoading.set(true);
>         load1.get();
>         grid(0).cache("SQL_PUBLIC_" + tableName).destroy();
>         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, F.asList("SQL_PUBLIC_TEST_TBL1")).get();
>     }
>
>     /** */
>     private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, int backups, boolean streaming, AtomicBoolean stop,
>         CountDownLatch startSnp) {
>         return GridTestUtils.runMultiThreadedAsync(() -> {
>             if (useCache) {
>                 String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();
>                 IgniteCache<Integer, Object> cache = grid(0)
>                     .createCache(new CacheConfiguration<Integer, Object>(cacheName).setBackups(backups)
>                         .setCacheMode(CacheMode.REPLICATED));
>                 try (IgniteDataStreamer<Integer, Object> ds = grid(0).dataStreamer(cacheName)) {
>                     for (int i = 0; !stop.get(); ++i) {
>                         if (streaming)
>                             ds.addData(i, new Account(i, i - 1));
>                         else
>                             cache.put(i, new Account(i, i - 1));
>                         if (startSnp.getCount() > 0)
>                             startSnp.countDown();
>                         Thread.yield();
>                     }
>                 }
>             } else {
>                 try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
>                     createTable(conn, tblName, backups);
>                     try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + tblName +
>                         "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
>                         if (streaming)
>                             conn.prepareStatement("SET STREAMING ON;").execute();
>                         int leftLimit = 97; // letter 'a'
>                         int rightLimit = 122; // letter 'z'
>                         int targetStringLength = 15;
>                         Random rand = new Random();
>                         for (int i = 0; !stop.get(); ++i) {
>                             int orgid = rand.ints(1, 0, 5).findFirst().getAsInt();
>                             String val = rand.ints(leftLimit, rightLimit + 1).limit(targetStringLength)
>                                 .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
>                                 .toString();
>                             stmt.setInt(1, i);
>                             stmt.setString(2, val);
>                             stmt.setInt(3, orgid);
>                             stmt.setInt(4, 0);
>                             stmt.executeUpdate();
>                             if (startSnp.getCount() > 0)
>                                 startSnp.countDown();
>                             Thread.yield();
>                         }
>                     }
>                 }
>                 catch (Exception e) {
>                     while (startSnp.getCount() > 0)
>                         startSnp.countDown();
>                     throw new IgniteException("Unable to load.", e);
>                 }
>             }
>         }, 1, "load-thread-" + tblName);
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)