Hi,

There are a few JIRA tickets that address this problem [1] [2].

Summary: The best execution strategy depends on the amount of data and the
window configuration.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7001
[2] https://issues.apache.org/jira/browse/FLINK-5387

2018-05-04 7:22 GMT+02:00 Rong Rong <walter...@gmail.com>:

> Agree with Bowen on this note: you should probably use a more efficient
> way of handling the data in sliding windows, since each element is
> "assigned" to every sliding window that covers it through a window
> assigner, and thus incurs extra memory usage.
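>
> To make the duplication concrete, a quick back-of-envelope sketch in
> plain Scala (numbers taken from the job below):
>
>     // Each element is assigned to every sliding window that covers it,
>     // so its state is effectively replicated once per overlapping window.
>     val windowSizeHours   = 28 * 24                       // 28-day window
>     val slideHours        = 1                             // 1-hour slide
>     val windowsPerElement = windowSizeHours / slideHours  // = 672 copies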
>
> BTW: since we are on this topic, I was wondering if there's any way of
> improving the memory efficiency when dealing with elements that belong to
> overlapping windows.
>
> --
> Rong
>
> On Thu, May 3, 2018 at 9:40 PM, Bowen Li <bowenl...@gmail.com> wrote:
>
> > Hi Gabriel,
> >
> > Yes, using the RocksDB state backend can relieve your RAM usage. I see a
> > few issues with your job: 1) it's keeping track of 672 windows per key
> > (28 days x 24 hours/day), which is a lot of data, so try to reduce the
> > number of windows; 2) use a reduce function to incrementally aggregate
> > state, rather than buffering all elements internally.
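> >
> > A minimal, untested sketch of 2), where `transactions` is a hypothetical
> > name for the mapped (clientId, amount, timestamp) stream from the job
> > below:
> >
> >     // A ReduceFunction keeps a single running sum per key and window,
> >     // instead of buffering every element until the window fires.
> >     val result = transactions
> >       .keyBy(0)
> >       .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
> >       .reduce((a, b) => (a._1, a._2 + b._2, b._3))  // incremental sum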
> >
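> > And for the RocksDB state backend, a minimal sketch (Flink 1.x-era API;
> > the HDFS checkpoint path is just a placeholder):
> >
> >     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
> >     import org.apache.flink.streaming.api.scala._
> >
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >     // Keyed state lives on local disk instead of the JVM heap;
> >     // checkpoints go to the given filesystem URI.
> >     env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
> >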
> > BTW, this kind of question should be posted to the *user@flink alias*
> > rather than dev@flink.
> >
> > Bowen
> >
> >
> >
> > On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <gabrielpeli...@hotmail.com>
> > wrote:
> >
> > > We use Flink to process transactional events. We created a job that
> > > aggregates information per client, day of week, and hour of day, thus
> > > building a profile, as shown in the code below.
> > >
> > >
> > > import java.sql.Timestamp
> > > import java.time.{LocalDateTime, ZoneOffset}
> > > import java.time.format.DateTimeFormatter
> > >
> > > import org.apache.flink.streaming.api.scala._
> > > import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
> > > import org.apache.flink.streaming.api.windowing.time.Time
> > >
> > > val stream = env.addSource(consumer)
> > > val result = stream
> > >   .map(openTransaction => {
> > >     // The timestamp may arrive either as an ISO-8601 string or as epoch millis.
> > >     val transactionDate = openTransaction.get("transactionDate")
> > >     val date = if (transactionDate.isTextual)
> > >       LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME)
> > >         .toInstant(ZoneOffset.UTC).toEpochMilli
> > >     else
> > >       transactionDate.asLong
> > >     (openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
> > >   })
> > >   .keyBy(0)
> > >   .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
> > >   .sum(1)
> > >
> > > In the code above, each event has three fields: "transactionDate",
> > > "clientId", and "amount". We key the stream by clientId and apply a
> > > sliding window that sums the amount. There are around 100,000 unique
> > > active clientIds in our database.
> > >
> > > After running for some time, the total RAM used by the job stabilizes
> > > at 36 GB, but the checkpoint stored in HDFS uses only 3 GB. Is there a
> > > way to reduce the RAM usage of the job, maybe by configuring Flink's
> > > replication factor or by using RocksDB?
> > >
> > >
> > > Best regards
> > >