> On June 18, 2015, 9:59 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java,
> > line 1364
> > <https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1364>
> >
> > This seems like one of these things that is clever but invariably ends
> > up not quite working. Did we actually determine there is a performance hit
> > from just synchronizing. Biased locking and lock elision should make this
> > very cheap right? Given we acquire locks for all the perf stats and there
> > are several stat updates for each poll call I don't see the problem with
> > just synchronizing.

Adding synchronization has at least one other downside besides performance: you can pretty easily end up deadlocking due to callbacks (consumer rebalance, commit) if you also have other synchronization outside the consumer itself. And if someone has a chance of accessing the consumer from multiple threads, that probably does mean they have some other synchronization (or should). Deadlocks are generally easier to diagnose than synchronization bugs, but you're introducing potential issues either way.

Based on previous conversation, I think this addresses the main concern with the unsynchronized version and actually results in a lot *less* unintuitive behavior than the synchronized version. If I'm unaware that I am incorrectly calling the consumer from multiple threads, the synchronized version is just going to make me think commits/seeks/etc. are running really slowly as I wait for poll() calls to return, and then I'll blame the library because, as far as I know, I'm using it correctly and it's just not behaving well.

This solution seems a lot better because, although it is not guaranteed to catch conflicts, you're a lot more likely to hit them, they get turned into actual exceptions, and the cause is made very clear to you: it specifically puts the blame on the code calling these methods and indicates that the caller has a bug, since they have not properly synchronized access to the consumer. A real race detector is more likely to pick up these errors, but this is a pretty good way to have a fair chance of catching the error and informing the user, especially since the most likely error is calling a method while a poll() is running, and those should generally have pretty long timeouts.

Having to litter the code with try/finally blocks is a definite drawback, but it just encourages us to keep the API as small as possible :)


- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/34789/#review88441
-----------------------------------------------------------


On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
>
> (Updated June 18, 2015, 9:40 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
>
>
> KAFKA-2168; address review comments
>
>
> KAFKA-2168; fix rebase error and checkstyle issue
>
>
> KAFKA-2168; address review comments and add docs
>
>
> KAFKA-2168; handle polling with timeout 0
>
>
> KAFKA-2168; timeout=0 means return immediately
>
>
> KAFKA-2168; address review comments
>
>
> KAFKA-2168; address more review comments
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 951c34c92710fc4b38d656e99d2a41255c60aeb7
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 41cb9458f51875ac9418fce52f264b35adba92f4
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java f73eedb030987f018d8446bb1dcd98d19fa97331
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 1454ab73df22cce028f41f74b970628829da4e9d
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6
>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498
>
> Diff: https://reviews.apache.org/r/34789/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jason Gustafson
>
>
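
A footnote on the reply above, since the trade-off is easier to judge with code in front of you. The alternative to synchronizing that the reply defends could look roughly like the sketch below. This is not the code from the diff. The class `GuardedClient`, the helpers `acquire()` and `release()`, the counter `commitCount()`, and the `Runnable` handed to `poll()` (a stand-in for a long blocking wait) are all hypothetical names made up for illustration. The assumption is that each public method records the calling thread on entry and clears it in a `finally` block, so an overlapping call from another thread fails fast instead of blocking.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-threaded client; illustrates the check, not Kafka's code. */
final class GuardedClient {
    private static final long NO_OWNER = -1L;

    // Id of the thread currently inside a guarded method, or NO_OWNER.
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    // Nesting depth, only touched by the owning thread, so that a callback
    // running inside poll() may call back into the client on the same thread.
    private int depth = 0;
    private int commits = 0;

    private void acquire() {
        long me = Thread.currentThread().getId();
        // Fail fast if another thread is already inside; never block.
        if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "GuardedClient is not safe for multi-threaded access; "
                + "the caller must synchronize");
        }
        depth++;
    }

    private void release() {
        if (--depth == 0) {
            owner.set(NO_OWNER);
        }
    }

    /** Runs whileBlocked in place of a long blocking wait (an assumption). */
    public void poll(Runnable whileBlocked) {
        acquire();
        try {
            whileBlocked.run();
        } finally {
            release();
        }
    }

    public void commit() {
        acquire();
        try {
            commits++;
        } finally {
            release();
        }
    }

    /** Unguarded read, here only so the behavior can be observed. */
    public int commitCount() {
        return commits;
    }
}
```

With this shape, a second thread that calls `commit()` while the first is still inside `poll()` gets a `ConcurrentModificationException` naming the caller's bug, whereas a `commit()` issued from a callback on the polling thread goes through. The check is best effort: it only fires when two calls actually overlap, which is why long poll() timeouts make it more likely to catch the mistake.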