Hi,

There are a few JIRA tickets that address this problem [1] [2].
Summary: the best execution strategy depends on the amount of data and the
window configuration. I have appended two small sketches below the quoted
thread: one of the hourly-slice idea and one of enabling the RocksDB state
backend.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7001
[2] https://issues.apache.org/jira/browse/FLINK-5387

2018-05-04 7:22 GMT+02:00 Rong Rong <walter...@gmail.com>:
> Agree with Bowen on this note: you should probably use a more efficient
> way of handling the data in a sliding window, since each record is
> assigned to every overlapping sliding window by the window assigner,
> which costs extra memory.
>
> BTW: since we are on this topic, I was wondering if there is any way of
> improving memory efficiency when dealing with elements that belong to
> overlapping windows.
>
> --
> Rong
>
> On Thu, May 3, 2018 at 9:40 PM, Bowen Li <bowenl...@gmail.com> wrote:
>
> > Hi Gabriel,
> >
> > Yes, using the RocksDB state backend can relieve your RAM usage. I see
> > two issues with your job: 1) it keeps track of 672 windows (28 days x
> > 24 hours) per key, which is a lot of state, so try to reduce the number
> > of windows; 2) use reduce functions to incrementally aggregate state
> > rather than buffering data internally.
> >
> > BTW, questions like this should be posted to the *user@flink* list
> > rather than dev@flink.
> >
> > Bowen
> >
> > On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <
> > gabrielpeli...@hotmail.com> wrote:
> >
> > > We use Flink to process transactional events. A job was created to
> > > aggregate information about each client by day of week and hour of
> > > day, thus creating a profile, as shown in the code below.
> > >
> > > val stream = env.addSource(consumer)
> > > val result = stream
> > >   .map(openTransaction => {
> > >     val transactionDate = openTransaction.get("transactionDate")
> > >     val date = if (transactionDate.isTextual)
> > >       LocalDateTime.parse(transactionDate.asText,
> > >         DateTimeFormatter.ISO_DATE_TIME)
> > >         .toInstant(ZoneOffset.UTC).toEpochMilli
> > >     else
> > >       transactionDate.asLong
> > >     (openTransaction.get("clientId").asLong,
> > >       openTransaction.get("amount").asDouble, new Timestamp(date))
> > >   })
> > >   .keyBy(0)
> > >   .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
> > >   .sum(1)
> > >
> > > In the code above, the stream has three fields: "transactionDate",
> > > "clientId" and "amount". We key the stream by clientId and sum the
> > > amount over a sliding window. There are around 100,000 unique active
> > > clientIds in our database.
> > >
> > > After some time running, the total RAM used by the job stabilizes at
> > > 36 GB, but the stored checkpoint in HDFS uses only 3 GB. Is there a
> > > way to reduce the RAM usage of the job, maybe by configuring Flink's
> > > replication factor or by using RocksDB?
> > >
> > > Best regards
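PS: To make the slicing idea behind FLINK-7001 concrete, here is a minimal
sketch. It is not a built-in Flink operator; HourlySum and Sliding28DaySum
are hypothetical names, and it assumes an upstream 1-hour tumbling
event-time window has already produced one sum per client and hour.
Instead of one accumulator per key for each of the 672 overlapping
windows, it keeps at most 672 hourly slice sums per client in MapState and
emits the rolling 28-day total:

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // One pre-aggregated sum per client and hour, produced upstream by a
    // 1-hour tumbling event-time window (not shown).
    case class HourlySum(clientId: Long, hourStart: Long, amount: Double)

    class Sliding28DaySum extends ProcessFunction[HourlySum, (Long, Double)] {
      private val windowMs = 28L * 24 * 60 * 60 * 1000  // 28 days
      // Hourly slice sums for the current client: at most 672 entries.
      private lazy val slices: MapState[java.lang.Long, java.lang.Double] =
        getRuntimeContext.getMapState(new MapStateDescriptor(
          "hourlySlices", classOf[java.lang.Long], classOf[java.lang.Double]))

      override def processElement(
          in: HourlySum,
          ctx: ProcessFunction[HourlySum, (Long, Double)]#Context,
          out: Collector[(Long, Double)]): Unit = {
        slices.put(in.hourStart, in.amount)
        // Evict slices that fell out of the 28-day window, sum the rest.
        // Assumes hourly slices arrive roughly in order; late data would
        // need extra handling.
        var total = 0.0
        val it = slices.iterator()
        while (it.hasNext) {
          val e = it.next()
          if (e.getKey <= in.hourStart - windowMs) it.remove()
          else total += e.getValue
        }
        out.collect((in.clientId, total))
      }
    }

    // Usage (hypothetical): hourly.keyBy(_.clientId).process(new Sliding28DaySum)

This trades 672 window accumulators per key for a single map of hourly
sums per key, and each update scans at most 672 entries.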
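And as Bowen said, the RocksDB state backend moves the state off the JVM
heap. A minimal sketch of enabling it, assuming the
flink-statebackend-rocksdb dependency is on the classpath; the HDFS
checkpoint URI is a placeholder:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Keep live state in embedded RocksDB instances on local disk rather
    // than on the JVM heap; the second argument enables incremental
    // checkpoints (Flink 1.3+). The checkpoint URI is a placeholder.
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))

With RocksDB, the heap footprint is bounded by RocksDB's block cache and
write buffers instead of growing with the full window state.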