[ 
https://issues.apache.org/jira/browse/IGNITE-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395086#comment-16395086
 ] 

Zbyszek B commented on IGNITE-7918:
-----------------------------------

{code:java}
package leak;

import org.apache.ignite.*;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;


public class Demo {

private final static Ignite ignite = createIgnite();

private final static AtomicLong producedCount = new AtomicLong();
private final static AtomicLong consumedCount = new AtomicLong();
private final static int numOfLocalFields = 10;
private static BinaryObject localPrototype = createLocalPrototype(); // 
prototype of local cache entity with all fields set to null

private final static int numOfGlobalFields = 5;
private final static List<String> globalFields = globalFields();
private static BinaryObject globalPrototype = createGlobalPrototype(); // 
prototype of cluster-wide cache entity with all fields set to null
private static final IgniteCache<String, BinaryObject> globalCache = 
getOrCreateGlobalCache();

private final static String ID = "id";

private final static boolean useLocalCacheToCauseMemLeak = true;

private final static BlockingQueue<Object> queue = new 
LinkedBlockingQueue<>(10);

public static void main(String[] args) throws Exception {

CompletableFuture.runAsync(Demo::runProducer);
CompletableFuture.runAsync(Demo::runConsumer);

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
do {
String input = br.readLine();
if ("q".equals(input)) {
System.out.println("Exit!");
System.exit(0);
} else {
String timestamp = new SimpleDateFormat("HH:mm:ss").format(new Date());
System.out.println(String.format("[%s] Queue size: %d", timestamp, 
queue.size()));
System.out.println(String.format("[%s] Local caches produced: %d", timestamp, 
producedCount.longValue()));
System.out.println(String.format("[%s] Local caches consumed %d", timestamp, 
consumedCount.longValue()));
System.out.println(String.format("[%s] Global cache size: %d", timestamp, 
globalCache.size(CachePeekMode.ALL)));
}
} while (true);
}

private static void runProducer() {
do {
long cacheId = producedCount.incrementAndGet();
try {
BinaryObject bObj = createLocalObject(1);
if (useLocalCacheToCauseMemLeak) {
IgniteCache<String, BinaryObject> cache = createLocalCache(cacheId);
cache.put(bObj.field(ID), bObj);
queue.put(cache);
} else {
Map<String, BinaryObject> cache = new HashMap<>();
cache.put(bObj.field(ID), bObj);
queue.put(cache);
}
} catch (Exception e) {
System.out.println(e.toString());
System.exit(1);
}
} while (true);
}


private static List<?> getSQLSelectRow(BinaryObject localObject) {
List<?> res = new ArrayList<>();
for (String f : globalFields) {
res.add(localObject.field(f));
}
return res;
}


@SuppressWarnings("unchecked")
private static void runConsumer() {
do {
try {
final Object taken = queue.take();
final List<List<?>> globalSQLRows = new ArrayList<>(); // Note: In practice 
rows are generated by executing SqlFieldsQuery against local cache; here 
omitted for sake of simplicity.
if (taken instanceof IgniteCache) {
final IgniteCache<String, BinaryObject> cache = IgniteCache.class.cast(taken);
cache.forEach(e -> {
globalSQLRows.add(getSQLSelectRow(e.getValue()));
});
cache.destroy();
} else if (taken instanceof Map) {
final Map<String, BinaryObject> cache = Map.class.cast(taken);
cache.forEach((k, v) -> {
globalSQLRows.add(getSQLSelectRow(v));
});
}

consumedCount.incrementAndGet();

final IgniteDataStreamer<String, BinaryObject> globalStreamer = 
createGlobalStreamer(globalCache);
for (List<?> row : globalSQLRows) {
final BinaryObjectBuilder globalBuilder = globalPrototype.toBuilder();
for (int i = 0; i < globalFields.size(); i++) {
globalBuilder.setField(globalFields.get(i), row.get(i));
}
final BinaryObject gObj = globalBuilder.build();
globalStreamer.addData(gObj.field(ID), gObj);
}
globalStreamer.flush();
globalStreamer.close();

} catch (Exception e) {
System.out.println(e.toString());
System.exit(1);
}
} while (true);
}

private static Ignite createIgnite() {
IgniteConfiguration iCfg = new IgniteConfiguration();
TcpCommunicationSpi tcpCommunication = new TcpCommunicationSpi();
tcpCommunication.setMessageQueueLimit(1024); // to get rid of the warning
iCfg.setCommunicationSpi(tcpCommunication);
String workDirectory = System.getProperty("user.home") + File.separator + 
"ignite";
iCfg.setWorkDirectory(workDirectory);
System.out.println();
System.out.println(String.format(">>> Starting Ignite on %s; work directory %s 
...", "MyLeakingNode", workDirectory));
System.out.println();
final Ignite ignite = Ignition.start(iCfg);
ClusterNode localNode = ignite.cluster().localNode();
System.out.println();
System.out.println(String.format(">>> Ignite started on %s (%s) successfully!", 
"MyLeakingNode", localNode.id()));
System.out.println();
return ignite;
}

private static IgniteCache<String, BinaryObject> createLocalCache(long id) {
final String cacheName = "LocalCache" + id;
final CacheConfiguration<String, BinaryObject> cCfg = new 
CacheConfiguration<>();
cCfg.setName(cacheName);
cCfg.setStoreKeepBinary(true);
cCfg.setCacheMode(CacheMode.LOCAL);
cCfg.setOnheapCacheEnabled(false);
cCfg.setCopyOnRead(false);
cCfg.setBackups(0);
cCfg.setWriteBehindEnabled(false);
cCfg.setReadThrough(false);
cCfg.setReadFromBackup(false);

final QueryEntity queryEntity = new QueryEntity(String.class.getTypeName(), 
"LocalEntity");
for (String field : localFields()) {
queryEntity.addQueryField(field, String.class.getTypeName(), field);
}
cCfg.setQueryEntities(Collections.singletonList( queryEntity));

ignite.destroyCache(cacheName); // local cache is not really local - reference 
can be kept by other nodes if restart during the load happens
return ignite.createCache(cCfg).withKeepBinary();
}


private static List<String> localFields() {
return Stream.concat(Stream.of(ID), IntStream.rangeClosed(1, 
numOfLocalFields).boxed().map(e -> String.format("field%s", 
e))).collect(Collectors.toList());
}

private static BinaryObject createLocalPrototype() {
BinaryObjectBuilder builder = ignite.binary().builder("LocalEntity");
for (String field : localFields()) {
builder.setField(field, null, String.class);
}
return builder.build();
}

private static BinaryObject createLocalObject(long id) {
BinaryObjectBuilder res = localPrototype.toBuilder();
for (String field : localFields()) {
if (ID.equals(field)) {
res.setField(field, String.valueOf(id), String.class);
} else {
res.setField(field, id + "@" + field, String.class);
}
}
return res.build();
}

private static IgniteCache<String, BinaryObject> getOrCreateGlobalCache() {
final String cacheName = "GlobalCache";
final CacheConfiguration<String, BinaryObject> cCfg = new 
CacheConfiguration<>();
cCfg.setName(cacheName);
cCfg.setStoreKeepBinary(true);
cCfg.setCacheMode(CacheMode.PARTITIONED);
cCfg.setOnheapCacheEnabled(false);
cCfg.setCopyOnRead(false);
cCfg.setBackups(0);
cCfg.setWriteBehindEnabled(false);
cCfg.setReadThrough(false);
cCfg.setQueryEntities(Collections.singletonList(new 
QueryEntity(String.class.getTypeName(), cacheName))); // must be defined 
otherwise indexing Spi remove method is not called
return ignite.getOrCreateCache(cCfg).withKeepBinary();
}

private static IgniteDataStreamer<String, BinaryObject> 
createGlobalStreamer(IgniteCache<String, BinaryObject> cache) {
IgniteDataStreamer<String, BinaryObject> streamer = 
ignite.dataStreamer(cache.getName());
streamer.allowOverwrite(true);
streamer.skipStore(true);
streamer.keepBinary(true);
return streamer;
}

private static List<String> globalFields() {
return Stream.concat(Stream.of(ID), IntStream.rangeClosed(1, 
numOfGlobalFields).boxed().map(e -> String.format("field%s", 
e))).collect(Collectors.toList());
}

private static BinaryObject createGlobalPrototype() {
BinaryObjectBuilder builder = ignite.binary().builder("GlobalEntity");
for (String field : globalFields) {
builder.setField(field, null, String.class);
}
return builder.build();
}


}


{code}

> Huge memory leak when data streamer used together with local cache
> ------------------------------------------------------------------
>
>                 Key: IGNITE-7918
>                 URL: https://issues.apache.org/jira/browse/IGNITE-7918
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache
>    Affects Versions: 2.3
>            Reporter: Zbyszek B
>            Priority: Blocker
>
> Dear Igniters,
> We observe huge memory leak when data streamer used together with local cache.
> In the attached demo producer produces local cache with single binary object 
> and passes this to queue. Consumer picks up the cache from the queue, 
> constructs different binary object from it, adds it to global partitioned 
> cache and destroys local cache.
> This design causes a significant leak - the whole heap it takes within 
> minutes (no matter if this is 4G or 24G).
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to