I'm trying to load 100M time-series measurements, packed BLOCK_SIZE = 4_500
measurements per value, using the following key/value structure:
Key:

public class Key {
    private int securityId;
    private long date;
}
Value:

public class OHLC {
    private long date;
    private int securityId;
    private int size;
    private long[] time;
    private double[] open;
    private double[] high;
    private double[] low;
    private double[] close;
    private double[] marketVWAP;
}
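For context, the loading loop looks roughly like this (a simplified sketch:
TOTAL_MEASUREMENTS, nextBlock() and keyOf() are placeholders for my actual
data source, not real API):

import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;

void load(Ignite ignite) {
    try (IgniteDataStreamer<Key, OHLC> streamer = ignite.dataStreamer(CACHE_NAME)) {
        streamer.autoFlushFrequency(TimeUnit.SECONDS.toMillis(30));

        for (long blockId = 0; blockId < TOTAL_MEASUREMENTS / BLOCK_SIZE; blockId++) {
            OHLC block = nextBlock(blockId); // hypothetical: builds one 4_500-point block
            streamer.addData(keyOf(block), block);
        }
    } // closing the streamer flushes any remaining buffered data
}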
I need some kind of checkpoint that flushes the streamer's buffers to the
cache, ideally every 30 seconds.
I've tried configuring the streamer:
streamer.allowOverwrite(true);
streamer.perNodeBufferSize(20);
streamer.autoFlushFrequency(TimeUnit.SECONDS.toMillis(30));
streamer.skipStore(false);
streamer.keepBinary(true);
and even explicitly flushing:

if (blockId % 20 == 0)
    streamer.flush();
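One diagnostic I'm considering (a sketch on my part, not something the docs
prescribe, reusing keyOf()/block/blockId from the loop above) is waiting on
the future returned by addData(), which should complete once that particular
batch has been streamed:

import org.apache.ignite.lang.IgniteFuture;

IgniteFuture<?> fut = streamer.addData(keyOf(block), block);
if (blockId % 20 == 0)
    fut.get(); // wait for this batch to land before checking cache.size()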
After flush() is invoked (supposed to be a blocking operation), I check the
entry count of the cache:

final IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE_NAME);

System.out.println(" >>> Simulator - Inserted " + cache.size() * BLOCK_SIZE + " " + new Date());
Thread.sleep(TimeUnit.SECONDS.toMillis(40));
System.out.println(" >>> Simulator - Inserted " + cache.size() * BLOCK_SIZE + " " + new Date());
Thread.sleep(TimeUnit.SECONDS.toMillis(40));
System.out.println(" >>> Simulator - Inserted " + cache.size() * BLOCK_SIZE + " " + new Date());
But I keep getting cache.size() == 1.
According to the documentation:

flush(): "Streams any remaining data, ... this method blocks and doesn't
allow to add any data until all data is streamed."

size(): "Gets the number of all entries cached across all nodes. By default,
if {@code peekModes} value isn't defined, only size of primary copies across
all nodes will be returned."
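To rule out a counting problem rather than a flushing one, I could also query
the size with explicit peek modes (a sketch; CachePeekMode lives in
org.apache.ignite.cache):

import org.apache.ignite.cache.CachePeekMode;

// Compare primary-only size (the size() default) against everything
// the cluster reports.
int primary = cache.size(CachePeekMode.PRIMARY);
int all = cache.size(CachePeekMode.ALL);
System.out.println(" >>> size PRIMARY=" + primary + ", ALL=" + all);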
From what I understand, this does not work on 2.1.0. Is there a known
workaround for flushing the data from the streamer to the cache?
Thanks a lot
Pranas