[ 
https://issues.apache.org/jira/browse/IGNITE-17369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612278#comment-17612278
 ] 

Vladimir Steshin edited comment on IGNITE-17369 at 10/26/22 11:15 AM:
----------------------------------------------------------------------

A snapshot can begin with primary and backup copies of the same partition in different states. The snapshot 
process waits for the datastreamer futures 
(_GridCacheMvccManager.addDataStreamerFuture()_). The problem is that these 
futures are created separately and concurrently on primary and backup nodes by 
_IsolatedUpdater_. As a result, at the checkpoint some backups might be written 
without the primaries, and vice versa. No updates are accepted during the 
checkpoint, so late streamer updates are not written to the snapshotted partitions. 
The snapshot verification could produce a warning about an active streamer.
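
To illustrate the user-facing scenario, a rough sketch on the public API only (the cache/snapshot names and config path are made up; a persistent cluster is assumed):

{code:java}
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterState;

public class StreamerSnapshotScenario {
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start("ignite-persistent-config.xml")) {
            ignite.cluster().state(ClusterState.ACTIVE);
            ignite.getOrCreateCache("testCache");

            // allowOverwrite is false by default, so IsolatedUpdater applies the batches
            // independently on primary and backup nodes.
            try (IgniteDataStreamer<Integer, Integer> ds = ignite.dataStreamer("testCache")) {
                for (int i = 0; i < 1_000_000; i++)
                    ds.addData(i, i);

                // Batches are applied asynchronously; a snapshot taken now may capture
                // primary and backup copies of the same partition in different states.
                ignite.snapshot().createSnapshot("snp").get();
            }
        }
    }
}
{code}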

An additional problem is that the streamer could have failed, been canceled or been lost long 
before the snapshot. In that case the data is already corrupted, but the streamer is gone 
and cannot be detected.

Solutions:

4) V4 (PR 10299).
We could check whether a streamer is already registered before the snapshot starts and 
while it runs. The problem is that at the snapshot beginning we would have to query every 
node, including clients, and make sure they all answered whether a streamer is active. We 
could adjust the snapshot process so that it gathers client responses at the start stage, 
as sketched below. 
Cons: doesn't solve the canceled/failed-before-snapshot problem.
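
A rough sketch of the idea (not the PR code; _hasActiveStreamerReceivers()_ is a hypothetical per-node check standing in for whatever the local datastreamer processor could report):

{code:java}
import java.util.Collection;
import org.apache.ignite.Ignite;
import org.apache.ignite.lang.IgniteCallable;

public class StreamerCheckAtSnapshotStart {
    /** Hypothetical per-node check; a real implementation would ask the local datastreamer processor. */
    static boolean hasActiveStreamerReceivers() {
        return false; // Placeholder.
    }

    /** Asks every node, clients included, at the snapshot start stage. */
    static boolean safeToStartSnapshot(Ignite ignite) {
        IgniteCallable<Boolean> check = StreamerCheckAtSnapshotStart::hasActiveStreamerReceivers;

        Collection<Boolean> answers = ignite.compute(ignite.cluster()).broadcast(check);

        return answers.stream().noneMatch(Boolean::booleanValue);
    }
}
{code}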

5) V5 (PR 10330)
We could quickly compare partition update counters at the start stage, as in the sketch 
below. That would cover the case when the DataStreamer failed or was canceled before the 
snapshot. 
Cons: rare, but equal counters do not guarantee equal data.
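
A simplified model of the counter comparison (plain Java, not Ignite internals; the input is assumed to be the update counters reported per partition copy):

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CounterCheckSketch {
    /**
     * @param cntrs Update counters of every copy of each partition, keyed by partition id
     *              (values as reported by the primary and backup nodes).
     * @return Ids of partitions whose copies disagree.
     */
    static Set<Integer> conflictingPartitions(Map<Integer, List<Long>> cntrs) {
        Set<Integer> conflicts = new HashSet<>();

        for (Map.Entry<Integer, List<Long>> e : cntrs.entrySet()) {
            if (e.getValue().stream().distinct().count() > 1)
                conflicts.add(e.getKey());
        }

        return conflicts;
    }
}
{code}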

6) V6 (PR 10346) 
We could simply verify the snapshot fully after it is created (sketched below). Slower, but 
it solves the counters problem. 
Cons: the validation duration is comparable to the duration of creation or restoration; it 
could take half as long, about as long, or even longer.
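
A simplified model of the full check (plain Java; the real verification compares per-copy partition hashes, like the _partHash_ values in the conflict report in the description):

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FullVerifySketch {
    /** Content hash of one partition copy; entries are sorted so iteration order doesn't matter. */
    static int partHash(Map<Integer, byte[]> entries) {
        int hash = 0;

        for (Map.Entry<Integer, byte[]> e : new TreeMap<>(entries).entrySet())
            hash = 31 * hash + e.getKey() + Arrays.hashCode(e.getValue());

        return hash;
    }

    /** All copies of a partition must produce the same content hash. */
    static boolean consistent(List<Map<Integer, byte[]>> copies) {
        return copies.stream().mapToInt(FullVerifySketch::partHash).distinct().count() <= 1;
    }
}
{code}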

7) V7
We could try using the Batched receiver as the default. It is slower, probably by 15-30%; 
the slowdown seems to depend significantly on the persistence, WAL performance, page 
rotation, etc. But it is consistent. The default Isolated receiver was introduced before 
persistence. The Isolated receiver might stay optional and documented, emphasized as faster 
but requiring a successful finish, meaning no cancellation, no node failure, etc. See the 
sketch below.
Cons: slower performance. Data ordering has to be taken care of for transactions (there is 
the BatchedOrdered receiver). 
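
For illustration, a streamer can already be pointed at the batched receiver explicitly. Note that _DataStreamerCacheUpdaters_ is internal API, so treat this as a sketch rather than a recommended configuration:

{code:java}
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;

public class BatchedReceiverSketch {
    static void loadWithBatchedReceiver(Ignite ignite, String cacheName) {
        try (IgniteDataStreamer<Integer, Integer> ds = ignite.dataStreamer(cacheName)) {
            // Writes go through the regular cache API instead of IsolatedUpdater:
            // slower, but consistent with respect to primaries and backups.
            ds.receiver(DataStreamerCacheUpdaters.batched());

            for (int i = 0; i < 100_000; i++)
                ds.addData(i, i);
        }
    }
}
{code}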


Others:

1) V1 (PR 10285). 
Watch the _DataStreamer_ futures in the snapshot process. The futures are created 
before a streamer batch is written on any node. We cannot rely on such a future as a 
final, consistent write of a streamer batch or of a certain entry, but we do know 
that the datastreamer is in progress at the checkpoint and that it is paused. We 
can invalidate the snapshot at that moment (a toy model follows below).
Cons: in theory, the solution is not resilient. One streamer batch could have been 
written entirely before the snapshot and a second batch after it: the first batch 
writes the partition on the primaries or the backups, the second writes the rest, 
and the snapshot is inconsistent.
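
A toy model of that check (plain Java, not the PR): a future is registered before a batch is written, and the checkpoint-time check treats any open future as a reason to invalidate the snapshot.

{code:java}
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class StreamerFutureWatchSketch {
    /** Futures registered before a streamer batch is written, an analogue of addDataStreamerFuture(). */
    private final Set<CompletableFuture<Void>> streamerFuts = ConcurrentHashMap.newKeySet();

    /** Called when a streamer batch arrives, before it is written. */
    CompletableFuture<Void> onBatchReceived() {
        CompletableFuture<Void> fut = new CompletableFuture<>();

        streamerFuts.add(fut);
        fut.whenComplete((res, err) -> streamerFuts.remove(fut));

        return fut;
    }

    /** Called at checkpoint time: an open future means a streamer batch may be half-applied. */
    boolean snapshotMustBeInvalidated() {
        return !streamerFuts.isEmpty();
    }
}
{code}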

2) V2 (PR 10286).
_IsolatedUpdater_ could simply notify the snapshot process, if one exists, that a 
concurrent inconsistent update is in progress (see the sketch below). A notification 
about at least one entry on any node would be enough. It should work in practice. 
Cons: in theory, the solution is not resilient. One streamer batch could have been 
written entirely before the snapshot and a second batch after it: the first batch 
writes the partition on the primaries or the backups, the second writes the rest, 
and the snapshot is inconsistent.
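
A toy model of the notification (plain Java, not the PR):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public class StreamerNotificationSketch {
    private final AtomicBoolean snapshotInProgress = new AtomicBoolean();
    private final AtomicBoolean concurrentStreamerUpdate = new AtomicBoolean();

    /** Called by the receiver (IsolatedUpdater analogue) for every entry it writes. */
    void onStreamerWrite() {
        if (snapshotInProgress.get())
            concurrentStreamerUpdate.set(true);
    }

    void onSnapshotStart() {
        concurrentStreamerUpdate.set(false);
        snapshotInProgress.set(true);
    }

    /** The snapshot is valid only if no streamer entry was written meanwhile. */
    boolean onSnapshotFinish() {
        snapshotInProgress.set(false);

        return !concurrentStreamerUpdate.get();
    }
}
{code}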

3) V3 (PR 10284).
Mark the _DataStreamer_ as active on the first streamer batch received, and unmark it 
somehow later. If the _DataStreamer_ is marked as active, the snapshot process could 
check this mark. Since the mark is set before writing data, it is set before the 
datastreamer future which the snapshot process waits for. This guarantees the mark 
is visible before the snapshot.

Cons: how to clear such a mark? When the streaming node leaves? The node can live 
forever. Send a special closing request? The streamer node might never close the 
streamer at all, meaning no _close()_ is invoked. Moreover, _DataStreamer_ works 
through _CommunicationSPI_, which doesn't guarantee delivery, so we can't be sure 
that the closing request is delivered and the streamer is unmarked on the accepting 
node. Do we need to set the mark with a timeout and refresh it with the next 
datastreamer batch (sketched below)? Which timeout? Bound to what? 
A rebalance can happen while closing requests are in flight and would have to be 
handled as well. It looks like we would need a discovery-based closing message, which 
is much simpler and more reliable. 
Also, the datastreamer can be canceled, meaning some batches were written before the 
snapshot and others never will be. 
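
A toy model of the mark-with-timeout question raised above (plain Java; the timeout value is arbitrary, which is exactly the weak point):

{code:java}
import java.util.concurrent.atomic.AtomicLong;

public class StreamerMarkSketch {
    /** Arbitrary illustration value; the open question is what to bind this timeout to. */
    private static final long MARK_TIMEOUT_MS = 30_000;

    private final AtomicLong lastBatchTs = new AtomicLong();

    /** Called on every received streamer batch, before the data is written. */
    void onBatch() {
        lastBatchTs.set(System.currentTimeMillis());
    }

    /** Checked by the snapshot process: the streamer is considered "on" until the mark expires. */
    boolean streamerMarkedActive() {
        return System.currentTimeMillis() - lastBatchTs.get() < MARK_TIMEOUT_MS;
    }
}
{code}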

8) V8
We could try waiting on PME, flushing the remaining batches on the streamer side to 
keep consistency (the flush itself is sketched below).
Cons: doesn't solve cancellation or node failure; there might be nothing left to wait 
for while the data is already corrupted, so the canceled/failed-before-snapshot problem 
remains. Client PME / PME-free operation is affected, since we would wait for nodes, 
clients included. In general, this solution is close to V4 (PR 10299): we ask every 
node (including clients) and wait for the answers before the snapshot. Almost the 
same.
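
The flushing half is trivial with the public API; the open question is the trigger. A sketch with a hypothetical hook:

{code:java}
import org.apache.ignite.IgniteDataStreamer;

public class FlushOnExchangeSketch {
    /**
     * Hypothetical hook: would have to be invoked when the snapshot-triggered PME starts.
     * Flushes buffered batches so that no streamer data is left half-applied.
     */
    static void onSnapshotExchangeStarted(IgniteDataStreamer<?, ?> ds) {
        ds.flush(); // Public API: blocks until all remaining data is loaded.
    }
}
{code}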



> Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
> -----------------------------------------------------------------------------
>
>                 Key: IGNITE-17369
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17369
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Vladimir Steshin
>            Assignee: Vladimir Steshin
>            Priority: Major
>              Labels: ise, ise.lts
>         Attachments: IgniteClusterShanpshotStreamerTest.java
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Ignite fails to restore a snapshot created under streamed load:
> {code:java}
> Conflict partition: PartitionKeyV2 [grpId=109386747, 
> grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
> consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, 
> partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 
> [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
> updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
> Conflict partition: PartitionKeyV2 [grpId=109386747, 
> grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, 
> consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, 
> partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 
> [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, 
> updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
> {code}
> Test (attached):
> {code:java}
>     public void testClusterSnapshotConsistencyWithStreamer() throws Exception {
>         int grids = 2;
>         CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
>         AtomicBoolean stopLoading = new AtomicBoolean(false);
> 
>         dfltCacheCfg = null;
> 
>         Class.forName("org.apache.ignite.IgniteJdbcDriver");
> 
>         String tableName = "TEST_TBL1";
> 
>         startGrids(grids);
> 
>         grid(0).cluster().state(ACTIVE);
> 
>         IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, stopLoading, loadNumberBeforeSnapshot);
> 
>         loadNumberBeforeSnapshot.await();
> 
>         grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
> 
>         stopLoading.set(true);
> 
>         load1.get();
> 
>         grid(0).cache("SQL_PUBLIC_" + tableName).destroy();
> 
>         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, F.asList("SQL_PUBLIC_TEST_TBL1")).get();
>     }
> 
>     /** */
>     private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, int backups, boolean streaming,
>         AtomicBoolean stop, CountDownLatch startSnp) {
>         return GridTestUtils.runMultiThreadedAsync(() -> {
>             if (useCache) {
>                 String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();
> 
>                 IgniteCache<Integer, Object> cache = grid(0)
>                     .createCache(new CacheConfiguration<Integer, Object>(cacheName).setBackups(backups)
>                         .setCacheMode(CacheMode.REPLICATED));
> 
>                 try (IgniteDataStreamer<Integer, Object> ds = grid(0).dataStreamer(cacheName)) {
>                     for (int i = 0; !stop.get(); ++i) {
>                         if (streaming)
>                             ds.addData(i, new Account(i, i - 1));
>                         else
>                             cache.put(i, new Account(i, i - 1));
> 
>                         if (startSnp.getCount() > 0)
>                             startSnp.countDown();
> 
>                         Thread.yield();
>                     }
>                 }
>             }
>             else {
>                 try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
>                     createTable(conn, tblName, backups);
> 
>                     try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + tblName +
>                         "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
>                         if (streaming)
>                             conn.prepareStatement("SET STREAMING ON;").execute();
> 
>                         int leftLimit = 97; // Letter 'a'.
>                         int rightLimit = 122; // Letter 'z'.
>                         int targetStringLength = 15;
>                         Random rand = new Random();
> 
>                         for (int i = 0; !stop.get(); ++i) {
>                             int orgid = rand.ints(1, 0, 5).findFirst().getAsInt();
>                             String val = rand.ints(leftLimit, rightLimit + 1).limit(targetStringLength)
>                                 .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
>                                 .toString();
> 
>                             stmt.setInt(1, i);
>                             stmt.setString(2, val);
>                             stmt.setInt(3, orgid);
>                             stmt.setInt(4, 0);
>                             stmt.executeUpdate();
> 
>                             if (startSnp.getCount() > 0)
>                                 startSnp.countDown();
> 
>                             Thread.yield();
>                         }
>                     }
>                 }
>                 catch (Exception e) {
>                     while (startSnp.getCount() > 0)
>                         startSnp.countDown();
> 
>                     throw new IgniteException("Unable to load.", e);
>                 }
>             }
>         }, 1, "load-thread-" + tblName);
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
