Hello, Igniters.
I'm working on the IGNITE-425 [1] issue.
I made a couple of changes in my branch [2], so I want to confirm those
changes with the community before moving forward:
Text of the issue:
```
Currently if updated entry passes the filter, it is sent to node initiated
the query entirely.
It would be good to provide user with the ability to transform entry and,
for example,
select only fields that are important. This may bring huge economy to
traffic and lower GC pressure as well.
```
1. I created a new class, ContinuousQueryWithTransformer, that extends Query:
Reasons to create an entirely new class instead of extending ContinuousQuery:
a. ContinuousQuery is final, so users can't extend it. I don't want to
change that.
b. ContinuousQuery contains some deprecated methods (setRemoteFilter), so
with a new class we can get rid of them.
c. Such a public API design disallows, at compile time, using the existing
localEventListener together with the new transformedEventListener.
```
public final class ContinuousQueryWithTransformer<K, V, T> extends Query<Cache.Entry<K, V>> {
    public ContinuousQueryWithTransformer<K, V, T> setRemoteFilterFactory(
        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { /**/ }

    public ContinuousQueryWithTransformer<K, V, T> setRemoteTransformerFactory(
        Factory<? extends IgniteBiClosure<K, V, T>> factory) { /**/ }

    public ContinuousQueryWithTransformer<K, V, T> setLocalTransformedEventListener(
        TransformedEventListener<T> locTransEvtLsnr) { /**/ }

    public interface TransformedEventListener<T> {
        void onUpdated(Iterable<? extends T> events) throws CacheEntryListenerException;
    }
}
```
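To make the idea from the issue text concrete, here is a minimal stand-alone sketch of the filter, transform, listener flow. It does not use any Ignite classes: `TransformerSketch`, `Person`, `processUpdate` and `demo` are hypothetical names made up for illustration, and plain `java.util.function` types stand in for the remote filter and the remote transformer. It only shows why shipping the transformed value instead of the whole entry can save traffic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

/** Stand-alone sketch of filter -> transform -> listener; no Ignite classes are used. */
public class TransformerSketch {
    /** Hypothetical value type with one small field and one large field. */
    public static final class Person {
        final String name;
        final byte[] photo; // Large payload that the listener does not need.

        Person(String name, byte[] photo) {
            this.name = name;
            this.photo = photo;
        }
    }

    /**
     * Simulates what a remote node would do for a single update: apply the filter,
     * then the transformer. Returns null if the update is filtered out.
     */
    public static <K, V, T> T processUpdate(K key, V val,
        BiPredicate<K, V> filter, BiFunction<K, V, T> transformer) {
        return filter.test(key, val) ? transformer.apply(key, val) : null;
    }

    /** Runs three updates through the pipeline and collects what the listener would receive. */
    public static List<String> demo() {
        List<String> received = new ArrayList<>();

        // Assumption for the example: only even keys pass the filter.
        BiPredicate<Integer, Person> filter = (k, v) -> k % 2 == 0;

        // Only the name is selected; the photo never leaves the "remote" side.
        BiFunction<Integer, Person, String> transformer = (k, v) -> v.name;

        String[] names = {"Ann", "Bob", "Cid"};

        for (int key = 0; key < names.length; key++) {
            String t = processUpdate(key, new Person(names[key], new byte[1024]), filter, transformer);

            if (t != null)
                received.add(t);
        }

        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the sketch the listener side receives only short strings, while the 1 KB `photo` arrays stay where the update happened. In the proposed API, the `Factory<? extends IgniteBiClosure<K, V, T>>` would presumably play the role of `transformer` here.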
2. I want to edit all tests in the package
`core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/`
to ensure that my implementation fully supports the existing tests.
I want each test to work with both the regular ContinuousQuery and
ContinuousQueryWithTransformer:
Existing test:
```
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();

qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
        for (CacheEntryEvent evt : evts) {
            if ((Integer)evt.getValue() >= 0)
                evtCnt.incrementAndGet();
        }
    }
});
```
After the change:
```
Query qry = createContinuousQuery();

setLocalListener(qry, new CI1<T2<Object, Object>>() {
    @Override public void apply(T2<Object, Object> e) {
        if ((Integer)e.getValue() >= 0)
            evtCnt.incrementAndGet();
    }
});
```
Base class code that supports setLocalListener:
```
protected <K, V> void setLocalListener(Query q, CI1<T2<K, V>> lsnrClsr) {
    if (isContinuousWithTransformer()) {
        ((ContinuousQueryWithTransformer)q)
            .setLocalTransformedEventListener(new TransformedEventListenerImpl<>(lsnrClsr));
    }
    else
        ((ContinuousQuery)q).setLocalListener(new CacheInvokeListener<>(lsnrClsr));
}

/** Adapts the closure to the regular ContinuousQuery listener. */
protected static class CacheInvokeListener<K, V> implements CacheEntryUpdatedListener<K, V> {
    private final CI1<T2<K, V>> clsr;

    CacheInvokeListener(CI1<T2<K, V>> clsr) {
        this.clsr = clsr;
    }

    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> events)
        throws CacheEntryListenerException {
        for (CacheEntryEvent<? extends K, ? extends V> e : events)
            clsr.apply(new T2<>(e.getKey(), e.getValue()));
    }
}

/** Adapts the same closure to the transformed-event listener. */
protected static class TransformedEventListenerImpl<K, V> implements TransformedEventListener<T2<K, V>> {
    private final CI1<T2<K, V>> clsr;

    TransformedEventListenerImpl(CI1<T2<K, V>> clsr) {
        this.clsr = clsr;
    }

    @Override public void onUpdated(Iterable<? extends T2<K, V>> evts) throws CacheEntryListenerException {
        for (T2<K, V> e : evts)
            clsr.apply(e);
    }
}
```
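The test refactoring above boils down to an adapter: one closure with the test logic, wrapped into whichever listener type the query under test needs. Below is a stand-alone sketch of that pattern. It uses no Ignite classes; `ListenerAdapterSketch`, `EntryListener`, `TransformedListener`, `Entry`, `plain`, `transformed` and `countNonNegative` are hypothetical stand-ins, and `java.util.function.Consumer` plays the role of `CI1`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Stand-alone sketch: one test closure adapted to two different listener interfaces. */
public class ListenerAdapterSketch {
    /** Stand-in for a key/value pair such as T2. */
    static final class Entry<K, V> {
        final K key;
        final V val;

        Entry(K key, V val) {
            this.key = key;
            this.val = val;
        }
    }

    /** Stand-in for the listener of a regular continuous query. */
    interface EntryListener<K, V> {
        void onUpdated(Iterable<Entry<K, V>> evts);
    }

    /** Stand-in for the listener that receives transformed events. */
    interface TransformedListener<T> {
        void onUpdated(Iterable<? extends T> evts);
    }

    /** Wraps the closure into the regular listener type. */
    static <K, V> EntryListener<K, V> plain(Consumer<Entry<K, V>> clsr) {
        return evts -> {
            for (Entry<K, V> e : evts)
                clsr.accept(e);
        };
    }

    /** Wraps the same closure into the transformed listener type. */
    static <K, V> TransformedListener<Entry<K, V>> transformed(Consumer<Entry<K, V>> clsr) {
        return evts -> {
            for (Entry<K, V> e : evts)
                clsr.accept(e);
        };
    }

    /** The "test body" is written once and run through either listener flavor. */
    public static int countNonNegative(boolean withTransformer) {
        AtomicInteger cnt = new AtomicInteger();

        Consumer<Entry<String, Integer>> body = e -> {
            if (e.val >= 0)
                cnt.incrementAndGet();
        };

        List<Entry<String, Integer>> evts = Arrays.asList(
            new Entry<>("a", 1), new Entry<>("b", -1), new Entry<>("c", 0));

        if (withTransformer)
            transformed(body).onUpdated(evts);
        else
            plain(body).onUpdated(evts);

        return cnt.get();
    }
}
```

Both flavors should report the same count for the same events, which is the property the shared tests would rely on.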
Thoughts?
[1] https://issues.apache.org/jira/browse/IGNITE-425
[2] https://github.com/nizhikov/ignite/pull/9/files
--
Nikolay Izhikov
[email protected]