> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/protocol/Errors.java, lines 
> > 73-76
> > <https://reviews.apache.org/r/33088/diff/9/?file=941786#file941786line73>
> >
> >     Maybe we can merge them into one error code, 
> > UNKNOWN_OR_INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY?

I have mixed feelings on this. The growing number of error codes bothers me, 
but it also seems like the distinct error codes will be helpful in consumer 
logs.

This issue comes up again with roundrobin: when a group is using roundrobin and 
a new consumer joins with a different set of topic subscriptions from the rest 
of the group, the error isn't really 
INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY, at least not with the current 
naming and exception message.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 60-66
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line60>
> >
> >     Is removing the comments intentional?

Yeah, it was intentional. The comments almost exactly repeated the information 
already given by the variable types.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 200
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line200>
> >
> >     Some coding style suggestion:
> >     
> >     if {
> >       // comments
> >       ...
> >     } else {
> >       // comments
> >       ...
> >     }

Will do.

Somewhat of a tangent: It looks like we started using checkstyle on 
kafka-clients. It would be nice if kafka came with eclipse and intellij style 
schemes for java and scala to reduce the style-related comments and have the 
IDEs proactively apply the fixes for us.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
> > 374-382
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line374>
> >
> >     It seems the transition from Rebalancing to Stable is trivial now. In 
> > this case shall we just remove the Rebalancing state, or it is actually 
> > meaningful to keep it?

I prefer having each state represent one action for that group. If we reduce 
it down to something like Dead, Stable, and Rebalancing as I think you're 
suggesting, then the Rebalancing state would mean that we're either waiting for 
consumers to rejoin OR we're recomputing the partition assignments, but we 
don't know which. With Dead, Stable, PreparingRebalance, and Rebalancing, we 
reduce the amount of guessing for what the group is doing within a given state.
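
A minimal sketch of the four-state idea, not the code in the patch: the enum 
and class names are hypothetical, and the descriptions for DEAD and STABLE are 
my own assumptions. Only the PREPARING_REBALANCE and REBALANCING descriptions 
come from the reasoning above.

```java
/** Hypothetical sketch: each group state maps to exactly one coordinator activity. */
enum GroupState {
    // Assumed wording; the review does not spell out what Dead means.
    DEAD("group is being removed"),
    // Assumed wording; the review does not spell out what Stable means.
    STABLE("members are heartbeating against a settled assignment"),
    // From the review: the coordinator is waiting for consumers to rejoin.
    PREPARING_REBALANCE("waiting for consumers to rejoin"),
    // From the review: the coordinator is recomputing partition assignments.
    REBALANCING("recomputing partition assignments");

    private final String activity;

    GroupState(String activity) {
        this.activity = activity;
    }

    String activity() {
        return activity;
    }
}

final class GroupStateSketch {
    /** Formats a state together with the single activity it stands for. */
    static String describe(GroupState state) {
        return state + ": " + state.activity();
    }

    public static void main(String[] args) {
        for (GroupState state : GroupState.values()) {
            System.out.println(describe(state));
        }
    }
}
```

With a merged Rebalancing state, the last two entries would have to share one 
description joined by "OR", which is the guessing I'd like to avoid.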


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
> > 400-401
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line400>
> >
> >     I think we need to add a broker config for maximum heartbeat failure 
> > here; right now it is just 1.

Paraphrased offline discussion I had with Guozhang:

The only use case I can imagine for the maximum failures concept is where we 
already have consumers out in the wild across many different services and we've 
found that the default session timeout is too quick, so we end up getting a lot 
of group membership churn. It may be hard to get all the services to change 
their consumer configs, so we'd do a rolling bounce of the brokers with a 
higher maxAllowedExpiredHeartbeats.

Pros of having the maximum failures concept:
- It's a sort of hotfix means of giving the brokers some coarse-grained control 
over reducing group membership churn.

Cons of having the maximum failures concept:
- It's a sort of hotfix means of giving the brokers some coarse-grained control 
over reducing group membership churn.
- Consumers don't know the broker's maxAllowedExpiredHeartbeats, so it seems 
odd from the consumer's perspective that they'd specify the session timeout for 
session expiration, but it may end up actually being some multiple of that 
value (and this entirely depends on which broker they happen to communicate 
with).
- If we accept session timeouts in the range [low, high], then really the 
consumer failures would happen in a time range of [low * 
maxAllowedExpiredHeartbeats, high * maxAllowedExpiredHeartbeats]. This gap can 
end up uncomfortably large.
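
To make the last point concrete, here is a small worked example. The numbers 
(6 s, 30 s, and a multiplier of 3) are purely illustrative assumptions, not 
proposed defaults, and the class and method names are hypothetical.

```java
final class ExpirationWindowSketch {
    /**
     * Returns {earliest, latest} effective expiration in milliseconds when the
     * broker tolerates maxAllowedExpiredHeartbeats expirations per consumer.
     */
    static long[] effectiveWindowMs(long lowMs, long highMs, int maxAllowedExpiredHeartbeats) {
        return new long[] {
            lowMs * maxAllowedExpiredHeartbeats,
            highMs * maxAllowedExpiredHeartbeats
        };
    }

    public static void main(String[] args) {
        // Illustrative values only: accepted session timeouts of [6 s, 30 s].
        long lowMs = 6_000L;
        long highMs = 30_000L;
        long[] window = effectiveWindowMs(lowMs, highMs, 3);
        System.out.println("accepted range width:  " + (highMs - lowMs) + " ms");
        System.out.println("effective range:       [" + window[0] + ", " + window[1] + "] ms");
        System.out.println("effective range width: " + (window[1] - window[0]) + " ms");
    }
}
```

Under these assumed numbers the accepted range is 24 seconds wide, but the 
range in which a consumer is actually declared dead is 72 seconds wide.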

Guozhang and I agreed to remove this concept for now. If we see later on that 
we need to make the coordinator more conservative in marking consumers as dead, 
we can add similar logic later.

P.S.: If we want to eventually use this maximum failures concept, there's a bug 
in onConsumerHeartbeatExpired. If we have NOT exceeded the 
maxAllowedExpiredHeartbeats, then we should schedule another DelayedHeartbeat 
and not update consumer.latestHeartbeat.
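
A rough sketch of the behavior I mean, not the actual coordinator code. The 
Consumer class, its fields, and the counter standing in for "schedule another 
DelayedHeartbeat" are all hypothetical. I'm also assuming a consumer is marked 
dead once its expiration count reaches maxAllowedExpiredHeartbeats, so a value 
of 1 matches today's behavior.

```java
final class HeartbeatExpirySketch {
    /** Hypothetical stand-in for the coordinator's per-consumer bookkeeping. */
    static final class Consumer {
        long latestHeartbeat;            // timestamp of the last real heartbeat
        int expiredHeartbeats;           // expirations since the last real heartbeat
        int scheduledDelayedHeartbeats;  // stand-in for adding a DelayedHeartbeat to the purgatory
        boolean dead;
    }

    static void onConsumerHeartbeatExpired(Consumer consumer, int maxAllowedExpiredHeartbeats) {
        consumer.expiredHeartbeats++;
        if (consumer.expiredHeartbeats < maxAllowedExpiredHeartbeats) {
            // Still within the allowance: watch for the next heartbeat again,
            // and leave latestHeartbeat alone because no heartbeat arrived.
            consumer.scheduledDelayedHeartbeats++;
        } else {
            // Allowance used up: the consumer is considered failed.
            consumer.dead = true;
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.latestHeartbeat = 100L;
        onConsumerHeartbeatExpired(consumer, 2);
        System.out.println("dead after first expiration:  " + consumer.dead);
        onConsumerHeartbeatExpired(consumer, 2);
        System.out.println("dead after second expiration: " + consumer.dead);
    }
}
```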


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
> > 451-453
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line451>
> >
> >     Is it ever possible that the group is not Stable here? Since we only 
> > try to complete joinGroup once after we finish the rebalance, if it is ever 
> > possible to return false this operation will stay forever in the purgatory 
> > right?

tryComplete gets called:
- when a group is Stable just after it finishes rebalance 
(joinGroupPurgatory.checkAndComplete)
- when a group is PreparingRebalance and we add a DelayedJoinGroup to the 
purgatory (joinGroupPurgatory.tryCompleteElseWatch)

The check is there for the 2nd case.
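
The two call paths can be sketched roughly as follows. This is a simplified 
model, not the DelayedJoinGroup in the patch: the Group class, the two-value 
State enum, and the completed flag are hypothetical stand-ins for the real 
purgatory machinery.

```java
final class DelayedJoinGroupSketch {
    enum State { PREPARING_REBALANCE, STABLE }

    /** Hypothetical stand-in for the coordinator's group metadata. */
    static final class Group {
        State state = State.PREPARING_REBALANCE;
    }

    private final Group group;
    private boolean completed;

    DelayedJoinGroupSketch(Group group) {
        this.group = group;
    }

    /**
     * Completes only when the group is Stable. When called while the group is
     * still PreparingRebalance (the tryCompleteElseWatch path), it returns
     * false and the operation stays watched until the later checkAndComplete.
     */
    boolean tryComplete() {
        if (group.state != State.STABLE) {
            return false;
        }
        completed = true;
        return true;
    }

    boolean isCompleted() {
        return completed;
    }

    public static void main(String[] args) {
        Group group = new Group();
        DelayedJoinGroupSketch join = new DelayedJoinGroupSketch(group);
        System.out.println("while preparing rebalance: " + join.tryComplete());
        group.state = State.STABLE; // rebalance finished
        System.out.println("after rebalance:           " + join.tryComplete());
    }
}
```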


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 504
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line504>
> >
> >     Add a TRACE level entry here?

I don't see the point. A log entry here just says the DelayedHeartbeat somehow 
finished (either gracefully completed or expired).


- Onur


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/#review81743
-----------------------------------------------------------


On May 5, 2015, 5:50 p.m., Onur Karaman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33088/
> -----------------------------------------------------------
> 
> (Updated May 5, 2015, 5:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1334
>     https://issues.apache.org/jira/browse/KAFKA-1334
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> add heartbeat to coordinator
> 
> todo:
> - see how it performs under real load
> - add error code handling on the consumer side
> 
> 
> Diffs
> -----
> 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 456b602245e111880e1b8b361319cabff38ee0e9 
>   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 
> 2f5797064d4131ecfc9d2750d9345a9fa3972a9a 
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 
> 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 
>   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala 
> df60cbc35d09937b4e9c737c67229889c69d8698 
>   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 
> 8defa2e41c92f1ebe255177679d275c70dae5b3e 
>   core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala 
> 94ef5829b3a616c90018af1db7627bfe42e259e5 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 
> 821e26e97eaa97b5f4520474fff0fedbf406c82a 
>   core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala 
> b673e43b0ba401b2e22f27aef550e3ab0ef4323c 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> b4004aa3a1456d337199aa1245fb0ae61f6add46 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> c63f4ba9d622817ea8636d4e6135fba917ce085a 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 18680ce100f10035175cc0263ba7787ab0f6a17a 
>   core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/33088/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Onur Karaman
> 
>
