Thank you for your reply.
This is my SQL. It is a self-join that calculates each customer's
proportion and then takes the top 10 customers.
"mytable" has only 60,000 records, but after the join the "records sent"
metric shows 2,869,940 records and is still increasing.

select * from (
  select
    t1.id,
    t1.month,
    t1.customer,
    t1.amount,
    t1.counts,
    t1.counts / t2.counts as countRate,
    t1.amount / t2.amount as amountRate,
    -- rank customers within each (id, month) by amount; customer breaks ties
    row_number() over (
      partition by t1.id, t1.month
      order by t1.amount desc, t1.customer
    ) as rn
  from
    -- t1: totals per (id, month, customer)
    (
      select
        id,
        month,
        customer,
        sum(amount) as amount,
        sum(counts) as counts
      from mytable
      group by id, month, customer
    ) t1
  inner join
    -- t2: totals per (id, month)
    (
      select
        id,
        month,
        sum(amount) as amount,
        sum(counts) as counts
      from mytable
      group by id, month
    ) t2
    on t1.id = t2.id
    and t1.month = t2.month
) t
where rn <= 10
;
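
To make the intended result concrete, here is a minimal plain-Python sketch (not Flink code) of what the query above is meant to compute, assuming the partition key is (id, month). The sample rows and the function name top_customers are made up for illustration only.

```python
from collections import defaultdict

# Hypothetical sample rows: (id, month, customer, amount, counts)
rows = [
    ("c1", "2019-08", "alice", 60.0, 3),
    ("c1", "2019-08", "bob", 30.0, 2),
    ("c1", "2019-08", "alice", 10.0, 1),
    ("c1", "2019-09", "bob", 50.0, 5),
]


def top_customers(rows, n=10):
    """Return the top-n customers per (id, month) with their shares."""
    per_customer = defaultdict(lambda: [0.0, 0])  # like t1
    per_group = defaultdict(lambda: [0.0, 0])     # like t2
    for id_, month, customer, amount, counts in rows:
        per_customer[(id_, month, customer)][0] += amount
        per_customer[(id_, month, customer)][1] += counts
        per_group[(id_, month)][0] += amount
        per_group[(id_, month)][1] += counts

    # Collect the per-customer totals under their (id, month) key,
    # which is both the join key and the ranking partition.
    grouped = defaultdict(list)
    for (id_, month, customer), (amount, counts) in per_customer.items():
        grouped[(id_, month)].append((customer, amount, counts))

    result = []
    for (id_, month), members in grouped.items():
        total_amount, total_counts = per_group[(id_, month)]
        # Same ordering as the query: amount descending, then customer.
        members.sort(key=lambda m: (-m[1], m[0]))
        for rn, (customer, amount, counts) in enumerate(members[:n], start=1):
            result.append({
                "id": id_,
                "month": month,
                "customer": customer,
                "amount": amount,
                "counts": counts,
                # Guard against empty totals instead of dividing by zero.
                "countRate": counts / total_counts if total_counts else None,
                "amountRate": amount / total_amount if total_amount else None,
                "rn": rn,
            })
    return result


for row in top_customers(rows, n=10):
    print(row)
```

In this batch view, every (id, month, customer) total matches exactly one (id, month) total, so the join itself does not multiply rows.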

On Wed, Sep 4, 2019 at 7:48 PM Wesley Peng <wesley.pe...@googlemail.com>
wrote:

> Hi
>
> on 2019/9/4 19:30, liu ze wrote:
> > I use the row_number() over() function to do topN. The total amount of
> > data is 60,000 records, and the state is 12G.
> > Eventually the job runs out of memory (OOM). Is there any way to optimize it?
>
> ref:
>
> https://stackoverflow.com/questions/50812837/flink-taskmanager-out-of-memory-and-memory-configuration
>
> The total amount of required physical and heap memory is quite difficult
> to compute since it strongly depends on your user code, your job's
> topology and which state backend you use.
>
> As a rule of thumb, if you experience OOM and are still using the
> FileSystemStateBackend or the MemoryStateBackend, then you should switch
> to RocksDBStateBackend, because it can gracefully spill to disk if the
> state grows too big.
>
> If you are still experiencing OOM exceptions as you have described, then
> you should check your user code whether it keeps references to state
> objects or generates in some other way large objects which cannot be
> garbage collected. If this is the case, then you should try to refactor
> your code to rely on Flink's state abstraction, because with RocksDB it
> can go out of core.
>
> RocksDB itself needs native memory which adds to Flink's memory
> footprint. This depends on the block cache size, indexes, bloom filters
> and memtables. You can find out more about these things and how to
> configure them here.
>
> Last but not least, you should not activate
> taskmanager.memory.preallocate when running streaming jobs, because
> streaming jobs currently don't use managed memory. Thus, by activating
> preallocation, you would allocate memory for Flink's managed memory
> which reduces the available heap space.
>
