Hi Jiangang,

The IntervalJoin is actually the DataStream-level implementation of the SQL 
time-windowed join[1]. 

To ensure the completeness of the join results, we have to cache all the 
records (from both sides) in the most recent time interval. That may lead to 
state backend problems when huge streams flooding in. 

One benefit of SQL is that the optimizer will help to reduce the join inputs as 
much as possible (e.g., via predicate pushdown), but that should be done 
manually in DataStream programs. Thus, I suggest you to 1) try increasing the 
parallelism (and number of nodes if possible); 2) filter out some records or 
reduce the number of fields in advance.

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

> On Nov 21, 2018, at 2:06 AM, liujiangang <liujiangangp...@gmail.com> wrote:
> 
> I am using IntervalJoin function to join two streams within 10 minutes. As
> below:
> 
> labelStream.intervalJoin(adLogStream)
>           .between(Time.milliseconds(0), Time.milliseconds(600000))
>           .process(new processFunction())
>           .sink(kafkaProducer)
> labelStream and adLogStream are proto-buf class that are keyed by Long id.
> 
> Our two input-streams are huge. After running about 30minutes, the output to
> kafka go down slowly, like this:
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/1.png>
>  
> 
> When data output begins going down, I use jstack and pstack sevaral times to
> get these: 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/2.png>
>  
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/3.png>
>  
> 
> It seems the program is stucked in rockdb's seek. And I find that some
> rockdb's srt file are accessed slowly by iteration.
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/4.png>
>  
> 
> I have tried several ways:
> 
> 1)Reduce the input amount to half. This works well.
> 2)Replace labelStream and adLogStream with simple Strings. This way, data
> amount will not change. This works well.
> 3)Use PredefinedOptions like SPINNING_DISK_OPTIMIZED and
> SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
> 4)Use new versions of rocksdbjni. This still fails.
> Can anyone give me some suggestions? Thank you very much.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to