-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/#review81743
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
<https://reviews.apache.org/r/33088/#comment132173>

    Maybe we can merge them into one error code, 
UNKNOWN_OR_INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132174>

    Add a TODO for exposing them in broker configs. For now, make them 
"static final private" just to be consistent with the coding style conventions.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132175>

    Is removing the comments intentional?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132184>

    How about creating another class called CoordinatorMetadata, and hiding the 
lock synchronization within its data access API functions? This will make the 
function logic of the Coordinator class cleaner.
    
    In addition, we can move addGroup / addConsumer / etc. into this class as 
well.
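    
    A minimal sketch of what this could look like, written in Java for 
    brevity. Only the names CoordinatorMetadata, addGroup and addConsumer come 
    from this review; the fields, the consumersOf accessor, the choice of a 
    read-write lock, and creating the group on addConsumer are all assumptions 
    made for illustration, not the actual patch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: maps group id -> consumer ids behind one internal lock,
// so callers in the coordinator never acquire the lock themselves.
class CoordinatorMetadata {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> groups = new HashMap<>();

    // Returns true if the group was newly created, false if it already existed.
    boolean addGroup(String groupId) {
        lock.writeLock().lock();
        try {
            if (groups.containsKey(groupId)) {
                return false;
            }
            groups.put(groupId, new HashSet<>());
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns true if the consumer was newly added.
    // Assumption: an unknown group is created on the fly.
    boolean addConsumer(String groupId, String consumerId) {
        lock.writeLock().lock();
        try {
            return groups.computeIfAbsent(groupId, g -> new HashSet<>()).add(consumerId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Hypothetical accessor: returns a copy so guarded state never escapes the lock.
    Set<String> consumersOf(String groupId) {
        lock.readLock().lock();
        try {
            Set<String> consumers = groups.get(groupId);
            return consumers == null ? new HashSet<>() : new HashSet<>(consumers);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

    With something like this, the coordinator functions would call 
    metadata.addConsumer(...) directly and contain no locking code of their own.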



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132180>

    How about adding another "Start up completed" INFO log here?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132178>

    This function's name is a bit misleading: I think it is called to indicate 
that the loading has finished, right?
    
    Also, since we are not loading metadata yet, let's just remove this 
boolean and the loadMetadata logic to make the code clearer. When we add this 
optimization we can add this piece of logic back.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132179>

    How about adding another "Shut down completed" INFO log here?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132185>

    How about renaming the public APIs to "handleXXX", such as handleJoinGroup, 
handleHeartbeat, etc., to distinguish them from internal functions, so that 
the internal functions can drop the "do-" prefix? Maybe it is just me being 
paranoid about naming here..



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132181>

    See my comment above: I think here we just need to check if coordinator has 
started up or not, hence we can use some atomic boolean like startupComplete 
(as in KafkaServer), or isActive (as in KafkaController).
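    
    A rough sketch of the flag-based check, in Java. The class name, the 
    handleHeartbeat signature and the "NOT_ACTIVE" / "OK" result strings are 
    placeholders invented for this example; only the isActive name is borrowed 
    from the suggestion above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a coordinator that rejects requests until startup completes.
class CoordinatorLifecycle {
    private final AtomicBoolean isActive = new AtomicBoolean(false);

    void startup() {
        // ... initialize purgatories, metadata, etc. (omitted) ...
        isActive.set(true);
    }

    void shutdown() {
        isActive.set(false);
        // ... release resources (omitted) ...
    }

    // Placeholder handler: returns a made-up status string instead of a real error code.
    String handleHeartbeat(String groupId) {
        if (!isActive.get()) {
            return "NOT_ACTIVE";
        }
        return "OK";
    }
}
```

    The same one-line guard could sit at the top of every public handler, with 
    no separate "loading finished" state needed.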



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132186>

    I think it is OK for a consumer to send a join-group request with no 
subscription change; as you did, we can just treat it as a heartbeat request 
and return the originally assigned partitions.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132191>

    A coding style suggestion:
    
    if {
      // comments
      ...
    } else {
      // comments
      ...
    }



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132192>

    Ditto above.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132193>

    Ditto above about CoordinatorMetadata class.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132194>

    The function name "scheduleHeartbeatExpiration" is a bit misleading: it 
really means "mark the current heartbeat, if there is any, as completed before 
expiration, and schedule the next heartbeat". So how about splitting it into 
two functions, "finishCurrentHeartbeat" and "expectNextHeartbeat"? Upon 
consumer creation we would only need to call the latter.
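    
    To illustrate the split, here is a simplified Java sketch. It models the 
    pending heartbeat as a plain deadline rather than a delayed operation in 
    the purgatory, so the HeartbeatTracker class, its fields, the explicit 
    nowMs parameter and the isExpired helper are all assumptions; only the two 
    function names come from this comment.

```java
// Hypothetical sketch: one tracker per consumer, with the two responsibilities separated.
class HeartbeatTracker {
    private static final long NO_DEADLINE = -1L;

    private final long sessionTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    HeartbeatTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Marks the pending heartbeat (if any) as completed so it can no longer expire.
    synchronized void finishCurrentHeartbeat() {
        deadlineMs = NO_DEADLINE;
    }

    // Starts waiting for the next heartbeat, due within the session timeout.
    synchronized void expectNextHeartbeat(long nowMs) {
        deadlineMs = nowMs + sessionTimeoutMs;
    }

    // True only if a heartbeat is currently expected and its deadline has passed.
    synchronized boolean isExpired(long nowMs) {
        return deadlineMs != NO_DEADLINE && nowMs > deadlineMs;
    }
}
```

    On consumer creation only expectNextHeartbeat would be called; on a 
    regular heartbeat, finishCurrentHeartbeat followed by expectNextHeartbeat.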



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132195>

    If we add the CoordinatorMetadata class, these functions can all be moved 
to that class.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132202>

    It seems the transition from Rebalancing to Stable is trivial now. In this 
case, shall we just remove the Rebalancing state, or is it actually meaningful 
to keep it?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132197>

    "Starting to rebalance .."



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132196>

    "Stabilized" => "Finished rebalancing"



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132199>

    This function name is also misleading. From the logic, the rebalance 
actually "never fails": we just take whoever is left at the time of the 
rebalance timeout and do the assignment. It is pretty strange to me to call 
some logic named "onRebalanceFailure" inside a function named 
"onCompleteRebalance".
    
    Actually, since this function is only triggered in one place and will 
likely stay that way, how about just removing this function and moving its 
logic back into onCompleteRebalance?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132200>

    I think we need to add a broker config for the maximum number of heartbeat 
failures here; right now it is just 1.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132201>

    These two functions can also be added to the CoordinatorMetadata class if 
we create one.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132203>

    Is it ever possible that the group is not Stable here? Since we only try to 
complete joinGroup once after we finish the rebalance, if this can ever return 
false, the operation will stay in the purgatory forever, right?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132205>

    Throw exception in onExpirationJoinGroup as it should never happen.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132204>

    Let's call "finishCurrentHeartbeat" upon receiving the join group request 
(line 155 above), and only "expectNextHeartbeat" here. Otherwise, if the 
rebalance takes a long time, this consumer will be unnecessarily marked as 
dead, right?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132206>

    Add a TODO to record some metrics for rebalance timeout.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132213>

    I think once we split the scheduleHeartbeatExpiration function, we do not 
need to check here: after the join-group is received but before the rebalance 
is finished, we will not expect any heartbeat.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132209>

    Add a TRACE level entry here?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132215>

    This comment is not very clear: most people may not understand what it 
means except you and me.. Please elaborate a bit more.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132216>

    Ditto here.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/33088/#comment132217>

    Ditto here.



core/src/main/scala/kafka/coordinator/Group.scala
<https://reviews.apache.org/r/33088/#comment132218>

    => "notYetRejoinedConsumers"


Some general comments:

1. Could you add some step-by-step and if-else comments in the coordinator's 
functions to make them more self-explanatory?

2. Note that onCompleteXXX functions will be called from purgatory reaper 
threads, which will in turn call other internal functions like rebalance(), 
while at other times these functions will be called by the request handler 
threads. Are there any possible deadlock issues? I ask for another careful 
look just because we have been bitten by this before.

- Guozhang Wang


On April 26, 2015, 12:57 a.m., Onur Karaman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33088/
> -----------------------------------------------------------
> 
> (Updated April 26, 2015, 12:57 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1334
>     https://issues.apache.org/jira/browse/KAFKA-1334
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> add heartbeat to coordinator
> 
> todo:
> - see how it performs under real load
> - add error code handling on the consumer side
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 
> e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 456b602245e111880e1b8b361319cabff38ee0e9 
>   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 
> 2f5797064d4131ecfc9d2750d9345a9fa3972a9a 
>   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 
> 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 
>   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala 
> df60cbc35d09937b4e9c737c67229889c69d8698 
>   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 
> 8defa2e41c92f1ebe255177679d275c70dae5b3e 
>   core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala 
> 94ef5829b3a616c90018af1db7627bfe42e259e5 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 
> 821e26e97eaa97b5f4520474fff0fedbf406c82a 
>   core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala 
> b673e43b0ba401b2e22f27aef550e3ab0ef4323c 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> b4004aa3a1456d337199aa1245fb0ae61f6add46 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> c63f4ba9d622817ea8636d4e6135fba917ce085a 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 18680ce100f10035175cc0263ba7787ab0f6a17a 
>   core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/33088/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Onur Karaman
> 
>
