Hi Xiao,

Thanks for reporting this.
You approach sounds good to me. But we have many similar problems in
existing streaming sql operator implementations.
So I think if State API / statebackend can provide a better state structure
to handle this situation would be great.

This is a similar problem with poor performance of RocksDBListState. And
the relative discussions have been raised several times [1][2].
The root cause is RocsDBStatBackend serialize the whole list as a byte[].
And there were some ideas proposed in the thread.

I cc'ed Yu Li who works on statebackend.

Thanks,
Jark


[1]: https://issues.apache.org/jira/browse/FLINK-8297
[2]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLINK-8297-A-solution-for-FLINK-8297-Timebased-RocksDBListState-tc28259.html


On Wed, 14 Aug 2019 at 14:46, LIU Xiao <xiao.liu...@qq.com> wrote:

> Example SQL:
>
> SELECT *
> FROM stream1 s1, stream2 s2
> WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime
>
> And we have lots of messages in stream1 and stream2 share a same rowtime.
>
> It runs fine when using heap as the state backend,
> but requires lots of heap memory sometimes (when upstream out of sync,
> etc), and a risk of full gc exists.
>
> When we use RocksDBStateBackend to lower the heap memory usage, we found
> our program runs unbearably slow.
>
> After examing the code we found
> org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1()
> may be the cause of the problem (we are using Flink 1.6 but 1.8 should be
> same):
> ...
>     // Check if we need to cache the current row.
>     if (rightOperatorTime < rightQualifiedUpperBound) {
>       // Operator time of right stream has not exceeded the upper window
> bound of the current
>       // row. Put it into the left cache, since later coming records from
> the right stream are
>       // expected to be joined with it.
>       var leftRowList = leftCache.get(timeForLeftRow)
>       if (null == leftRowList) {
>         leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
>       }
>       leftRowList.add(JTuple2.of(leftRow, emitted))
>       leftCache.put(timeForLeftRow, leftRowList)
> ...
>
> In above code, if there are lots of messages with a same timeForLeftRow,
> the serialization and deserialization cost will be very high when using
> RocksDBStateBackend.
>
> A simple fix I came up with:
> ...
>   // cache to store rows from the left stream
>   //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
>   private var leftCache: MapState[JTuple2[Long, Integer],
> JList[JTuple2[Row, Boolean]]] = _
>   private var leftCacheSize: MapState[Long, Integer] = _
> ...
>     // Check if we need to cache the current row.
>     if (rightOperatorTime < rightQualifiedUpperBound) {
>       // Operator time of right stream has not exceeded the upper window
> bound of the current
>       // row. Put it into the left cache, since later coming records from
> the right stream are
>       // expected to be joined with it.
>       //var leftRowList = leftCache.get(timeForLeftRow)
>       //if (null == leftRowList) {
>       //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
>       //}
>       //leftRowList.add(JTuple2.of(leftRow, emitted))
>       //leftCache.put(timeForLeftRow, leftRowList)
>       var leftRowListSize = leftCacheSize.get(timeForLeftRow)
>       if (null == leftRowListSize) {
>         leftRowListSize = new Integer(0)
>       }
>       leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize),
> JTuple2.of(leftRow, emitted))
>       leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
> ...
>
> --
> LIU Xiao <xiao.liu...@qq.com>
>
>

Reply via email to