-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/#review81743
-----------------------------------------------------------


clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
<https://reviews.apache.org/r/33088/#comment132173>

    Maybe we can merge them into one error code,
    UNKNOWN_OR_INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132174>

    Add a TODO for exposing them in broker configs. For now, make them
    "static final private" just to be consistent with the coding style
    conventions.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132175>

    Is removing the comments intentional?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132184>

    How about creating another class called CoordinatorMetadata, and hiding
    the lock synchronization within its data access API functions? This will
    make the Coordinator class's function logic cleaner. In addition, we can
    move addGroup / addConsumer / etc. into this class as well.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132180>

    How about adding another "Start up completed" INFO log here?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132178>

    This function's name is a bit misleading: I think it is called to
    indicate that the loading is finished, right? Plus, as we are not
    loading metadata yet, let's just remove this boolean and the
    loadMetadata logic to make it clearer. When we add this optimization
    we can add this piece of logic back.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132179>

    How about adding another "Shut down completed" INFO log here?

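
To make the CoordinatorMetadata suggestion above a bit more concrete, here is a minimal sketch of what I have in mind. It is only an illustration under my own assumptions: the group state is reduced to a set of consumer ids, and the method names other than addGroup / addConsumer (consumersOf, inReadLock, inWriteLock) are hypothetical helpers defined locally, not existing code in the patch.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical sketch: all lock handling is hidden behind the accessors,
// so the coordinator's handler logic never touches the lock directly.
class CoordinatorMetadata {
  private val lock = new ReentrantReadWriteLock()

  // Assumption: a group is modelled as just the ids of its consumers.
  private val groups = mutable.Map.empty[String, mutable.Set[String]]

  private def inReadLock[T](body: => T): T = {
    lock.readLock().lock()
    try body finally lock.readLock().unlock()
  }

  private def inWriteLock[T](body: => T): T = {
    lock.writeLock().lock()
    try body finally lock.writeLock().unlock()
  }

  // Returns true if the group was newly created, false if it already existed.
  def addGroup(groupId: String): Boolean = inWriteLock {
    if (groups.contains(groupId)) false
    else {
      groups(groupId) = mutable.Set.empty[String]
      true
    }
  }

  // Returns true only if the group exists and the consumer was not yet a member.
  def addConsumer(groupId: String, consumerId: String): Boolean = inWriteLock {
    groups.get(groupId).exists(_.add(consumerId))
  }

  // Returns an immutable snapshot so callers never see the guarded mutable state.
  def consumersOf(groupId: String): Set[String] = inReadLock {
    groups.get(groupId).map(_.toSet).getOrElse(Set.empty[String])
  }
}
```

Returning snapshots rather than the mutable collections is a deliberate choice in this sketch: nothing outside the class can read or modify the shared state without going through a locked accessor.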

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132185>

    How about renaming the public APIs to "handleXXX", such as
    handleJoinGroup, handleHeartbeat, etc., to distinguish them from
    internal functions, so that the internal functions can drop the "do-"
    prefix? Maybe it is just me being paranoid about naming here..


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132181>

    See my comment above: I think here we just need to check whether the
    coordinator has started up or not, hence we can use an atomic boolean
    like startupComplete (as in KafkaServer) or isActive (as in
    KafkaController).


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132186>

    I think it is OK for a consumer to send a join-group with no
    subscription change; just like you did, we can treat it as a heartbeat
    request and return the originally assigned partitions.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132191>

    Some coding style suggestion:

    if {
      // comments
      ...
    } else {
      // comments
      ...
    }


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132192>

    Ditto above.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132193>

    Ditto above about the CoordinatorMetadata class.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132194>

    The function name "scheduleHeartbeatExpiration" is a bit misleading: it
    really means "mark the current heartbeat as completed before expiration,
    if there is any, and schedule the next heartbeat". So how about
    splitting it into two functions, "finishCurrentHeartbeat" and
    "expectNextHeartbeat"? Upon consumer creation we would only need to
    call the latter.

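
A rough sketch of the split I am suggesting for "scheduleHeartbeatExpiration", with a plain map of deadlines standing in for the heartbeat purgatory. The HeartbeatTracker class, the isExpired helper, and the explicit nowMs parameters are assumptions made only to keep the example self-contained; the real code would watch DelayedHeartbeat operations instead.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the heartbeat purgatory: one pending deadline per consumer.
class HeartbeatTracker(sessionTimeoutMs: Long) {
  private val pendingDeadlineMs = mutable.Map.empty[String, Long]

  // Complete the outstanding heartbeat, if any, before it expires.
  // Returns true if there was a pending heartbeat to complete.
  def finishCurrentHeartbeat(consumerId: String): Boolean = synchronized {
    pendingDeadlineMs.remove(consumerId).isDefined
  }

  // Start waiting for the next heartbeat; this is the only call needed
  // when a consumer is first created.
  def expectNextHeartbeat(consumerId: String, nowMs: Long): Unit = synchronized {
    pendingDeadlineMs(consumerId) = nowMs + sessionTimeoutMs
  }

  // A consumer with no pending heartbeat (e.g. while a rebalance is in
  // progress) can never be marked as expired.
  def isExpired(consumerId: String, nowMs: Long): Boolean = synchronized {
    pendingDeadlineMs.get(consumerId).exists(_ <= nowMs)
  }
}
```

With this shape, the join-group path would call finishCurrentHeartbeat when the request arrives and expectNextHeartbeat only once the rebalance has finished, so a slow rebalance cannot cause the consumer to be marked dead in between.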

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132195>

    If we add the CoordinatorMetadata class, these functions can all be
    moved to that class.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132202>

    It seems the transition from Rebalancing to Stable is trivial now. In
    this case shall we just remove the Rebalancing state, or is it actually
    meaningful to keep it?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132197>

    "Starting to rebalance .."


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132196>

    "Stabilized" => "Finished rebalancing"


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132199>

    This function name is also misleading: from the logic we see that the
    rebalance actually "never fails"; we just take whoever is left at the
    time of the rebalance timeout and do the assignment. It is pretty
    strange to me to call some logic named "onRebalanceFailure" inside a
    function named "onCompleteRebalance".

    Actually, since this function is only triggered in one place and will
    likely stay that way, how about just removing this function and moving
    the logic back into onCompleteRebalance?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132200>

    I think we need to add a broker config for maximum heartbeat failures
    here; right now it is just 1.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132201>

    These two functions can also be added to the CoordinatorMetadata class
    if we create one.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132203>

    Is it ever possible that the group is not Stable here?
    Since we only try to complete joinGroup once after we finish the
    rebalance, if it is ever possible to return false, this operation will
    stay in the purgatory forever, right?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132205>

    Throw an exception in onExpirationJoinGroup, as it should never happen.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132204>

    Let's call "finishCurrentHeartbeat" upon receiving the join group
    request (line 155 above), and only "expectNextHeartbeat" here.
    Otherwise, if the rebalance waits for a long time, this consumer will
    be unnecessarily marked as dead, right?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132206>

    Add a TODO to record some metrics for rebalance timeout.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132213>

    I think once we split the scheduleHeartbeatExpiration function, we do
    not need to check here: after the join-group is received but before the
    rebalance is finished, we will not expect any heartbeat anymore.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132209>

    Add a TRACE level entry here?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132215>

    This comment is not very clear: most people may not understand what it
    means except you and me.. Please elaborate a bit more.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132216>

    Ditto here.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132217>

    Ditto here.


core/src/main/scala/kafka/coordinator/Group.scala
<https://reviews.apache.org/r/33088/#comment132218>

    => "notYetRejoinedConsumers"


Some general comments:

1. Could you add some step-by-step and if-else comments in the coordinator's functions to make them more self-explanatory?

2. Note that the onCompleteXXX functions will be called from the purgatory reaper threads, which will in turn call other internal functions like rebalance(), while at other times these functions will be called by the request handler threads. Are there any possible deadlock issues? I am asking for another think-through only because we have been bitten by this before.

- Guozhang Wang


On April 26, 2015, 12:57 a.m., Onur Karaman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33088/
> -----------------------------------------------------------
> 
> (Updated April 26, 2015, 12:57 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1334
>     https://issues.apache.org/jira/browse/KAFKA-1334
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> add heartbeat to coordinator
> 
> todo:
> - see how it performs under real load
> - add error code handling on the consumer side
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 456b602245e111880e1b8b361319cabff38ee0e9 
>   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 2f5797064d4131ecfc9d2750d9345a9fa3972a9a 
>   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 
>   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala df60cbc35d09937b4e9c737c67229889c69d8698 
>   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 8defa2e41c92f1ebe255177679d275c70dae5b3e 
>   core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala 94ef5829b3a616c90018af1db7627bfe42e259e5 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 821e26e97eaa97b5f4520474fff0fedbf406c82a 
>   core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala b673e43b0ba401b2e22f27aef550e3ab0ef4323c 
>   core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46 
>   core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a 
>   core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a 
>   core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/33088/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Onur Karaman
> 
>