I am using the intervalJoin function to join two streams within a 10-minute
window, as shown below:

labelStream.intervalJoin(adLogStream)
           .between(Time.milliseconds(0), Time.milliseconds(600000))
           .process(new processFunction())
           .addSink(kafkaProducer)

labelStream and adLogStream carry protobuf objects and are keyed by a Long id.
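
To make the bounds concrete: with between(0 ms, 600000 ms), an element from
labelStream with timestamp t is paired with every adLogStream element of the
same key whose timestamp lies in [t, t + 600000]. This means roughly 10
minutes of both streams must be held in state at any time. Below is a minimal
plain-Java sketch of that condition, assuming inclusive bounds (Flink's
default). The class and method names are hypothetical, and this is not
Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the interval-join condition; not Flink code.
// All names here are hypothetical.
final class IntervalJoinSketch {

    // A left element with timestamp leftTs pairs with a right element with
    // timestamp rightTs when leftTs + lowerMs <= rightTs <= leftTs + upperMs
    // (both bounds inclusive).
    static boolean matches(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    // Naive join over the timestamps buffered for a single key.
    // Returns {leftTs, rightTs} pairs that satisfy the interval condition.
    static List<long[]> join(List<Long> leftTs, List<Long> rightTs,
                             long lowerMs, long upperMs) {
        List<long[]> out = new ArrayList<>();
        for (long l : leftTs) {
            for (long r : rightTs) {
                if (matches(l, r, lowerMs, upperMs)) {
                    out.add(new long[] {l, r});
                }
            }
        }
        return out;
    }
}
```

For example, IntervalJoinSketch.matches(1000L, 601000L, 0L, 600000L) is true,
while a right-side timestamp of 601001 falls just outside the interval.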

Our two input streams are huge. After running for about 30 minutes, the
output rate to Kafka slowly goes down, like this:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/1.png>
 

When the output began to drop, I ran jstack and pstack several times and got
these:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/2.png>
 
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/3.png>
 

It seems the program is stuck in RocksDB's seek. I also found that some of
RocksDB's sst files are accessed slowly during iteration.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/4.png>
 

I have tried several things:

1) Reduce the input volume by half. This works well.
2) Replace the labelStream and adLogStream records with simple Strings, so
that the amount of data does not change. This works well.
3) Use PredefinedOptions such as SPINNING_DISK_OPTIMIZED and
SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
4) Use newer versions of rocksdbjni. This still fails.

Can anyone give me some suggestions? Thank you very much.



