Hi Experts,

Recently, I was learning how the temporal table join works in Flink by reading the source code of TemporalRowTimeJoinOperator, and I found this comment in the source code:

> /**
>  * Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`. We
>  * can not use List to accumulate Rows, because we need efficient deletes of the oldest rows.
>  *
>  * <p>TODO: this could be OrderedMultiMap[Jlong, Row] indexed by row's timestamp, to avoid full
>  * map traversals (if we have lots of rows on the state that exceed `currentWatermark`).
>  */
> private transient MapState<Long, RowData> leftState;

AFAIK, Flink does not currently support a map state as complex as OrderedMultiMap, so I tried to implement a similar one [1] that meets the requirement on top of the existing map state, at the cost of some space overhead. I would like your feedback on this implementation.

Before explaining the overhead and the trade-off between the current implementation and mine, let me give a brief introduction to my implementation.

First, I implemented a min-heap state that I can use to extract the earliest row time among the left rows. Based on this heap, I implemented a data structure similar to an adjacency list, which I use to simulate a MapState<K, List<V>> state. It can append a new value without deserializing the whole list of values, which an actual MapState<K, List<V>> state would have to do.

Regarding time complexity and space complexity, let's assume these conditions:

1. The total number of left rows buffered in state is "N".
2. The number of distinct row times among these buffered rows is "R".
3. The number of emitted results is approximately "K" each time.
4. The number of distinct row times among these emitted results is "P".

                                              current implementation        my implementation
"processElement1" time complexity             O(1)                          O(log R)
"extract left rows to emit" time complexity   O(N)                          O(K + P log R)
space complexity                              N + 1 (1 for nextLeftIndex)   2N + 2R + 2

From this table, we can see that the space overhead is more than twice that of the current implementation, while the benefit in "extract" time complexity is not always significant, because it depends on many conditions (for example, how K, P and R compare with N).

Please let me know what you think about such an implementation. Is it worthwhile or not?

Besides, I'm not sure whether there is already a similar effort on this, for example, support for an OrderedMultiMap or OrderedMap state for general purposes. It would be great if you could point me to the relevant JIRA issues.

Any feedback is welcome. Thank you in advance.

[1] https://github.com/tony810430/flink/commit/0d5d9d70df0e85b0ac161ffa89c11249a0b3db2a

Best regards,
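
P.S. For readers who prefer not to open the commit, the idea can be illustrated with a simplified in-memory sketch. This is not the code in [1]: plain Java collections stand in for Flink's MapState, the class and method names (OrderedMultiMapSketch, put, pollUpTo) are made up for illustration, and emitting rows whose row time is at or below the watermark is an assumption about the operator's condition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** In-memory illustration only; HashMap/PriorityQueue stand in for Flink state. */
public class OrderedMultiMapSketch<V> {
    // Min-heap over the distinct row times currently buffered (R entries).
    private final PriorityQueue<Long> rowTimes = new PriorityQueue<>();
    // Adjacency-list style storage: row time -> {head node id, tail node id}.
    private final Map<Long, long[]> headTail = new HashMap<>();
    // Node id -> buffered value, and node id -> id of the next node with the same row time.
    private final Map<Long, V> nodeValue = new HashMap<>();
    private final Map<Long, Long> nodeNext = new HashMap<>();
    private long nextNodeId = 0;

    /** Appends a value without reading the other values stored under the same row time. */
    public void put(long rowTime, V value) {
        long id = nextNodeId++;
        nodeValue.put(id, value);
        long[] ht = headTail.get(rowTime);
        if (ht == null) {
            // First row with this row time: start a new chain and register the time in the heap.
            headTail.put(rowTime, new long[] {id, id});
            rowTimes.add(rowTime); // O(log R)
        } else {
            // Link the new node behind the current tail. A real state backend would
            // need an explicit write-back of the updated {head, tail} pair.
            nodeNext.put(ht[1], id);
            ht[1] = id;
        }
    }

    /** Removes and returns all values whose row time is <= watermark, oldest row time first. */
    public List<V> pollUpTo(long watermark) {
        List<V> out = new ArrayList<>();
        while (!rowTimes.isEmpty() && rowTimes.peek() <= watermark) {
            long rowTime = rowTimes.poll(); // O(log R) per distinct row time
            long[] ht = headTail.remove(rowTime);
            Long id = ht[0];
            while (id != null) { // walk the chain; O(1) per emitted value
                out.add(nodeValue.remove(id));
                id = nodeNext.remove(id); // null once the tail is reached
            }
        }
        return out;
    }

    /** Number of values still buffered. */
    public int size() {
        return nodeValue.size();
    }
}
```

In this sketch, put touches only the heap and the tail pointer, and pollUpTo visits only the rows it emits, which is where the O(log R) and O(K + P log R) figures in the table come from. The extra pointer and heap entries are the source of the additional space.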