alex-plekhanov commented on code in PR #11580: URL: https://github.com/apache/ignite/pull/11580#discussion_r1850239700
########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import javax.cache.Cache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY; + +/** */ +public class ScanQueryTransactionIsolationTest extends AbstractQueryTransactionIsolationTest { + /** @return Test parameters. */ + @Parameterized.Parameters( + name = "gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}") + public static Collection<?> parameters() { + List<Object[]> params = new ArrayList<>(); + + for (int gridCnt : new int[]{1, 3}) { + int[] backups = gridCnt > 1 + ? new int[]{1, gridCnt - 1} + : new int[]{0}; + + for (int backup : backups) { + for (CacheMode mode : CacheMode.values()) { + for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, ModifyApi.ENTRY_PROCESSOR}) { + for (boolean commit : new boolean[]{false, true}) { + for (boolean mutli : new boolean[]{false, true}) { + for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { + for (ExecutorType execType : new ExecutorType[]{SERVER, ExecutorType.CLIENT}) { + params.add(new Object[]{ + gridCnt, + backup, + false, //partition awareness + mode, + execType, + modify, + commit, + mutli, + txConcurrency + }); + } + + for (boolean partitionAwareness : new boolean[]{false, true}) { + params.add(new Object[]{ + gridCnt, + backup, + partitionAwareness, + mode, + THIN_VIA_QUERY, // executor type + modify, + commit, + mutli, + txConcurrency + }); + } + } + } + } + } + } + } + } + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected User select(Integer id, ModifyApi api) { + assertTrue(type != THIN_VIA_CACHE_API); + assertTrue(type != THIN_JDBC); + + if (api == QUERY) { + ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>() + .setFilter((id0, user) -> Objects.equals(id0, id)); + + boolean withTrasformer = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + boolean useGetAll = ThreadLocalRandom.current().nextBoolean(); + boolean useCacheIter = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + + if (!withTrasformer) { + if (useCacheIter) { + assertTrue(type == SERVER || type == CLIENT); + + List<Cache.Entry<Integer, User>> res = + toList(F.iterator0(node().cache(users()), true, e -> Objects.equals(e.getKey(), id))); + + assertTrue(F.size(res) + "", F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + else { + QueryCursor<Cache.Entry<Integer, User>> cursor = null; + + if (type == THIN_VIA_QUERY) + cursor = thinCli.<Integer, User>cache(users()).query(qry); + else if (type == SERVER || type == CLIENT) { Review Comment: Redundant braces ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java: ########## @@ -2350,6 +2354,53 @@ public void dumpListener(DumpEntryChangeListener dumpEntryChangeLsnr) { this.dumpLsnr = dumpEntryChangeLsnr; } + /** + * @param part Partition. + * @return First, set of object changed in transaction, second, list of transaction data in required format. + * @see ExecutionContext#transactionChanges(int, int[], Function) + */ + public IgniteBiTuple<Set<KeyCacheObject>, List<Object>> transactionChanges(Integer part) { Review Comment: There are a lot of declaration like `IgniteBiTuple<Set<KeyCacheObject>, List<Object>>` now in code, and a lot of calls like `txChanges.get1()`, `txChanges.get2()`. Perhaps we are ready to introduce new class for query transactional changes. ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import javax.cache.Cache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY; + +/** */ +public class ScanQueryTransactionIsolationTest extends AbstractQueryTransactionIsolationTest { + /** @return Test parameters. */ + @Parameterized.Parameters( + name = "gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}") + public static Collection<?> parameters() { + List<Object[]> params = new ArrayList<>(); + + for (int gridCnt : new int[]{1, 3}) { + int[] backups = gridCnt > 1 + ? new int[]{1, gridCnt - 1} + : new int[]{0}; + + for (int backup : backups) { + for (CacheMode mode : CacheMode.values()) { + for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, ModifyApi.ENTRY_PROCESSOR}) { + for (boolean commit : new boolean[]{false, true}) { + for (boolean mutli : new boolean[]{false, true}) { + for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { + for (ExecutorType execType : new ExecutorType[]{SERVER, ExecutorType.CLIENT}) { + params.add(new Object[]{ + gridCnt, + backup, + false, //partition awareness + mode, + execType, + modify, + commit, + mutli, + txConcurrency + }); + } + + for (boolean partitionAwareness : new boolean[]{false, true}) { + params.add(new Object[]{ + gridCnt, + backup, + partitionAwareness, + mode, + THIN_VIA_QUERY, // executor type + modify, + commit, + mutli, + txConcurrency + }); + } + } + } + } + } + } + } + } + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected User select(Integer id, ModifyApi api) { + assertTrue(type != THIN_VIA_CACHE_API); + assertTrue(type != THIN_JDBC); + + if (api == QUERY) { + ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>() + .setFilter((id0, user) -> Objects.equals(id0, id)); + + boolean withTrasformer = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + boolean useGetAll = ThreadLocalRandom.current().nextBoolean(); + boolean useCacheIter = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + + if (!withTrasformer) { + if (useCacheIter) { + assertTrue(type == SERVER || type == CLIENT); + + List<Cache.Entry<Integer, User>> res = + toList(F.iterator0(node().cache(users()), true, e -> Objects.equals(e.getKey(), id))); + + assertTrue(F.size(res) + "", F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + else { + QueryCursor<Cache.Entry<Integer, User>> cursor = null; + + if (type == THIN_VIA_QUERY) + cursor = thinCli.<Integer, User>cache(users()).query(qry); + else if (type == SERVER || type == CLIENT) { + cursor = node().<Integer, User>cache(users()).query(qry); + } + else + fail("Unsupported executor type: " + type); + + List<Cache.Entry<Integer, User>> res = toList(cursor, useGetAll); + + assertTrue("useGetAll=" + useGetAll + ", useCacheIter=" + useCacheIter, F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + } + else { + assertTrue(type == SERVER || type == CLIENT); + + List<User> res = toList(node().<Integer, User>cache(users()).query(qry, Cache.Entry::getValue), useGetAll); + + assertTrue("withTransformer=" + withTrasformer + ", useGetAll=" + useGetAll, F.size(res) <= 1); + + return F.first(res); + } + + } + + return super.select(id, api); + } + + /** */ + private static <R> List<R> toList(QueryCursor<R> cursor, boolean useGetAll) { + return useGetAll ? cursor.getAll() : toList(cursor.iterator()); + } + + /** */ + private static <R> List<R> toList(Iterator<R> iter) { + List<R> res = new ArrayList<>(); + + iter.forEachRemaining(res::add); + + return res; + } + + /** */ + private List<Cache.Entry<Integer, User>> unwrapBinary(List<?> all) { Review Comment: Never used ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import javax.cache.Cache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY; + +/** */ +public class ScanQueryTransactionIsolationTest extends AbstractQueryTransactionIsolationTest { + /** @return Test parameters. */ + @Parameterized.Parameters( + name = "gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}") + public static Collection<?> parameters() { + List<Object[]> params = new ArrayList<>(); + + for (int gridCnt : new int[]{1, 3}) { + int[] backups = gridCnt > 1 + ? new int[]{1, gridCnt - 1} + : new int[]{0}; + + for (int backup : backups) { + for (CacheMode mode : CacheMode.values()) { + for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, ModifyApi.ENTRY_PROCESSOR}) { + for (boolean commit : new boolean[]{false, true}) { + for (boolean mutli : new boolean[]{false, true}) { + for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { + for (ExecutorType execType : new ExecutorType[]{SERVER, ExecutorType.CLIENT}) { + params.add(new Object[]{ + gridCnt, + backup, + false, //partition awareness + mode, + execType, + modify, + commit, + mutli, + txConcurrency + }); + } + + for (boolean partitionAwareness : new boolean[]{false, true}) { + params.add(new Object[]{ + gridCnt, + backup, + partitionAwareness, + mode, + THIN_VIA_QUERY, // executor type + modify, + commit, + mutli, + txConcurrency + }); + } + } + } + } + } + } + } + } + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected User select(Integer id, ModifyApi api) { + assertTrue(type != THIN_VIA_CACHE_API); + assertTrue(type != THIN_JDBC); + + if (api == QUERY) { + ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>() + .setFilter((id0, user) -> Objects.equals(id0, id)); + + boolean withTrasformer = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + boolean useGetAll = ThreadLocalRandom.current().nextBoolean(); + boolean useCacheIter = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + + if (!withTrasformer) { + if (useCacheIter) { + assertTrue(type == SERVER || type == CLIENT); + + List<Cache.Entry<Integer, User>> res = + toList(F.iterator0(node().cache(users()), true, e -> Objects.equals(e.getKey(), id))); + + assertTrue(F.size(res) + "", F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + else { + QueryCursor<Cache.Entry<Integer, User>> cursor = null; + + if (type == THIN_VIA_QUERY) + cursor = thinCli.<Integer, User>cache(users()).query(qry); + else if (type == SERVER || type == CLIENT) { + cursor = node().<Integer, User>cache(users()).query(qry); + } + else + fail("Unsupported executor type: " + type); + + List<Cache.Entry<Integer, User>> res = toList(cursor, useGetAll); + + assertTrue("useGetAll=" + useGetAll + ", useCacheIter=" + useCacheIter, F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + } + else { + assertTrue(type == SERVER || type == CLIENT); + + List<User> res = toList(node().<Integer, User>cache(users()).query(qry, Cache.Entry::getValue), useGetAll); + + assertTrue("withTransformer=" + withTrasformer + ", useGetAll=" + useGetAll, F.size(res) <= 1); + + return F.first(res); + } + + } + + return super.select(id, api); + } + + /** */ + private static <R> List<R> toList(QueryCursor<R> cursor, boolean useGetAll) { + return useGetAll ? cursor.getAll() : toList(cursor.iterator()); + } + + /** */ + private static <R> List<R> toList(Iterator<R> iter) { + List<R> res = new ArrayList<>(); + + iter.forEachRemaining(res::add); + + return res; + } + + /** */ + private List<Cache.Entry<Integer, User>> unwrapBinary(List<?> all) { + return all.stream() + .map(e0 -> new CacheEntryImpl<>( + this.<Integer>unwrap(((Cache.Entry<?, ?>)e0).getKey()), + this.<User>unwrap(((Cache.Entry<?, ?>)e0).getValue())) + ).collect(Collectors.toList()); + } + + /** */ + private <T> T unwrap(Object o) { + if (o instanceof KeyCacheObject) + return ((CacheObject)o).value(null, false); + else if (o instanceof BinaryObject) { Review Comment: Redundant braces ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java: ########## @@ -158,18 +165,15 @@ public void testGetObjectField() throws Exception { for (int i = 0; i < 50; i++) assertEquals(i * 100, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** * @throws Exception If failed. */ @Test public void testGetObjectFieldPartitioned() throws Exception { - IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + IgniteCache<Integer, Value> cache = createTestCache(); Review Comment: This test only fill one partition, I think for transactional mode we should also check that tx entries with are filtered correcly by partition if there are data for other partitions exists. ########## modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java: ########## @@ -93,8 +95,10 @@ SqlQueriesTopologyMappingTest.class, - IgniteCacheQueryReservationOnUnstableTopologyTest.class + IgniteCacheQueryReservationOnUnstableTopologyTest.class, + ScanQueryTransactionsUnsupportedModesTest.class, + ScanQueryTransactionIsolationTest.class Review Comment: Please add comma at the end of line ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java: ########## @@ -147,6 +152,10 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac /** */ private AffinityTopologyVersion topVer; + /** Set of keys that must be skiped during iteration. */ + @GridDirectCollection(KeyCacheObject.class) + private Collection<KeyCacheObject> skipKeys; Review Comment: Perhaps it worth to filter keys on reducer instead of modifying network protocol. I think it will be a little bit simplier. WDYT? ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AbstractTransactionalQueryTest.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.RunnableX; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + +/** */ +@RunWith(Parameterized.class) +public abstract class AbstractTransactionalQueryTest extends GridCommonAbstractTest { + /** */ + @Parameterized.Parameter() + public TestTransactionMode sqlTxMode; Review Comment: Test not related to SQL, so `SQL`/`DML` in fields and comments looks confusing. ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java: ########## @@ -311,23 +294,17 @@ public void testKeepBinaryFiltered() throws Exception { for (int i = 0; i < 5; i++) assertEquals(i * 1000, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** * @throws Exception If failed. */ @Test public void testLocal() throws Exception { - IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); + assumeTrue(sqlTxMode == TestTransactionMode.NONE); Review Comment: Any transactional test with local flag? ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import javax.cache.Cache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY; + +/** */ +public class ScanQueryTransactionIsolationTest extends AbstractQueryTransactionIsolationTest { + /** @return Test parameters. */ + @Parameterized.Parameters( + name = "gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}") + public static Collection<?> parameters() { + List<Object[]> params = new ArrayList<>(); + + for (int gridCnt : new int[]{1, 3}) { + int[] backups = gridCnt > 1 + ? new int[]{1, gridCnt - 1} + : new int[]{0}; + + for (int backup : backups) { + for (CacheMode mode : CacheMode.values()) { + for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, ModifyApi.ENTRY_PROCESSOR}) { + for (boolean commit : new boolean[]{false, true}) { + for (boolean mutli : new boolean[]{false, true}) { + for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { + for (ExecutorType execType : new ExecutorType[]{SERVER, ExecutorType.CLIENT}) { + params.add(new Object[]{ + gridCnt, + backup, + false, //partition awareness + mode, + execType, + modify, + commit, + mutli, + txConcurrency + }); + } + + for (boolean partitionAwareness : new boolean[]{false, true}) { + params.add(new Object[]{ + gridCnt, + backup, + partitionAwareness, + mode, + THIN_VIA_QUERY, // executor type + modify, + commit, + mutli, + txConcurrency + }); + } + } + } + } + } + } + } + } + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected User select(Integer id, ModifyApi api) { + assertTrue(type != THIN_VIA_CACHE_API); + assertTrue(type != THIN_JDBC); + + if (api == QUERY) { + ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>() + .setFilter((id0, user) -> Objects.equals(id0, id)); + + boolean withTrasformer = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + boolean useGetAll = ThreadLocalRandom.current().nextBoolean(); + boolean useCacheIter = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + + if (!withTrasformer) { + if (useCacheIter) { + assertTrue(type == SERVER || type == CLIENT); + + List<Cache.Entry<Integer, User>> res = + toList(F.iterator0(node().cache(users()), true, e -> Objects.equals(e.getKey(), id))); + + assertTrue(F.size(res) + "", F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + else { + QueryCursor<Cache.Entry<Integer, User>> cursor = null; + + if (type == THIN_VIA_QUERY) + cursor = thinCli.<Integer, User>cache(users()).query(qry); + else if (type == SERVER || type == CLIENT) { + cursor = node().<Integer, User>cache(users()).query(qry); + } + else + fail("Unsupported executor type: " + type); + + List<Cache.Entry<Integer, User>> res = toList(cursor, useGetAll); + + assertTrue("useGetAll=" + useGetAll + ", useCacheIter=" + useCacheIter, F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + } + else { + assertTrue(type == SERVER || type == CLIENT); + + List<User> res = toList(node().<Integer, User>cache(users()).query(qry, Cache.Entry::getValue), useGetAll); + + assertTrue("withTransformer=" + withTrasformer + ", useGetAll=" + useGetAll, F.size(res) <= 1); + + return F.first(res); + } + Review Comment: Redundant NL ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionsUnsupportedModesTest.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.HashSet; +import java.util.Set; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.junit.Test; + +/** */ +public class ScanQueryTransactionsUnsupportedModesTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** */ + @Test + public void testUnsupportedTransactionModes() throws Exception { + try (IgniteEx srv = startGrid(0)) { + for (boolean client : new boolean[] {/*false,*/ true}) { Review Comment: `/*false*/`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org