Hi Gabriel,

Yes, using the RocksDB state backend can relieve your RAM usage. I see a few issues with your job:

1) It keeps track of 672 windows per key (28 days x 24 hourly slides), which is a lot of state, so try to reduce the number of windows.
2) Use reduce functions to incrementally aggregate state rather than buffering the raw events inside the windows.

Rough sketches for both points are below.
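For the RocksDB suggestion, a minimal sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath and that checkpoints should go to an HDFS directory (the path below is just a placeholder, not your actual setup):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Keep keyed state on local disk via RocksDB instead of on the JVM heap;
// the URI is a placeholder for your HDFS checkpoint directory.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))

With RocksDB, state is spilled to local disk and only working data sits in memory, which is usually what brings the heap usage down for large windowed state.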
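For point 2, a sketch of what the windowed sum looks like with an explicit ReduceFunction, assuming the same (clientId, amount, timestamp) tuple layout produced by the map in your quoted code. With a reduce, Flink keeps only one accumulated element per key and window instead of buffering every event until the window fires:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// `stream` is assumed to be the DataStream[(Long, Double, Timestamp)]
// produced by the map in the quoted job.
val result = stream
  .keyBy(0)
  .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
  // Combine elements incrementally as they arrive: only the running sum
  // per (key, window) is held in state, not the individual events.
  .reduce((a, b) => (a._1, a._2 + b._2, b._3))

The same idea applies if your real aggregation is more complex: an AggregateFunction gives you a separate accumulator type while still aggregating eagerly.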
BTW, this kind of question should be posted to the *user@flink alias* rather than dev@flink.

Bowen

On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <gabrielpeli...@hotmail.com> wrote:

> We use Flink to process transactional events. A job was created to
> aggregate information about the clients, day of week and hour of day, and
> thus create a profile, as shown in the attached code.
>
> val stream = env.addSource(consumer)
> val result = stream
>   .map(openTransaction => {
>     val transactionDate = openTransaction.get("transactionDate")
>     val date = if (transactionDate.isTextual)
>       LocalDateTime.parse(transactionDate.asText,
>         DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
>     else
>       transactionDate.asLong
>     (openTransaction.get("clientId").asLong,
>       openTransaction.get("amount").asDouble, new Timestamp(date))
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
>   .sum(1)
>
> In the code above, the stream has three fields: "transactionDate",
> "clientId" and "amount". We key the stream by clientId and apply a
> sliding window summing the amount. There are around 100,000 unique active
> clientIds in our database.
>
> After some time running, the total RAM used by the job stabilizes at 36
> GB, but the stored checkpoint in HDFS uses only 3 GB. Is there a way to
> reduce the RAM usage of the job, maybe by configuring Flink's replication
> factor or by using RocksDB?
>
> Best regards