Hi, could you tell me which version you are using? I just want to check whether there are any problems with it.
Best,
Shengkai

On Sun, Apr 25, 2021 at 5:23 PM, 张颖 <queyue...@163.com> wrote:

> Hi, I ran into the following problem.
>
> This is my SQL:
>
> SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat
> FROM app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc
> WHERE dt='2021-04-01'
>
> When I use the Blink planner in batch mode, it works well, but if I switch
> to streaming mode, it causes a heap OOM:
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236)
>     at org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
>     at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
>     at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
>     at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
>     at java.lang.Thread.run(Thread.java:748)
>
> I use the RocksDB state backend and have confirmed that it is in effect.
> Then I ran jmap on the TaskManager:
>
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:        214656     4420569368  [C
>    2:        111199     2376771576  [B
>    3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
>    4:        214539        5148936  java.lang.String
>    5:         31796        2635104  [Ljava.lang.Object;
>    6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
>    7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
>    8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
>    9:         32812        2099968  java.nio.DirectByteBuffer
>   10:         14838        1651560  java.lang.Class
>   11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
>   12:         43014        1376448  java.util.Hashtable$Entry
>   13:         32805        1312200  sun.misc.Cleaner
>
> It looks like the data is on the heap rather than in RocksDB. Is there any
> way to keep the data in RocksDB?
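The stack trace shows that in streaming mode the `SELECT distinct` query runs through `GroupAggFunction`, i.e. as a keyed aggregation whose key is the whole selected row, and that each `RocksDBValueState.value()` call first serializes that key into a freshly copied buffer (`DataOutputSerializer.getCopyOfBuffer`). The toy model below is only a sketch under stated assumptions: the class `StreamingDistinctSketch`, its methods, and the sample rows are hypothetical, it is not Flink code, and it uses an in-memory set instead of RocksDB. It illustrates that every distinct row has to be remembered as a key, so wide columns such as `dense_feat` make each key large.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model for illustration only; NOT Flink's GroupAggFunction.
public class StreamingDistinctSketch {

    // Stand-in for keyed state. In the reported job this state is kept by the
    // RocksDB backend; an in-memory set keeps the sketch self-contained.
    private final Set<List<String>> seenKeys = new HashSet<>();

    // Streaming DISTINCT: the whole selected row acts as the grouping key.
    // A row is emitted only the first time its key appears, and the key is
    // kept afterwards, since a duplicate may still arrive later in the stream.
    public boolean processElement(List<String> row) {
        return seenKeys.add(new ArrayList<>(row));
    }

    public int stateEntries() {
        return seenKeys.size();
    }

    // Total characters across all remembered keys: a rough proxy for how
    // much key data has to be serialized and stored as state.
    public long totalKeyChars() {
        long total = 0;
        for (List<String> key : seenKeys) {
            for (String column : key) {
                total += column.length();
            }
        }
        return total;
    }

    // Builds a hypothetical wide feature column, e.g. a serialized dense vector.
    static String wideColumn(int values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values; i++) {
            sb.append("0.1,");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        StreamingDistinctSketch distinct = new StreamingDistinctSketch();
        String denseFeat = wideColumn(1000);
        // Hypothetical rows of (header, label, dense_feat).
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("h1", "1", denseFeat),
                Arrays.asList("h1", "1", denseFeat),  // duplicate: suppressed
                Arrays.asList("h2", "0", denseFeat));
        for (List<String> row : rows) {
            System.out.println("emitted: " + distinct.processElement(row));
        }
        System.out.println("state entries: " + distinct.stateEntries());
        System.out.println("key chars in state: " + distinct.totalKeyChars());
    }
}
```

Running `main` emits the first and third rows and suppresses the second; afterwards the state holds two entries, each carrying the full wide column. A bounded batch job can drop this bookkeeping once its input ends, whereas an unbounded stream cannot, which may be one reason the two modes behave differently on this query.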