shishkovilja commented on code in PR #11512: URL: https://github.com/apache/ignite/pull/11512#discussion_r1831469849
########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java: ########## @@ -216,120 +177,66 @@ public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exc * @param isolation Isolation level. * @throws Exception If failed. */ - private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { + private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation, boolean nearCache) + throws Exception { Ignite ig = startGridsMultiThreaded(3); - int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5; - - CountDownLatch txLatch = new CountDownLatch(contCnt); + int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 2; ig.cluster().state(ClusterState.ACTIVE); - client = true; - - Ignite cl = startGrid(); - - IgniteTransactions cliTxMgr = cl.transactions(); - - IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME); - - IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME); - - final Integer keyId = primaryKey(cache); - - CountDownLatch blockOnce = new CountDownLatch(1); - - for (Ignite ig0 : G.allGrids()) { - if (ig0.configuration().isClientMode()) - continue; - - TestRecordingCommunicationSpi commSpi0 = - (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); - - commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { - @Override public boolean apply(ClusterNode node, Message msg) { - if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) { - blockOnce.countDown(); - - return true; - } + Ignite cl = startClientGrid(); - return false; - } - }); - } + IgniteCache<Integer, Integer> clientCache = cl.createCache(getCacheConfiguration(nearCache)); - IgniteInternalFuture f = GridTestUtils.runAsync(() -> { - try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) { - cache0.put(keyId, 0); - tx.commit(); - } - }); + final Integer keyId = primaryKey(ig.getOrCreateCache(getCacheConfiguration(nearCache))); - blockOnce.await(); + IgniteTransactions transactions = cl.transactions(); - GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>(); + AtomicBoolean doTest = new AtomicBoolean(true); - for (int i = 0; i < contCnt; ++i) { - IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> { - try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) { - cache0.put(keyId, 0); + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> { + while (doTest.get()) { + try (Transaction tx = transactions.txStart(concurrency, isolation)) { + clientCache.put(keyId, 0); tx.commit(); - - txLatch.countDown(); } - }); + } + }, + contCnt, "threadName"); - finishFut.add(f0); + try { + assertTrue(GridTestUtils.waitForCondition( + () -> checkMetrics(ig), Review Comment: Fix indentations here and below. ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java: ########## @@ -216,120 +177,66 @@ public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exc * @param isolation Isolation level. * @throws Exception If failed. */ - private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { + private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation, boolean nearCache) + throws Exception { Ignite ig = startGridsMultiThreaded(3); - int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5; - - CountDownLatch txLatch = new CountDownLatch(contCnt); + int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 2; ig.cluster().state(ClusterState.ACTIVE); - client = true; - - Ignite cl = startGrid(); - - IgniteTransactions cliTxMgr = cl.transactions(); - - IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME); - - IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME); - - final Integer keyId = primaryKey(cache); - - CountDownLatch blockOnce = new CountDownLatch(1); - - for (Ignite ig0 : G.allGrids()) { - if (ig0.configuration().isClientMode()) - continue; - - TestRecordingCommunicationSpi commSpi0 = - (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); - - commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { - @Override public boolean apply(ClusterNode node, Message msg) { - if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) { - blockOnce.countDown(); - - return true; - } + Ignite cl = startClientGrid(); - return false; - } - }); - } + IgniteCache<Integer, Integer> clientCache = cl.createCache(getCacheConfiguration(nearCache)); - IgniteInternalFuture f = GridTestUtils.runAsync(() -> { - try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) { - cache0.put(keyId, 0); - tx.commit(); - } - }); + final Integer keyId = primaryKey(ig.getOrCreateCache(getCacheConfiguration(nearCache))); - blockOnce.await(); + IgniteTransactions transactions = cl.transactions(); - GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>(); + AtomicBoolean doTest = new AtomicBoolean(true); - for (int i = 0; i < contCnt; ++i) { - IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> { - try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) { - cache0.put(keyId, 0); + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> { + while (doTest.get()) { + try (Transaction tx = transactions.txStart(concurrency, isolation)) { + clientCache.put(keyId, 0); tx.commit(); - - txLatch.countDown(); } - }); + } + }, + contCnt, "threadName"); - finishFut.add(f0); + try { + assertTrue(GridTestUtils.waitForCondition( + () -> checkMetrics(ig), + getTestTimeout())); } + finally { + doTest.set(false); - finishFut.markInitialized(); - - for (Ignite ig0 : G.allGrids()) { - TestRecordingCommunicationSpi commSpi0 = - (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); - - if (ig0.configuration().isClientMode()) - continue; - - commSpi0.stopBlock(); + fut.get(); } + } + /** + * Checks if the transaction collision metrics contain the string "queueSize" for the given Ignite instance. + * + * @param ig Ignite instance. + * @return {@code true} if the metrics contain "queueSize"; otherwise {@code false}. + */ + private static boolean checkMetrics(Ignite ig) { IgniteTxManager srvTxMgr = ((IgniteEx)ig).context().cache().context().tm(); - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo"); - } - catch (IgniteCheckedException e) { - fail(e.toString()); - } - - CacheMetrics metrics = ig.cache(DEFAULT_CACHE_NAME).localMetrics(); - - String coll1 = metrics.getTxKeyCollisions(); - - if (!coll1.isEmpty()) { - String coll2 = metrics.getTxKeyCollisions(); - - // check idempotent - assertEquals(coll1, coll2); - - assertTrue(coll1.contains("queueSize")); - - return true; - } - else - return false; - } - }, 10_000)); - - f.get(); - - finishFut.get(); + try { + U.invoke(IgniteTxManager.class, srvTxMgr, "collectTxCollisionsInfo"); + } + catch (IgniteCheckedException e) { + fail(e.toString()); + } - txLatch.await(); + return ig.cache(DEFAULT_CACHE_NAME) + .localMetrics() Review Comment: Fix indentations here and below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org