[
https://issues.apache.org/jira/browse/IGNITE-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pavel Pereslegin updated IGNITE-12605:
--------------------------------------
Ignite Flags: (was: Docs Required,Release Notes Required)
Environment:
was:
> Historical (WAL) rebalance can start on a cleared partition if some baseline
> node leaves the cluster and then joins back.
> -------------------------------------------------------------------------------------------------------------------------
>
> Key: IGNITE-12605
> URL: https://issues.apache.org/jira/browse/IGNITE-12605
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 2.9
> Environment:
> Reporter: Pavel Pereslegin
> Assignee: Pavel Pereslegin
> Priority: Major
>
> On a partition map exchange initiated by a baseline node leaving, the historical
> supplier is not provided in the full message (assignPartitionStates() isn't
> called when a node leaves).
> Since we don't have a historical supplier, the "historical" partition is
> scheduled for clearing. Then, when the node joins back, assignPartitionStates()
> is called and we do have a supplier for historical rebalance, but the partition
> may already be cleared.
> After such rebalance we have inconsistent partitions on a "historically
> rebalanced" node (with consistent partition counters and state).
> The "inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (this
> issue can also be reproduced, less reliably, without it — see attachment).
> Reproducer shows the following errors.
> Error 1 (partitions have been cleared).
> {noformat}
> java.lang.AssertionError:
> |------|-----------------------|
> | | entries count |
> | part |-----------------------|
> | | node1 | node2 | node3 |
> |------|-----------------------|
> | 0 | 6250 | 6250 | 3125 |
> | 1 | 6250 | 6250 | 3125 |
> | 2 | 6250 | 6250 | 3125 |
> ...
> | 31 | 6250 | 6250 | 3125 |
> |------|-------|-------|-------|
> {noformat}
> Error 2 (should be investigated deeply).
> {noformat}
> java.lang.AssertionError: Reached end of WAL but not all partitions are done
> at
> org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.advance(GridCacheOffheapManager.java:1419)
> at
> org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.next(GridCacheOffheapManager.java:1295)
> at
> org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1255)
> at
> org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1163)
> at
> org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.nextX(IgniteRebalanceIteratorImpl.java:135)
> at
> org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.next(IgniteRebalanceIteratorImpl.java:215)
> at
> org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.peek(IgniteRebalanceIteratorImpl.java:155)
> at
> org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier.handleDemandMessage(GridDhtPartitionSupplier.java:316)
> at
> org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.lambda$handleDemandMessage$1(GridDhtPreloader.java:374)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
>
> Reproducer:
> {code:java}
> import java.util.ArrayList;
> import java.util.LinkedHashSet;
> import java.util.List;
> import java.util.Set;
> import java.util.concurrent.CountDownLatch;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteDataStreamer;
> import org.apache.ignite.cache.CacheAtomicityMode;
> import org.apache.ignite.cache.CacheMode;
> import org.apache.ignite.cache.CacheRebalanceMode;
> import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.cluster.ClusterState;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.DataRegionConfiguration;
> import org.apache.ignite.configuration.DataStorageConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.configuration.WALMode;
> import org.apache.ignite.internal.IgniteEx;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import
> org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
> import
> org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
> import
> org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
> import org.apache.ignite.internal.util.typedef.G;
> import org.apache.ignite.internal.util.typedef.P2;
> import org.apache.ignite.internal.util.typedef.internal.CU;
> import org.apache.ignite.internal.util.typedef.internal.SB;
> import org.apache.ignite.internal.util.typedef.internal.U;
> import org.apache.ignite.plugin.extensions.communication.Message;
> import org.apache.ignite.testframework.GridTestUtils;
> import org.apache.ignite.testframework.junits.WithSystemProperty;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.junit.Test;
> import static
> org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
> /** */
> public class WalRebalanceOnCleanPartitionReproducer extends
> GridCommonAbstractTest {
> /** Block predicate. */
> private P2<ClusterNode, Message> blockPred;
> /** {@inheritDoc} */
> @Override protected IgniteConfiguration getConfiguration(String gridName)
> throws Exception {
> IgniteConfiguration cfg = super.getConfiguration(gridName);
> cfg.setConsistentId(gridName);
> cfg.setRebalanceThreadPoolSize(1);
> CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
> .setCacheMode(CacheMode.PARTITIONED)
> .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
> .setBackups(2)
> .setAffinity(new RendezvousAffinityFunction(false, 32))
> .setRebalanceMode(CacheRebalanceMode.ASYNC);
> cfg.setCacheConfiguration(ccfg1);
> TestRecordingCommunicationSpi commSpi = new
> TestRecordingCommunicationSpi();
> commSpi.blockMessages(blockPred);
> cfg.setCommunicationSpi(commSpi);
> DataStorageConfiguration dsCfg = new DataStorageConfiguration()
> .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() *
> 4)
> .setCheckpointFrequency(5_000)
> .setWalMode(WALMode.LOG_ONLY)
> .setPageSize(1024)
> .setWalSegmentSize(8 * 1024 * 1024)
> .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
> .setName("dfltDataRegion")
> .setPersistenceEnabled(true)
> .setMaxSize(512 * 1024 * 1024)
> );
> cfg.setDataStorageConfiguration(dsCfg);
> return cfg;
> }
> /** {@inheritDoc} */
> @Override protected void beforeTestsStarted() throws Exception {
> stopAllGrids();
> cleanPersistenceDir();
> super.beforeTestsStarted();
> }
> /**
> *
> */
> @Test
> @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
> public void testHistoricalRebalanceNotStartsAfterNodeLeft() throws
> Exception {
> IgniteEx crd = startGrid(0);
> crd.cluster().state(ClusterState.ACTIVE);
> crd.cluster().baselineAutoAdjustEnabled(false);
> Ignite node1 = startGrid(1);
> Ignite node2 = startGrid(2);
> List<ClusterNode> blt = new
> ArrayList<>(crd.context().discovery().aliveServerNodes());
> crd.cluster().setBaselineTopology(blt);
> IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);
> System.out.println(">>> load 100k entries");
> loadData(cache0, 0, 100_000);
> forceCheckpoint();
> System.out.println(">>> stop node 2");
> node2.close();
> awaitPartitionMapExchange();
> System.out.println(">>> load 100k entries again");
> loadData(cache0, 100_000, 100_000);
> CountDownLatch startLatch = new CountDownLatch(1);
> blockPred = (node, msg) -> {
> if (msg instanceof GridDhtPartitionDemandMessage) {
> GridDhtPartitionDemandMessage msg0 =
> (GridDhtPartitionDemandMessage)msg;
> return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
> }
> return false;
> };
> GridTestUtils.runAsync(() -> {
> System.out.println(">>> start grid 2");
> startGrid(2);
> startLatch.countDown();
> return null;
> });
> startLatch.await();
> TestRecordingCommunicationSpi spi2 =
> TestRecordingCommunicationSpi.spi(grid(2));
> spi2.waitForBlocked();
> spi2.stopBlock();
> // Forces rebalanceing to restart without assign partition states.
> System.out.println(">>> stop grid 1");
> node1.close();
> spi2.blockMessages(blockPred);
> spi2.waitForBlocked();
> System.out.println(">>> start grid 1");
> startGrid(1);
> spi2.stopBlock();
> // just to be sure
> U.sleep(3_000);
> awaitPartitionMapExchange();
> verifyPartittionSizes();
> }
> /** */
> private void verifyPartittionSizes() {
> int grids = G.allGrids().size();
> SB buf = new SB();
> for (int p = 0; p < 32; p++) {
> Set<Long> sizesSet = new LinkedHashSet<>();
> List<GridDhtLocalPartition> parts = new ArrayList<>();
> for (int n = 0; n < grids; n++) {
> GridDhtLocalPartition part =
> grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p);
> assert part != null;
> assert part.state() == GridDhtPartitionState.OWNING;
> sizesSet.add(part.fullSize());
> parts.add(part);
> }
> if (sizesSet.size() == 1)
> continue;
> buf.a(String.format("\n| %2d | ", p));
> for (GridDhtLocalPartition part : parts)
> buf.a(String.format(" %04d", part.fullSize())).a(" | ");
> }
> assertTrue("\n|------|-----------------------|" +
> "\n| | entries count |" +
> "\n| part |-----------------------|" +
> "\n| | node1 | node2 | node3 |" +
> "\n|------|-----------------------|" +
> buf +
> "\n|------|-------|-------|-------|", buf.length() == 0);
> }
> /** */
> private void loadData(IgniteCache cache, int off, int cnt) {
> try (IgniteDataStreamer<Integer, String> streamer =
> grid(0).dataStreamer(cache.getName())) {
> for (int i = off; i < off + cnt; i++)
> streamer.addData(i, String.valueOf(i));
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)