[
https://issues.apache.org/jira/browse/IGNITE-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mikhail Petrov updated IGNITE-12866:
------------------------------------
Description:
A continuous query (CQ) fails if a CQ filter deserialization exception occurs
on a node that does not match the cache node filter, in the case where the
cache node filter must be loaded via p2p.
Reproducer:
{code:java}
/**
 * Reproducer for IGNITE-12866: a continuous query is started on a cache whose node filter and remote
 * event filter classes are loaded through an external class loader, while the second node is
 * configured with a different class loader and is excluded from the cache by the node filter.
 */
public class CacheContinuousQueryExternalNodeFilterTest extends GridCommonAbstractTest {
    /** Name of the continuous query event filter class that is loaded via the external class loader. */
    private static final String EXT_EVT_FILTER_CLS = "org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter";

    /** Name of the cache node filter class that is loaded via the external class loader. */
    private static final String EXT_NODE_FILTER_CLS = "org.apache.ignite.tests.p2p.AttributeBasedNodeFilter";

    /** URLs used to build the second external class loader. */
    private static final URL[] URLS;

    static {
        try {
            URLS = new URL[] {new URL(getProperty("p2p.uri.cls.second"))};
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    /** Class loader assigned to the first node; also used by the test to load both filter classes. */
    private final ClassLoader extLdr = getExternalClassLoader();

    /** Class loader assigned to every node except the first one. */
    private final ClassLoader secondExtLdr = new URLClassLoader(URLS, U.gridClassLoader());

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        if (getTestIgniteInstanceName(0).equals(igniteInstanceName))
            cfg.setClassLoader(extLdr);
        else {
            cfg.setClassLoader(secondExtLdr);

            // The "skipCacheStart" attribute is what AttributeBasedNodeFilter checks to reject a node.
            cfg.setUserAttributes(U.map("skipCacheStart", true));
        }

        return cfg;
    }

    /**
     * Creates a cache with the externally loaded node filter, starts a continuous query with an externally
     * loaded remote filter from the first node and checks that the query is registered on the first node only.
     *
     * @throws Exception If failed.
     */
    @Test
    public void test() throws Exception {
        startGrids(2).cluster().state(ClusterState.ACTIVE);

        @SuppressWarnings("unchecked")
        Class<IgnitePredicate<ClusterNode>> nodeFilter =
            (Class<IgnitePredicate<ClusterNode>>)extLdr.loadClass(EXT_NODE_FILTER_CLS);

        // Class.newInstance() is deprecated; instantiate through the no-arg constructor instead.
        grid(0).createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
            .setNodeFilter(nodeFilter.getDeclaredConstructor().newInstance()));

        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();

        @SuppressWarnings("unchecked")
        Class<CacheEntryEventFilter<Integer, Integer>> filterCls =
            (Class<CacheEntryEventFilter<Integer, Integer>>)extLdr.loadClass(EXT_EVT_FILTER_CLS);

        Factory<CacheEntryEventFilter<Integer, Integer>> rmtFilterFactory = new ClassFilterFactory(filterCls);

        qry.setRemoteFilterFactory(rmtFilterFactory);

        // The second node is rejected by the node filter, so the cache must not be started there.
        assertNull(grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME));

        grid(0).cache(DEFAULT_CACHE_NAME).query(qry);

        assertEquals(1, grid(0).context().systemView().view(CQ_SYS_VIEW).size());
        assertEquals(0, grid(1).context().systemView().view(CQ_SYS_VIEW).size());
    }

    /** Factory that creates event filter instances of the given class via its no-arg constructor. */
    private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> {
        /** Event filter class to instantiate. */
        private final Class<CacheEntryEventFilter<Integer, Integer>> cls;

        /**
         * @param cls Event filter class to instantiate.
         */
        public ClassFilterFactory(Class<CacheEntryEventFilter<Integer, Integer>> cls) {
            this.cls = cls;
        }

        /** {@inheritDoc} */
        @Override public CacheEntryEventFilter<Integer, Integer> create() {
            try {
                // Class.newInstance() is deprecated; any failure is still rethrown as RuntimeException.
                return cls.getDeclaredConstructor().newInstance();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
{code}
To run the reproducer, place the following class in both
modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ and
modules/extdata/uri/src/main/java/org/apache/ignite/tests/p2p/
{code:java}
/**
 * Node filter that accepts a node only when its {@code "skipCacheStart"} attribute is {@code null}.
 */
public class AttributeBasedNodeFilter implements IgnitePredicate<ClusterNode> {
    /** {@inheritDoc} */
    @Override public boolean apply(ClusterNode node) {
        // Read the attribute once; the node passes only when the value is null.
        Object skipAttr = node.attribute("skipCacheStart");

        return skipAttr == null;
    }
}
{code}
was:
CQ fails if CQ filter deserialization exception occurred on a node that does
not match the cache node filter in case cache node filter must be loaded via
p2p.
Reproducer is linked as PR to the ticket.
> CQ fails due to CQ filter deserialization exception.
> ----------------------------------------------------
>
> Key: IGNITE-12866
> URL: https://issues.apache.org/jira/browse/IGNITE-12866
> Project: Ignite
> Issue Type: Bug
> Reporter: Mikhail Petrov
> Priority: Minor
>
> CQ fails if CQ filter deserialization exception occurred on a node that does
> not match the cache node filter in case cache node filter must be loaded via
> p2p.
> Reproducer:
> {code:java}
> public class CacheContinuousQueryExternalNodeFilterTest extends
> GridCommonAbstractTest {
> /** */
> private static final String EXT_EVT_FILTER_CLS =
> "org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter";
> /** */
> private static final String EXT_NODE_FILTER_CLS =
> "org.apache.ignite.tests.p2p.AttributeBasedNodeFilter";
> /** */
> private static final URL[] URLS;
> static {
> try {
> URLS = new URL[] {new URL(getProperty("p2p.uri.cls.second"))};
> }
> catch (MalformedURLException e) {
> throw new RuntimeException(e);
> }
> }
> /** */
> private final ClassLoader extLdr = getExternalClassLoader();
> /** */
> private final ClassLoader secondExtLdr = new URLClassLoader(URLS,
> U.gridClassLoader());
> /** {@inheritDoc} */
> @Override protected IgniteConfiguration getConfiguration(String
> igniteInstanceName) throws Exception {
> IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
> if (getTestIgniteInstanceName(0).equals(igniteInstanceName))
> cfg.setClassLoader(extLdr);
> else {
> cfg.setClassLoader(secondExtLdr);
> cfg.setUserAttributes(U.map("skipCacheStart", true));
> }
> return cfg;
> }
> /** */
> @Test
> public void test() throws Exception {
> startGrids(2).cluster().state(ClusterState.ACTIVE);
> Class<IgnitePredicate<ClusterNode>> nodeFilter =
> (Class<IgnitePredicate<ClusterNode>>)extLdr
> .loadClass(EXT_NODE_FILTER_CLS);
> grid(0).createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
> .setNodeFilter(nodeFilter.newInstance()));
> ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
> Class<CacheEntryEventFilter<Integer, Integer>> filterCls =
> (Class<CacheEntryEventFilter<Integer,
> Integer>>)extLdr.loadClass(EXT_EVT_FILTER_CLS);
> Factory<CacheEntryEventFilter<Integer, Integer>> rmtFilterFactory =
> new ClassFilterFactory(filterCls);
> qry.setRemoteFilterFactory(rmtFilterFactory);
>
> assertNull(grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME));
> grid(0).cache(DEFAULT_CACHE_NAME).query(qry);
> assertEquals(1,
> grid(0).context().systemView().view(CQ_SYS_VIEW).size());
> assertEquals(0,
> grid(1).context().systemView().view(CQ_SYS_VIEW).size());
> }
> /** */
> private static class ClassFilterFactory implements
> Factory<CacheEntryEventFilter<Integer, Integer>> {
> /** */
> private Class<CacheEntryEventFilter<Integer, Integer>> cls;
> /** */
> public ClassFilterFactory(Class<CacheEntryEventFilter<Integer,
> Integer>> cls) {
> this.cls = cls;
> }
> /** {@inheritDoc} */
> @Override public CacheEntryEventFilter<Integer, Integer> create() {
> try {
> return cls.newInstance();
> }
> catch (Exception e) {
> throw new RuntimeException(e);
> }
> }
> }
> }
> {code}
> To run the reproducer place the following class to
> modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ and
> modules/extdata/uri/src/main/java/org/apache/ignite/tests/p2p/
> {code:java}
> public class AttributeBasedNodeFilter implements IgnitePredicate<ClusterNode>
> {
> /** {@inheritDoc} */
> @Override public boolean apply(ClusterNode node) {
> return node.attribute("skipCacheStart") == null;
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)