Thank you for your reply. This is my SQL: a self-join that calculates each customer's proportion and then takes the top 10 customers. "mytable" has only 60,000 records, but after the join the "records sent" metric shows 2,869,940 records and is still increasing.
select *
from (
  select t1.id, t1.month, t1.customer, t1.amount, t1.counts,
         t1.counts / t2.counts as countRate,
         t1.amount / t2.amount as amountRate,
         -- partition by the same keys used in the join; t1 has no corpId column
         row_number() over (partition by t1.id, t1.month
                            order by t1.amount desc, t1.customer) as rn
  from (
    select id, month, customer, sum(amount) as amount, sum(counts) as counts
    from mytable
    group by id, month, customer
  ) t1
  inner join (
    select id, month, sum(amount) as amount, sum(counts) as counts
    from mytable
    group by id, month
  ) t2
  on t1.id = t2.id and t1.month = t2.month
) t
where rn <= 10;

On Wed, Sep 4, 2019 at 7:48 PM Wesley Peng <wesley.pe...@googlemail.com> wrote:

> Hi
>
> on 2019/9/4 19:30, liu ze wrote:
> > I use the row_number() over() function to do topN. The total amount of
> > data is 60,000, and the state is 12G.
> > Finally, OOM. Is there any way to optimize it?
>
> ref:
> https://stackoverflow.com/questions/50812837/flink-taskmanager-out-of-memory-and-memory-configuration
>
> The total amount of required physical and heap memory is quite difficult
> to compute, since it strongly depends on your user code, your job's
> topology, and which state backend you use.
>
> As a rule of thumb, if you experience OOM and are still using the
> FileSystemStateBackend or the MemoryStateBackend, then you should switch
> to RocksDBStateBackend, because it can gracefully spill to disk if the
> state grows too big.
>
> If you are still experiencing OOM exceptions as described, then you
> should check whether your user code keeps references to state objects
> or generates large objects in some other way that cannot be garbage
> collected. If this is the case, you should try to refactor your code to
> rely on Flink's state abstraction, because with RocksDB it can go out
> of core.
>
> RocksDB itself needs native memory, which adds to Flink's memory
> footprint. This depends on the block cache size, indexes, bloom filters,
> and memtables. You can find out more about these things and how to
> configure them here.
> Last but not least, you should not activate
> taskmanager.memory.preallocate when running streaming jobs, because
> streaming jobs currently don't use managed memory. Thus, by activating
> preallocation, you would allocate memory for Flink's managed memory,
> which reduces the available heap space.
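For anyone following along, the top-10-per-group logic of the query can be checked in isolation against a toy dataset. This is a minimal sketch using Python's built-in sqlite3 (assuming SQLite >= 3.25 for window-function support); the table name and columns match the query above, but the rows are made-up example data — it only illustrates the countRate/amountRate computation, not Flink's streaming execution or state behavior:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE mytable (id INT, month TEXT, customer TEXT, amount REAL, counts INT)"
)
# Made-up rows: customer A appears twice, so the inner GROUP BY must pre-aggregate it.
rows = [
    (1, "2019-08", "A", 60.0, 3),
    (1, "2019-08", "A", 20.0, 1),
    (1, "2019-08", "B", 20.0, 1),
]
cur.executemany("INSERT INTO mytable VALUES (?, ?, ?, ?, ?)", rows)

top_n = cur.execute("""
    SELECT id, month, customer, amount, counts, countRate, amountRate
    FROM (
        SELECT t1.id, t1.month, t1.customer, t1.amount, t1.counts,
               t1.counts * 1.0 / t2.counts AS countRate,   -- force float division
               t1.amount / t2.amount       AS amountRate,
               ROW_NUMBER() OVER (PARTITION BY t1.id, t1.month
                                  ORDER BY t1.amount DESC, t1.customer) AS rn
        FROM (SELECT id, month, customer,
                     SUM(amount) AS amount, SUM(counts) AS counts
              FROM mytable GROUP BY id, month, customer) t1
        INNER JOIN (SELECT id, month,
                           SUM(amount) AS amount, SUM(counts) AS counts
                    FROM mytable GROUP BY id, month) t2
          ON t1.id = t2.id AND t1.month = t2.month
    )
    WHERE rn <= 10
""").fetchall()

for row in top_n:
    print(row)
```

With these rows, customer A aggregates to amount 80 / counts 4 against group totals of 100 / 5, so its amountRate and countRate are both 0.8, and customer B's are both 0.2.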