Hi community,

The implementation of ClosableBlockingQueue [1] uses a fair ReentrantLock
to guarantee thread safety. The changes below may help:


1. Replace ReentrantLock with ReentrantReadWriteLock to improve concurrency:
in fair mode, ReentrantReadWriteLock performs better than ReentrantLock
according to the benchmark in [2].


2. Replace signalAll() with signal() to reduce thread scheduling: signal()
is safe and reasonable here, since every thread waiting on the nonEmpty
Condition wants to take an element from the queue, and at most one thread
can get the element.
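
To make the two changes concrete, here is a minimal sketch. It is not the
Flink class: SketchQueue, takeBlocking() and the closed-queue behaviour
(returning null instead of throwing) are simplifications assumed for
illustration, and only addIfOpen(), close() and nonEmpty echo names from the
existing code. Note that in the JDK only the write lock of a
ReentrantReadWriteLock supports Conditions, so waiting consumers still need
the write lock; the read lock only helps read-only methods such as isEmpty().

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical, simplified stand-in for ClosableBlockingQueue (not the Flink class). */
final class SketchQueue<E> {
    // Fair mode, matching the fair ReentrantLock mentioned above.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    // Conditions are only available on the write lock;
    // readLock().newCondition() throws UnsupportedOperationException.
    private final Condition nonEmpty = lock.writeLock().newCondition();
    private final ArrayDeque<E> elements = new ArrayDeque<>();
    private boolean open = true;

    /** Adds one element if the queue is open and wakes at most one waiting consumer. */
    boolean addIfOpen(E element) {
        lock.writeLock().lock();
        try {
            if (!open) {
                return false;
            }
            elements.addLast(element);
            nonEmpty.signal(); // one new element, so one waiter is enough
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Blocks until an element is available; returns null once closed and empty (assumed behaviour). */
    E takeBlocking() throws InterruptedException {
        lock.writeLock().lock();
        try {
            // Loop to guard against spurious wakeups and against another consumer winning the race.
            while (open && elements.isEmpty()) {
                nonEmpty.await();
            }
            return elements.pollFirst();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Read-only check; several readers may hold the read lock at the same time. */
    boolean isEmpty() {
        lock.readLock().lock();
        try {
            return elements.isEmpty();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Closes the queue; every waiter has to observe this, so signalAll() is kept here. */
    void close() {
        lock.writeLock().lock();
        try {
            open = false;
            nonEmpty.signalAll();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

In this sketch signal() is used only where exactly one element is added.
close() still uses signalAll(), because every blocked consumer needs to wake
up and notice that the queue is closed; the same would probably apply to any
method that adds several elements at once.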






JIRA: https://issues.apache.org/jira/browse/FLINK-19089




[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
[2] http://isuru-perera.blogspot.com/2016/05/benchmarking-java-locks-with-counters.html


--
Best,
kui















 
