alex-plekhanov commented on code in PR #11580:
URL: https://github.com/apache/ignite/pull/11580#discussion_r1850239700


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY;
+
+/** */
+public class ScanQueryTransactionIsolationTest extends 
AbstractQueryTransactionIsolationTest {
+    /** @return Test parameters. */
+    @Parameterized.Parameters(
+        name = 
"gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (int gridCnt : new int[]{1, 3}) {
+            int[] backups = gridCnt > 1
+                ? new int[]{1, gridCnt - 1}
+                : new int[]{0};
+
+            for (int backup : backups) {
+                for (CacheMode mode : CacheMode.values()) {
+                    for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, 
ModifyApi.ENTRY_PROCESSOR}) {
+                        for (boolean commit : new boolean[]{false, true}) {
+                            for (boolean mutli : new boolean[]{false, true}) {
+                                for (TransactionConcurrency txConcurrency : 
TransactionConcurrency.values()) {
+                                    for (ExecutorType execType : new 
ExecutorType[]{SERVER, ExecutorType.CLIENT}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            false, //partition awareness
+                                            mode,
+                                            execType,
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+
+                                    for (boolean partitionAwareness : new 
boolean[]{false, true}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            partitionAwareness,
+                                            mode,
+                                            THIN_VIA_QUERY, // executor type
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected User select(Integer id, ModifyApi api) {
+        assertTrue(type != THIN_VIA_CACHE_API);
+        assertTrue(type != THIN_JDBC);
+
+        if (api == QUERY) {
+            ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>()
+                .setFilter((id0, user) -> Objects.equals(id0, id));
+
+            boolean withTrasformer = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+            boolean useGetAll = ThreadLocalRandom.current().nextBoolean();
+            boolean useCacheIter = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+
+            if (!withTrasformer) {
+                if (useCacheIter) {
+                    assertTrue(type == SERVER || type == CLIENT);
+
+                    List<Cache.Entry<Integer, User>> res =
+                        toList(F.iterator0(node().cache(users()), true, e -> 
Objects.equals(e.getKey(), id)));
+
+                    assertTrue(F.size(res) + "", F.size(res) <= 1);
+
+                    return F.isEmpty(res) ? null : res.get(0).getValue();
+                }
+                else {
+                    QueryCursor<Cache.Entry<Integer, User>> cursor = null;
+
+                    if (type == THIN_VIA_QUERY)
+                        cursor = thinCli.<Integer, 
User>cache(users()).query(qry);
+                    else if (type == SERVER || type == CLIENT) {

Review Comment:
   Redundant braces



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java:
##########
@@ -2350,6 +2354,53 @@ public void dumpListener(DumpEntryChangeListener 
dumpEntryChangeLsnr) {
         this.dumpLsnr = dumpEntryChangeLsnr;
     }
 
+    /**
+     * @param part Partition.
+     * @return First, set of object changed in transaction, second, list of 
transaction data in required format.
+     * @see ExecutionContext#transactionChanges(int, int[], Function)
+     */
+    public IgniteBiTuple<Set<KeyCacheObject>, List<Object>> 
transactionChanges(Integer part) {

Review Comment:
   There are a lot of declaration like `IgniteBiTuple<Set<KeyCacheObject>, 
List<Object>>` now in code, and a lot of calls like `txChanges.get1()`, 
`txChanges.get2()`. Perhaps we are ready to introduce new class for query 
transactional changes.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY;
+
+/** */
+public class ScanQueryTransactionIsolationTest extends 
AbstractQueryTransactionIsolationTest {
+    /** @return Test parameters. */
+    @Parameterized.Parameters(
+        name = 
"gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (int gridCnt : new int[]{1, 3}) {
+            int[] backups = gridCnt > 1
+                ? new int[]{1, gridCnt - 1}
+                : new int[]{0};
+
+            for (int backup : backups) {
+                for (CacheMode mode : CacheMode.values()) {
+                    for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, 
ModifyApi.ENTRY_PROCESSOR}) {
+                        for (boolean commit : new boolean[]{false, true}) {
+                            for (boolean mutli : new boolean[]{false, true}) {
+                                for (TransactionConcurrency txConcurrency : 
TransactionConcurrency.values()) {
+                                    for (ExecutorType execType : new 
ExecutorType[]{SERVER, ExecutorType.CLIENT}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            false, //partition awareness
+                                            mode,
+                                            execType,
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+
+                                    for (boolean partitionAwareness : new 
boolean[]{false, true}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            partitionAwareness,
+                                            mode,
+                                            THIN_VIA_QUERY, // executor type
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected User select(Integer id, ModifyApi api) {
+        assertTrue(type != THIN_VIA_CACHE_API);
+        assertTrue(type != THIN_JDBC);
+
+        if (api == QUERY) {
+            ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>()
+                .setFilter((id0, user) -> Objects.equals(id0, id));
+
+            boolean withTrasformer = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+            boolean useGetAll = ThreadLocalRandom.current().nextBoolean();
+            boolean useCacheIter = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+
+            if (!withTrasformer) {
+                if (useCacheIter) {
+                    assertTrue(type == SERVER || type == CLIENT);
+
+                    List<Cache.Entry<Integer, User>> res =
+                        toList(F.iterator0(node().cache(users()), true, e -> 
Objects.equals(e.getKey(), id)));
+
+                    assertTrue(F.size(res) + "", F.size(res) <= 1);
+
+                    return F.isEmpty(res) ? null : res.get(0).getValue();
+                }
+                else {
+                    QueryCursor<Cache.Entry<Integer, User>> cursor = null;
+
+                    if (type == THIN_VIA_QUERY)
+                        cursor = thinCli.<Integer, 
User>cache(users()).query(qry);
+                    else if (type == SERVER || type == CLIENT) {
+                        cursor = node().<Integer, 
User>cache(users()).query(qry);
+                    }
+                    else
+                        fail("Unsupported executor type: " + type);
+
+                    List<Cache.Entry<Integer, User>> res = toList(cursor, 
useGetAll);
+
+                    assertTrue("useGetAll=" + useGetAll + ", useCacheIter=" + 
useCacheIter, F.size(res) <= 1);
+
+                    return F.isEmpty(res) ? null : res.get(0).getValue();
+                }
+            }
+            else {
+                assertTrue(type == SERVER || type == CLIENT);
+
+                List<User> res = toList(node().<Integer, 
User>cache(users()).query(qry, Cache.Entry::getValue), useGetAll);
+
+                assertTrue("withTransformer=" + withTrasformer + ", 
useGetAll=" + useGetAll, F.size(res) <= 1);
+
+                return F.first(res);
+            }
+
+        }
+
+        return super.select(id, api);
+    }
+
+    /** */
+    private static <R> List<R> toList(QueryCursor<R> cursor, boolean 
useGetAll) {
+        return useGetAll ? cursor.getAll() : toList(cursor.iterator());
+    }
+
+    /** */
+    private static <R> List<R> toList(Iterator<R> iter) {
+        List<R> res = new ArrayList<>();
+
+        iter.forEachRemaining(res::add);
+
+        return res;
+    }
+
+    /** */
+    private List<Cache.Entry<Integer, User>> unwrapBinary(List<?> all) {

Review Comment:
   Never used



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY;
+
+/** */
+public class ScanQueryTransactionIsolationTest extends 
AbstractQueryTransactionIsolationTest {
+    /** @return Test parameters. */
+    @Parameterized.Parameters(
+        name = 
"gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (int gridCnt : new int[]{1, 3}) {
+            int[] backups = gridCnt > 1
+                ? new int[]{1, gridCnt - 1}
+                : new int[]{0};
+
+            for (int backup : backups) {
+                for (CacheMode mode : CacheMode.values()) {
+                    for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, 
ModifyApi.ENTRY_PROCESSOR}) {
+                        for (boolean commit : new boolean[]{false, true}) {
+                            for (boolean mutli : new boolean[]{false, true}) {
+                                for (TransactionConcurrency txConcurrency : 
TransactionConcurrency.values()) {
+                                    for (ExecutorType execType : new 
ExecutorType[]{SERVER, ExecutorType.CLIENT}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            false, //partition awareness
+                                            mode,
+                                            execType,
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+
+                                    for (boolean partitionAwareness : new 
boolean[]{false, true}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            partitionAwareness,
+                                            mode,
+                                            THIN_VIA_QUERY, // executor type
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected User select(Integer id, ModifyApi api) {
+        assertTrue(type != THIN_VIA_CACHE_API);
+        assertTrue(type != THIN_JDBC);
+
+        if (api == QUERY) {
+            ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>()
+                .setFilter((id0, user) -> Objects.equals(id0, id));
+
+            boolean withTrasformer = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+            boolean useGetAll = ThreadLocalRandom.current().nextBoolean();
+            boolean useCacheIter = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+
+            if (!withTrasformer) {
+                if (useCacheIter) {
+                    assertTrue(type == SERVER || type == CLIENT);
+
+                    List<Cache.Entry<Integer, User>> res =
+                        toList(F.iterator0(node().cache(users()), true, e -> 
Objects.equals(e.getKey(), id)));
+
+                    assertTrue(F.size(res) + "", F.size(res) <= 1);
+
+                    return F.isEmpty(res) ? null : res.get(0).getValue();
+                }
+                else {
+                    QueryCursor<Cache.Entry<Integer, User>> cursor = null;
+
+                    if (type == THIN_VIA_QUERY)
+                        cursor = thinCli.<Integer, 
User>cache(users()).query(qry);
+                    else if (type == SERVER || type == CLIENT) {
+                        cursor = node().<Integer, 
User>cache(users()).query(qry);
+                    }
+                    else
+                        fail("Unsupported executor type: " + type);
+
+                    List<Cache.Entry<Integer, User>> res = toList(cursor, 
useGetAll);
+
+                    assertTrue("useGetAll=" + useGetAll + ", useCacheIter=" + 
useCacheIter, F.size(res) <= 1);
+
+                    return F.isEmpty(res) ? null : res.get(0).getValue();
+                }
+            }
+            else {
+                assertTrue(type == SERVER || type == CLIENT);
+
+                List<User> res = toList(node().<Integer, 
User>cache(users()).query(qry, Cache.Entry::getValue), useGetAll);
+
+                assertTrue("withTransformer=" + withTrasformer + ", 
useGetAll=" + useGetAll, F.size(res) <= 1);
+
+                return F.first(res);
+            }
+
+        }
+
+        return super.select(id, api);
+    }
+
+    /** */
+    private static <R> List<R> toList(QueryCursor<R> cursor, boolean 
useGetAll) {
+        return useGetAll ? cursor.getAll() : toList(cursor.iterator());
+    }
+
+    /** */
+    private static <R> List<R> toList(Iterator<R> iter) {
+        List<R> res = new ArrayList<>();
+
+        iter.forEachRemaining(res::add);
+
+        return res;
+    }
+
+    /** */
+    private List<Cache.Entry<Integer, User>> unwrapBinary(List<?> all) {
+        return all.stream()
+            .map(e0 -> new CacheEntryImpl<>(
+                this.<Integer>unwrap(((Cache.Entry<?, ?>)e0).getKey()),
+                this.<User>unwrap(((Cache.Entry<?, ?>)e0).getValue()))
+            ).collect(Collectors.toList());
+    }
+
+    /** */
+    private <T> T unwrap(Object o) {
+        if (o instanceof KeyCacheObject)
+            return ((CacheObject)o).value(null, false);
+        else if (o instanceof BinaryObject) {

Review Comment:
   Redundant braces



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java:
##########
@@ -158,18 +165,15 @@ public void testGetObjectField() throws Exception {
 
             for (int i = 0; i < 50; i++)
                 assertEquals(i * 100, res.get(i).intValue());
-        }
-        finally {
-            cache.destroy();
-        }
+        });
     }
 
     /**
      * @throws Exception If failed.
      */
     @Test
     public void testGetObjectFieldPartitioned() throws Exception {
-        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+        IgniteCache<Integer, Value> cache = createTestCache();

Review Comment:
   This test only fill one partition, I think for transactional mode we should 
also check that tx entries with are filtered correcly by partition if there are 
data for other partitions exists.



##########
modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java:
##########
@@ -93,8 +95,10 @@
 
     SqlQueriesTopologyMappingTest.class,
 
-    IgniteCacheQueryReservationOnUnstableTopologyTest.class
+    IgniteCacheQueryReservationOnUnstableTopologyTest.class,
 
+    ScanQueryTransactionsUnsupportedModesTest.class,
+    ScanQueryTransactionIsolationTest.class

Review Comment:
   Please add comma at the end of line



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java:
##########
@@ -147,6 +152,10 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
     /** */
     private AffinityTopologyVersion topVer;
 
+    /** Set of keys that must be skiped during iteration. */
+    @GridDirectCollection(KeyCacheObject.class)
+    private Collection<KeyCacheObject> skipKeys;

Review Comment:
   Perhaps it worth to filter keys on reducer instead of modifying network 
protocol. I think it will be a little bit simplier. WDYT?



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AbstractTransactionalQueryTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/** */
+@RunWith(Parameterized.class)
+public abstract class AbstractTransactionalQueryTest extends 
GridCommonAbstractTest {
+    /** */
+    @Parameterized.Parameter()
+    public TestTransactionMode sqlTxMode;

Review Comment:
   Test not related to SQL, so `SQL`/`DML` in fields and comments looks 
confusing.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java:
##########
@@ -311,23 +294,17 @@ public void testKeepBinaryFiltered() throws Exception {
 
             for (int i = 0; i < 5; i++)
                 assertEquals(i * 1000, res.get(i).intValue());
-        }
-        finally {
-            cache.destroy();
-        }
+        });
     }
 
     /**
      * @throws Exception If failed.
      */
     @Test
     public void testLocal() throws Exception {
-        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
-
-        try {
-            for (int i = 0; i < 50; i++)
-                cache.put(i, new Value("str" + i, i * 100));
+        assumeTrue(sqlTxMode == TestTransactionMode.NONE);

Review Comment:
   Any transactional test with local flag?



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY;
+import static 
org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY;
+
+/** */
+public class ScanQueryTransactionIsolationTest extends 
AbstractQueryTransactionIsolationTest {
+    /** @return Test parameters. */
+    @Parameterized.Parameters(
+        name = 
"gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (int gridCnt : new int[]{1, 3}) {
+            int[] backups = gridCnt > 1
+                ? new int[]{1, gridCnt - 1}
+                : new int[]{0};
+
+            for (int backup : backups) {
+                for (CacheMode mode : CacheMode.values()) {
+                    for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, 
ModifyApi.ENTRY_PROCESSOR}) {
+                        for (boolean commit : new boolean[]{false, true}) {
+                            for (boolean mutli : new boolean[]{false, true}) {
+                                for (TransactionConcurrency txConcurrency : 
TransactionConcurrency.values()) {
+                                    for (ExecutorType execType : new 
ExecutorType[]{SERVER, ExecutorType.CLIENT}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            false, //partition awareness
+                                            mode,
+                                            execType,
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+
+                                    for (boolean partitionAwareness : new 
boolean[]{false, true}) {
+                                        params.add(new Object[]{
+                                            gridCnt,
+                                            backup,
+                                            partitionAwareness,
+                                            mode,
+                                            THIN_VIA_QUERY, // executor type
+                                            modify,
+                                            commit,
+                                            mutli,
+                                            txConcurrency
+                                        });
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected User select(Integer id, ModifyApi api) {
+        assertTrue(type != THIN_VIA_CACHE_API);
+        assertTrue(type != THIN_JDBC);
+
+        if (api == QUERY) {
+            ScanQuery<Integer, User> qry = new ScanQuery<Integer, User>()
+                .setFilter((id0, user) -> Objects.equals(id0, id));
+
+            boolean withTrasformer = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+            boolean useGetAll = ThreadLocalRandom.current().nextBoolean();
+            boolean useCacheIter = (type == SERVER || type == CLIENT) && 
ThreadLocalRandom.current().nextBoolean();
+
+            if (!withTrasformer) {
+                if (useCacheIter) {
+                    assertTrue(type == SERVER || type == CLIENT);
+
+                    List<Cache.Entry<Integer, User>> res =
+                        toList(F.iterator0(node().cache(users()), true, e -> 
Objects.equals(e.getKey(), id)));
+
+                    assertTrue(F.size(res) + "", F.size(res) <= 1);
+
+                    return F.isEmpty(res) ? null : res.get(0).getValue();
+                }
+                else {
+                    QueryCursor<Cache.Entry<Integer, User>> cursor = null;
+
+                    if (type == THIN_VIA_QUERY)
+                        cursor = thinCli.<Integer, 
User>cache(users()).query(qry);
+                    else if (type == SERVER || type == CLIENT) {
+                        cursor = node().<Integer, 
User>cache(users()).query(qry);
+                    }
+                    else
+                        fail("Unsupported executor type: " + type);
+
+                    List<Cache.Entry<Integer, User>> res = toList(cursor, 
useGetAll);
+
+                    assertTrue("useGetAll=" + useGetAll + ", useCacheIter=" + 
useCacheIter, F.size(res) <= 1);
+
+                    return F.isEmpty(res) ? null : res.get(0).getValue();
+                }
+            }
+            else {
+                assertTrue(type == SERVER || type == CLIENT);
+
+                List<User> res = toList(node().<Integer, 
User>cache(users()).query(qry, Cache.Entry::getValue), useGetAll);
+
+                assertTrue("withTransformer=" + withTrasformer + ", 
useGetAll=" + useGetAll, F.size(res) <= 1);
+
+                return F.first(res);
+            }
+

Review Comment:
   Redundant NL



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionsUnsupportedModesTest.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.HashSet;
+import java.util.Set;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+
+/** */
+public class ScanQueryTransactionsUnsupportedModesTest extends 
GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
+
+        return cfg;
+    }
+
+    /** */
+    @Test
+    public void testUnsupportedTransactionModes() throws Exception {
+        try (IgniteEx srv = startGrid(0)) {
+            for (boolean client : new boolean[] {/*false,*/ true}) {

Review Comment:
   `/*false*/`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to