[
https://issues.apache.org/jira/browse/IGNITE-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395086#comment-16395086
]
Zbyszek B commented on IGNITE-7918:
-----------------------------------
{code:java}
package leak;
import org.apache.ignite.*;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class Demo {
private final static Ignite ignite = createIgnite();
private final static AtomicLong producedCount = new AtomicLong();
private final static AtomicLong consumedCount = new AtomicLong();
private final static int numOfLocalFields = 10;
private static BinaryObject localPrototype = createLocalPrototype(); //
prototype of local cache entity with all fields set to null
private final static int numOfGlobalFields = 5;
private final static List<String> globalFields = globalFields();
private static BinaryObject globalPrototype = createGlobalPrototype(); //
prototype of cluster-wide cache entity with all fields set to null
private static final IgniteCache<String, BinaryObject> globalCache =
getOrCreateGlobalCache();
private final static String ID = "id";
private final static boolean useLocalCacheToCauseMemLeak = true;
private final static BlockingQueue<Object> queue = new
LinkedBlockingQueue<>(10);
public static void main(String[] args) throws Exception {
CompletableFuture.runAsync(Demo::runProducer);
CompletableFuture.runAsync(Demo::runConsumer);
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
do {
String input = br.readLine();
if ("q".equals(input)) {
System.out.println("Exit!");
System.exit(0);
} else {
String timestamp = new SimpleDateFormat("HH:mm:ss").format(new Date());
System.out.println(String.format("[%s] Queue size: %d", timestamp,
queue.size()));
System.out.println(String.format("[%s] Local caches produced: %d", timestamp,
producedCount.longValue()));
System.out.println(String.format("[%s] Local caches consumed %d", timestamp,
consumedCount.longValue()));
System.out.println(String.format("[%s] Global cache size: %d", timestamp,
globalCache.size(CachePeekMode.ALL)));
}
} while (true);
}
private static void runProducer() {
do {
long cacheId = producedCount.incrementAndGet();
try {
BinaryObject bObj = createLocalObject(1);
if (useLocalCacheToCauseMemLeak) {
IgniteCache<String, BinaryObject> cache = createLocalCache(cacheId);
cache.put(bObj.field(ID), bObj);
queue.put(cache);
} else {
Map<String, BinaryObject> cache = new HashMap<>();
cache.put(bObj.field(ID), bObj);
queue.put(cache);
}
} catch (Exception e) {
System.out.println(e.toString());
System.exit(1);
}
} while (true);
}
private static List<?> getSQLSelectRow(BinaryObject localObject) {
List<?> res = new ArrayList<>();
for (String f : globalFields) {
res.add(localObject.field(f));
}
return res;
}
@SuppressWarnings("unchecked")
private static void runConsumer() {
do {
try {
final Object taken = queue.take();
final List<List<?>> globalSQLRows = new ArrayList<>(); // Note: In practice
rows are generated by executing SqlFieldsQuery against local cache; here
omitted for sake of simplicity.
if (taken instanceof IgniteCache) {
final IgniteCache<String, BinaryObject> cache = IgniteCache.class.cast(taken);
cache.forEach(e -> {
globalSQLRows.add(getSQLSelectRow(e.getValue()));
});
cache.destroy();
} else if (taken instanceof Map) {
final Map<String, BinaryObject> cache = Map.class.cast(taken);
cache.forEach((k, v) -> {
globalSQLRows.add(getSQLSelectRow(v));
});
}
consumedCount.incrementAndGet();
final IgniteDataStreamer<String, BinaryObject> globalStreamer =
createGlobalStreamer(globalCache);
for (List<?> row : globalSQLRows) {
final BinaryObjectBuilder globalBuilder = globalPrototype.toBuilder();
for (int i = 0; i < globalFields.size(); i++) {
globalBuilder.setField(globalFields.get(i), row.get(i));
}
final BinaryObject gObj = globalBuilder.build();
globalStreamer.addData(gObj.field(ID), gObj);
}
globalStreamer.flush();
globalStreamer.close();
} catch (Exception e) {
System.out.println(e.toString());
System.exit(1);
}
} while (true);
}
private static Ignite createIgnite() {
IgniteConfiguration iCfg = new IgniteConfiguration();
TcpCommunicationSpi tcpCommunication = new TcpCommunicationSpi();
tcpCommunication.setMessageQueueLimit(1024); // to get rid of the warning
iCfg.setCommunicationSpi(tcpCommunication);
String workDirectory = System.getProperty("user.home") + File.separator +
"ignite";
iCfg.setWorkDirectory(workDirectory);
System.out.println();
System.out.println(String.format(">>> Starting Ignite on %s; work directory %s
...", "MyLeakingNode", workDirectory));
System.out.println();
final Ignite ignite = Ignition.start(iCfg);
ClusterNode localNode = ignite.cluster().localNode();
System.out.println();
System.out.println(String.format(">>> Ignite started on %s (%s) successfully!",
"MyLeakingNode", localNode.id()));
System.out.println();
return ignite;
}
private static IgniteCache<String, BinaryObject> createLocalCache(long id) {
final String cacheName = "LocalCache" + id;
final CacheConfiguration<String, BinaryObject> cCfg = new
CacheConfiguration<>();
cCfg.setName(cacheName);
cCfg.setStoreKeepBinary(true);
cCfg.setCacheMode(CacheMode.LOCAL);
cCfg.setOnheapCacheEnabled(false);
cCfg.setCopyOnRead(false);
cCfg.setBackups(0);
cCfg.setWriteBehindEnabled(false);
cCfg.setReadThrough(false);
cCfg.setReadFromBackup(false);
final QueryEntity queryEntity = new QueryEntity(String.class.getTypeName(),
"LocalEntity");
for (String field : localFields()) {
queryEntity.addQueryField(field, String.class.getTypeName(), field);
}
cCfg.setQueryEntities(Collections.singletonList( queryEntity));
ignite.destroyCache(cacheName); // local cache is not really local - reference
can be kept by other nodes if restart during the load happens
return ignite.createCache(cCfg).withKeepBinary();
}
private static List<String> localFields() {
return Stream.concat(Stream.of(ID), IntStream.rangeClosed(1,
numOfLocalFields).boxed().map(e -> String.format("field%s",
e))).collect(Collectors.toList());
}
private static BinaryObject createLocalPrototype() {
BinaryObjectBuilder builder = ignite.binary().builder("LocalEntity");
for (String field : localFields()) {
builder.setField(field, null, String.class);
}
return builder.build();
}
private static BinaryObject createLocalObject(long id) {
BinaryObjectBuilder res = localPrototype.toBuilder();
for (String field : localFields()) {
if (ID.equals(field)) {
res.setField(field, String.valueOf(id), String.class);
} else {
res.setField(field, id + "@" + field, String.class);
}
}
return res.build();
}
private static IgniteCache<String, BinaryObject> getOrCreateGlobalCache() {
final String cacheName = "GlobalCache";
final CacheConfiguration<String, BinaryObject> cCfg = new
CacheConfiguration<>();
cCfg.setName(cacheName);
cCfg.setStoreKeepBinary(true);
cCfg.setCacheMode(CacheMode.PARTITIONED);
cCfg.setOnheapCacheEnabled(false);
cCfg.setCopyOnRead(false);
cCfg.setBackups(0);
cCfg.setWriteBehindEnabled(false);
cCfg.setReadThrough(false);
cCfg.setQueryEntities(Collections.singletonList(new
QueryEntity(String.class.getTypeName(), cacheName))); // must be defined
otherwise indexing Spi remove method is not called
return ignite.getOrCreateCache(cCfg).withKeepBinary();
}
private static IgniteDataStreamer<String, BinaryObject>
createGlobalStreamer(IgniteCache<String, BinaryObject> cache) {
IgniteDataStreamer<String, BinaryObject> streamer =
ignite.dataStreamer(cache.getName());
streamer.allowOverwrite(true);
streamer.skipStore(true);
streamer.keepBinary(true);
return streamer;
}
private static List<String> globalFields() {
return Stream.concat(Stream.of(ID), IntStream.rangeClosed(1,
numOfGlobalFields).boxed().map(e -> String.format("field%s",
e))).collect(Collectors.toList());
}
private static BinaryObject createGlobalPrototype() {
BinaryObjectBuilder builder = ignite.binary().builder("GlobalEntity");
for (String field : globalFields) {
builder.setField(field, null, String.class);
}
return builder.build();
}
}
{code}
> Huge memory leak when data streamer used together with local cache
> ------------------------------------------------------------------
>
> Key: IGNITE-7918
> URL: https://issues.apache.org/jira/browse/IGNITE-7918
> Project: Ignite
> Issue Type: Bug
> Components: cache
> Affects Versions: 2.3
> Reporter: Zbyszek B
> Priority: Blocker
>
> Dear Igniters,
> We observe huge memory leak when data streamer used together with local cache.
> In the attached demo producer produces local cache with single binary object
> and passes this to queue. Consumer picks up the cache from the queue,
> constructs different binary object from it, adds it to global partitioned
> cache and destroys local cache.
> This design causes a significant leak - the whole heap it takes within
> minutes (no matter if this is 4G or 24G).
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)