> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/protocol/Errors.java, lines 73-76
> > <https://reviews.apache.org/r/33088/diff/9/?file=941786#file941786line73>
> >
> > Maybe we can merge them into one error code, UNKNOWN_OR_INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY?
I have mixed feelings about this. The growing number of error codes bothers me, but distinct error codes also seem likely to be more helpful in consumer logs. The same issue comes up again with roundrobin: a group is using roundrobin, and a new consumer joining the group subscribes to a different set of topics from the others in the group. That isn't really INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY, at least not with the current naming and exception message.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 60-66
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line60>
> >
> > Is removing the comments intentional?

Yes, it was intentional. The comments almost exactly repeated the information already conveyed by the variable types.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 200
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line200>
> >
> > Some coding style suggestion:
> >
> > if {
> >   // comments
> >   ...
> > } else {
> >   // comments
> >   ...
> > }

Will do.

Somewhat of a tangent: it looks like we've started using Checkstyle on kafka-clients. It would be nice if Kafka shipped Eclipse and IntelliJ code style schemes for Java and Scala, to cut down on style-related review comments and let the IDEs proactively fix these issues for us.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 374-382
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line374>
> >
> > It seems the transition from Rebalancing to Stable is trivial now. In this case shall we just remove the Rebalancing state, or is it actually meaningful to keep it?

I prefer having each state represent a single action for the group.
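To illustrate the one-action-per-state idea, here is a minimal Java sketch of a four-state group lifecycle using the state names from this thread (Stable, PreparingRebalance, Rebalancing, Dead). The `GroupState` enum, the `GroupStateMachine` class, the action descriptions, and the transition table are all hypothetical, chosen for illustration; they are not the code under review in ConsumerCoordinator.scala.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: each state describes exactly one thing the group is doing.
// The descriptions below are illustrative assumptions, not Kafka's definitions.
enum GroupState {
    STABLE("members heartbeat against a fixed partition assignment"),
    PREPARING_REBALANCE("waiting for consumers to rejoin the group"),
    REBALANCING("recomputing the partition assignments"),
    DEAD("the group is no longer active (assumed meaning)");

    final String action;

    GroupState(String action) {
        this.action = action;
    }
}

final class GroupStateMachine {
    // Assumed transition table; the real coordinator's rules may differ.
    private static final Map<GroupState, Set<GroupState>> ALLOWED =
        new EnumMap<>(GroupState.class);

    static {
        ALLOWED.put(GroupState.STABLE, EnumSet.of(GroupState.PREPARING_REBALANCE));
        ALLOWED.put(GroupState.PREPARING_REBALANCE,
            EnumSet.of(GroupState.REBALANCING, GroupState.DEAD));
        ALLOWED.put(GroupState.REBALANCING, EnumSet.of(GroupState.STABLE));
        ALLOWED.put(GroupState.DEAD, EnumSet.noneOf(GroupState.class));
    }

    private GroupState state;

    GroupStateMachine(GroupState initial) {
        this.state = initial;
    }

    GroupState state() {
        return state;
    }

    // Moves to the target state, rejecting any transition not in the table.
    void transitionTo(GroupState target) {
        if (!ALLOWED.get(state).contains(target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        state = target;
    }
}
```

In this sketch a group must pass through PREPARING_REBALANCE before REBALANCING, so an out-of-order transition fails loudly instead of leaving the group's current activity ambiguous.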
If we reduce it down to something like Dead, Stable, and Rebalancing, as I think you're suggesting, then the Rebalancing state would mean that we're either waiting for consumers to rejoin OR recomputing the partition assignments, but we wouldn't know which. With Dead, Stable, PreparingRebalance, and Rebalancing, there is less guessing about what the group is doing in a given state.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 400-401
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line400>
> >
> > I think we need to add a broker config for maximum heartbeat failure here; right now it is just 1.

Paraphrased offline discussion I had with Guozhang:

The only use case I can imagine for the maximum failures concept is this: we already have consumers out in the wild across many different services, and we find that the default session timeout is too short, so we get a lot of group membership churn. It may be hard to get all the services to change their consumer configs, so instead we'd do a rolling bounce of the brokers with a higher maxAllowedExpiredHeartbeats.

Pros of having the maximum failures concept:
- It's a sort of hotfix means of giving the brokers some coarse-grained control over reducing group membership churn.

Cons of having the maximum failures concept:
- It's a sort of hotfix means of giving the brokers some coarse-grained control over reducing group membership churn.
- Consumers don't know the broker's maxAllowedExpiredHeartbeats, so from the consumer's perspective it seems odd that they specify the session timeout for session expiration, yet expiration may actually take some multiple of that value (and which multiple depends entirely on which broker they happen to communicate with).
- If we accept session timeouts in the range [low, high], then consumers would actually be marked as failed somewhere in the range [low * maxAllowedExpiredHeartbeats, high * maxAllowedExpiredHeartbeats]. This gap can become uncomfortably large.

Guozhang and I agreed to remove this concept for now. If we later find that the coordinator needs to be more conservative in marking consumers as dead, we can add similar logic back in.

P.S.: If we ever do want to use the maximum failures concept, there's a bug in onConsumerHeartbeatExpired. If we have NOT exceeded maxAllowedExpiredHeartbeats, we should schedule another DelayedHeartbeat and not update consumer.latestHeartbeat.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 451-453
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line451>
> >
> > Is it ever possible that the group is not Stable here? Since we only try to complete joinGroup once after we finish the rebalance, if it is ever possible to return false this operation will stay forever in the purgatory right?

tryComplete gets called:
- when a group is Stable, just after it finishes rebalancing (joinGroupPurgatory.checkAndComplete)
- when a group is PreparingRebalance and we add a DelayedJoinGroup to the purgatory (joinGroupPurgatory.tryCompleteElseWatch)

The check is there for the second case.


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 504
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line504>
> >
> > Add a TRACE level entry here?

I don't see the point. A log entry here would only say that the DelayedHeartbeat finished somehow (either completed gracefully or expired).


- Onur


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/33088/#review81743
-----------------------------------------------------------


On May 5, 2015, 5:50 p.m., Onur Karaman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33088/
> -----------------------------------------------------------
>
> (Updated May 5, 2015, 5:50 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1334
>     https://issues.apache.org/jira/browse/KAFKA-1334
>
>
> Repository: kafka
>
>
> Description
> -------
>
> add heartbeat to coordinator
>
> todo:
> - see how it performs under real load
> - add error code handling on the consumer side
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 36aa412404ff1458c7bef0feecaaa8bc45bed9c7
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 456b602245e111880e1b8b361319cabff38ee0e9
>   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 2f5797064d4131ecfc9d2750d9345a9fa3972a9a
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala PRE-CREATION
>   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86
>   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala df60cbc35d09937b4e9c737c67229889c69d8698
>   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 8defa2e41c92f1ebe255177679d275c70dae5b3e
>   core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala 94ef5829b3a616c90018af1db7627bfe42e259e5
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 821e26e97eaa97b5f4520474fff0fedbf406c82a
>   core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala b673e43b0ba401b2e22f27aef550e3ab0ef4323c
>   core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46
>   core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a
>   core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a
>   core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/33088/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Onur Karaman
>