Hi all,
I have a question about some suspicious behavior I see when consuming
messages with manual synchronous commits on Kafka 0.10.1.0.
The code looks something like this:

try (KafkaConsumer<...> consumer = ...) {
   Map<TopicPartition, OffsetAndMetadata> commitMap =
       Collections.synchronizedMap(...);
   while (!Thread.currentThread().isInterrupted()) {
     ConsumerRecords records = consumer.poll(..);
     for (...) {
       // queue records for asynchronous processing in different thread.
       // when the asynchronous processing finishes, it updates the
       // `commitMap', so it has to be synchronized somehow
     }
     synchronized (commitMap) {
       // commit if we have anything to commit
       if (!commitMap.isEmpty()) {
         consumer.commitSync(commitMap);
         commitMap.clear();
       }
     }
   }
}


Now, what happens from time to time in my case is that the consumer
thread gets stuck in the call to `commitSync`. By running strace on the
process I found out that it periodically epolls on an *empty* list of
file descriptors. On further investigation I found that the response to
the `commitSync` is handled by the kafka-coordinator-heartbeat-thread,
which needs to access the `commitMap` while handling the response and
therefore blocks, because the lock is held by the application's main
thread. The main thread, in turn, keeps waiting for that response while
holding the lock, so the whole consumption stops and ends in a
live-lock. The solution in my case was to clone the map and move the
call to `commitSync` out of the synchronized block, like this:

   final Map<TopicPartition, OffsetAndMetadata> clone;
   synchronized (commitMap) {
     if (!commitMap.isEmpty()) {
       clone = new HashMap<>(commitMap);
       commitMap.clear();
     } else {
       clone = null;
     }
   }
   if (clone != null) {
     consumer.commitSync(clone);
   }

which seems to work fine. My question is whether my interpretation of
the problem is correct and, if so, whether anything should be done to
avoid it. I see two possibilities: either the call to `commitSync`
should clone the map itself, or it should somehow be guaranteed that the
same thread that issues a synchronous request also receives the
response. Am I right?
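
For anyone who wants to see the locking pattern without a broker, here
is a minimal plain-Java sketch of what I think is going on. It does not
use any Kafka classes: the "responder" thread only stands in for the
heartbeat thread, String/Long stand in for TopicPartition and
OffsetAndMetadata, and the class and method names are made up for this
example. It relies only on the documented fact that a map returned by
Collections.synchronizedMap locks on itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer loop; no Kafka classes involved.
class CommitLockSketch {

    /** Copies the shared map and clears it, both under the map's own lock. */
    static <K, V> Map<K, V> drain(Map<K, V> shared) {
        synchronized (shared) {
            Map<K, V> snapshot = new HashMap<>(shared);
            shared.clear();
            return snapshot;
        }
    }

    /**
     * Starts a "responder" thread that reads the map and then signals
     * completion, and waits up to waitMillis for that signal. With holdLock
     * set, the caller holds the map's monitor for the whole wait.
     * Returns true if the responder finished within the wait.
     */
    static boolean responderFinishes(Map<String, Long> offsets,
                                     boolean holdLock, long waitMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Thread responder = new Thread(() -> {
            offsets.get("topic-0"); // needs the map's monitor
            done.countDown();
        }, "responder");
        responder.setDaemon(true);
        try {
            if (holdLock) {
                synchronized (offsets) {
                    // Started while the lock is held, so get() must block.
                    responder.start();
                    return done.await(waitMillis, TimeUnit.MILLISECONDS);
                }
            }
            responder.start();
            return done.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets =
            Collections.synchronizedMap(new HashMap<>());
        offsets.put("topic-0", 42L);
        System.out.println("finished with lock held:    "
            + responderFinishes(offsets, true, 200));
        System.out.println("finished without the lock:  "
            + responderFinishes(offsets, false, 2000));
        Map<String, Long> snapshot = drain(offsets);
        System.out.println("snapshot=" + snapshot + " shared=" + offsets);
    }
}
```

With the real consumer, the result of drain(commitMap) would be passed
to consumer.commitSync(...) outside of any synchronized block, which is
the same thing as the workaround above.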

Thanks for comments,
  best,
   Jan