Hi Jiangang, The IntervalJoin is actually the DataStream-level implementation of the SQL time-windowed join[1].
To ensure the completeness of the join results, we have to cache all the records (from both sides) in the most recent time interval. That may lead to state backend problems when huge streams flooding in. One benefit of SQL is that the optimizer will help to reduce the join inputs as much as possible (e.g., via predicate pushdown), but that should be done manually in DataStream programs. Thus, I suggest you to 1) try increasing the parallelism (and number of nodes if possible); 2) filter out some records or reduce the number of fields in advance. Best, Xingcan [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins > On Nov 21, 2018, at 2:06 AM, liujiangang <liujiangangp...@gmail.com> wrote: > > I am using IntervalJoin function to join two streams within 10 minutes. As > below: > > labelStream.intervalJoin(adLogStream) > .between(Time.milliseconds(0), Time.milliseconds(600000)) > .process(new processFunction()) > .sink(kafkaProducer) > labelStream and adLogStream are proto-buf class that are keyed by Long id. > > Our two input-streams are huge. After running about 30minutes, the output to > kafka go down slowly, like this: > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/1.png> > > > When data output begins going down, I use jstack and pstack sevaral times to > get these: > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/2.png> > > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/3.png> > > > It seems the program is stucked in rockdb's seek. And I find that some > rockdb's srt file are accessed slowly by iteration. > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/4.png> > > > I have tried several ways: > > 1)Reduce the input amount to half. This works well. > 2)Replace labelStream and adLogStream with simple Strings. This way, data > amount will not change. This works well. > 3)Use PredefinedOptions like SPINNING_DISK_OPTIMIZED and > SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails. > 4)Use new versions of rocksdbjni. This still fails. > Can anyone give me some suggestions? Thank you very much. > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/