[ 
https://issues.apache.org/jira/browse/IGNITE-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Petrov updated IGNITE-12866:
------------------------------------
    Description: 

CQ fails if CQ filter deserialization exception occurred on a node that does 
not match the cache node filter in case cache node filter must be loaded via 
p2p.

Reproducer:

{code:java}
        public class CacheContinuousQueryExternalNodeFilterTest extends 
GridCommonAbstractTest {
    /** */
    private static final String EXT_EVT_FILTER_CLS = 
"org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter";

    /** */
    private static final String EXT_NODE_FILTER_CLS = 
"org.apache.ignite.tests.p2p.AttributeBasedNodeFilter";

    /** */
    private static final URL[] URLS;

    static {
        try {
            URLS = new URL[] {new URL(getProperty("p2p.uri.cls.second"))};
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    /** */
    private final ClassLoader extLdr = getExternalClassLoader();

    /** */
    private final ClassLoader secondExtLdr = new URLClassLoader(URLS, 
U.gridClassLoader());

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        if (getTestIgniteInstanceName(0).equals(igniteInstanceName))
            cfg.setClassLoader(extLdr);
        else {
            cfg.setClassLoader(secondExtLdr);
            cfg.setUserAttributes(U.map("skipCacheStart", true));
        }

        return cfg;
    }

    /** */
    @Test
    public void test() throws Exception {
        startGrids(2).cluster().state(ClusterState.ACTIVE);

        Class<IgnitePredicate<ClusterNode>> nodeFilter = 
(Class<IgnitePredicate<ClusterNode>>)extLdr
            .loadClass(EXT_NODE_FILTER_CLS);

        grid(0).createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
            .setNodeFilter(nodeFilter.newInstance()));

        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();

        Class<CacheEntryEventFilter<Integer, Integer>> filterCls =
            (Class<CacheEntryEventFilter<Integer, 
Integer>>)extLdr.loadClass(EXT_EVT_FILTER_CLS);

        Factory<CacheEntryEventFilter<Integer, Integer>> rmtFilterFactory = new 
ClassFilterFactory(filterCls);

        qry.setRemoteFilterFactory(rmtFilterFactory);

        assertNull(grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME));

        grid(0).cache(DEFAULT_CACHE_NAME).query(qry);

        assertEquals(1, 
grid(0).context().systemView().view(CQ_SYS_VIEW).size());
        assertEquals(0, 
grid(1).context().systemView().view(CQ_SYS_VIEW).size());
    }

    /** */
    private static class ClassFilterFactory implements 
Factory<CacheEntryEventFilter<Integer, Integer>> {
        /** */
        private Class<CacheEntryEventFilter<Integer, Integer>> cls;

        /** */
        public ClassFilterFactory(Class<CacheEntryEventFilter<Integer, 
Integer>> cls) {
            this.cls = cls;
        }

        /** {@inheritDoc} */
        @Override public CacheEntryEventFilter<Integer, Integer> create() {
            try {
                return cls.newInstance();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
{code}

To run the reproducer place the following class to 
modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ and 
modules/extdata/uri/src/main/java/org/apache/ignite/tests/p2p/


{code:java}
public class AttributeBasedNodeFilter implements IgnitePredicate<ClusterNode> {
    /** {@inheritDoc} */
    @Override public boolean apply(ClusterNode node) {
        return node.attribute("skipCacheStart") == null;
    }
}
{code}


  was:
CQ fails if CQ filter deserialization exception occurred on a node that does 
not match the cache node filter in case cache node filter must be loaded via 
p2p.

Reproducer is linked as PR to the ticket.


> CQ fails due to CQ filter deserialization exception.
> ----------------------------------------------------
>
>                 Key: IGNITE-12866
>                 URL: https://issues.apache.org/jira/browse/IGNITE-12866
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Mikhail Petrov
>            Priority: Minor
>
> CQ fails if CQ filter deserialization exception occurred on a node that does 
> not match the cache node filter in case cache node filter must be loaded via 
> p2p.
> Reproducer:
> {code:java}
>       public class CacheContinuousQueryExternalNodeFilterTest extends 
> GridCommonAbstractTest {
>     /** */
>     private static final String EXT_EVT_FILTER_CLS = 
> "org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter";
>     /** */
>     private static final String EXT_NODE_FILTER_CLS = 
> "org.apache.ignite.tests.p2p.AttributeBasedNodeFilter";
>     /** */
>     private static final URL[] URLS;
>     static {
>         try {
>             URLS = new URL[] {new URL(getProperty("p2p.uri.cls.second"))};
>         }
>         catch (MalformedURLException e) {
>             throw new RuntimeException(e);
>         }
>     }
>     /** */
>     private final ClassLoader extLdr = getExternalClassLoader();
>     /** */
>     private final ClassLoader secondExtLdr = new URLClassLoader(URLS, 
> U.gridClassLoader());
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String 
> igniteInstanceName) throws Exception {
>         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
>         if (getTestIgniteInstanceName(0).equals(igniteInstanceName))
>             cfg.setClassLoader(extLdr);
>         else {
>             cfg.setClassLoader(secondExtLdr);
>             cfg.setUserAttributes(U.map("skipCacheStart", true));
>         }
>         return cfg;
>     }
>     /** */
>     @Test
>     public void test() throws Exception {
>         startGrids(2).cluster().state(ClusterState.ACTIVE);
>         Class<IgnitePredicate<ClusterNode>> nodeFilter = 
> (Class<IgnitePredicate<ClusterNode>>)extLdr
>             .loadClass(EXT_NODE_FILTER_CLS);
>         grid(0).createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
>             .setNodeFilter(nodeFilter.newInstance()));
>         ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
>         Class<CacheEntryEventFilter<Integer, Integer>> filterCls =
>             (Class<CacheEntryEventFilter<Integer, 
> Integer>>)extLdr.loadClass(EXT_EVT_FILTER_CLS);
>         Factory<CacheEntryEventFilter<Integer, Integer>> rmtFilterFactory = 
> new ClassFilterFactory(filterCls);
>         qry.setRemoteFilterFactory(rmtFilterFactory);
>         
> assertNull(grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME));
>         grid(0).cache(DEFAULT_CACHE_NAME).query(qry);
>         assertEquals(1, 
> grid(0).context().systemView().view(CQ_SYS_VIEW).size());
>         assertEquals(0, 
> grid(1).context().systemView().view(CQ_SYS_VIEW).size());
>     }
>     /** */
>     private static class ClassFilterFactory implements 
> Factory<CacheEntryEventFilter<Integer, Integer>> {
>         /** */
>         private Class<CacheEntryEventFilter<Integer, Integer>> cls;
>         /** */
>         public ClassFilterFactory(Class<CacheEntryEventFilter<Integer, 
> Integer>> cls) {
>             this.cls = cls;
>         }
>         /** {@inheritDoc} */
>         @Override public CacheEntryEventFilter<Integer, Integer> create() {
>             try {
>                 return cls.newInstance();
>             }
>             catch (Exception e) {
>                 throw new RuntimeException(e);
>             }
>         }
>     }
> }
> {code}
> To run the reproducer place the following class to 
> modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ and 
> modules/extdata/uri/src/main/java/org/apache/ignite/tests/p2p/
> {code:java}
> public class AttributeBasedNodeFilter implements IgnitePredicate<ClusterNode> 
> {
>     /** {@inheritDoc} */
>     @Override public boolean apply(ClusterNode node) {
>         return node.attribute("skipCacheStart") == null;
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to