Hi all, I'm trying to add mini-batch optimizations for the regular join (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java) in the Blink planner. With my changes, some test cases fail, such as AggregateITCase.testGroupBySingleValue.
After debugging, I found that the heap memory backing BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from [0,0,0,0,0,0,0,0] to [3,0,0,0,0,0,0,0], which leads to some records being assigned a wrong key. However, my mini-batch code doesn't perform any low-level operations on MemorySegment. I only buffer some records (RowData) in a Map, just as AbstractMapBundleOperator does. Object reuse is also disabled via env.getConfig.disableObjectReuse(). It looks like StreamOneInputProcessor.processInput is modifying memory segments that do not belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A screenshot of the debugger with more information is attached: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>

I'm not familiar with org.apache.flink.core.memory.MemorySegment or sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you have any ideas about why this happens or where to check next? Thank you.
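To make the symptom concrete, here is a minimal plain-Java sketch of how a shared buffer can be changed through an alias held in a Map. It does not use any Flink classes: the class name, the byte[] standing in for the memory behind the empty row, and the deserializeInto helper are all hypothetical, and whether something similar is actually happening in the join operator is only an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SharedBufferAliasing {

    // Hypothetical stand-in for a deserializer that writes into the buffer it is given for reuse.
    static byte[] deserializeInto(byte[] reuse, byte firstByte) {
        reuse[0] = firstByte;
        return reuse;
    }

    // Buffers the shared array by reference, then lets a writer reuse the buffered object.
    static byte[] writeThroughAlias() {
        byte[] shared = new byte[8];            // plays the role of the shared empty row's memory
        Map<String, byte[]> buffer = new HashMap<>();
        buffer.put("key", shared);              // the map now holds an alias, not a copy
        deserializeInto(buffer.get("key"), (byte) 3);
        return shared;                          // the shared array has been modified
    }

    // Buffers a defensive copy, so later writes cannot reach the shared array.
    static byte[] writeThroughCopy() {
        byte[] shared = new byte[8];
        Map<String, byte[]> buffer = new HashMap<>();
        buffer.put("key", Arrays.copyOf(shared, shared.length));
        deserializeInto(buffer.get("key"), (byte) 3);
        return shared;                          // the shared array is untouched
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(writeThroughAlias())); // [3, 0, 0, 0, 0, 0, 0, 0]
        System.out.println(Arrays.toString(writeThroughCopy()));  // [0, 0, 0, 0, 0, 0, 0, 0]
    }
}
```

If the real cause is of this kind, one place to check would be whether any buffered record or key is the same object instance as the shared empty row, and whether copying before buffering makes the corruption disappear.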
