> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 261 > > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line261> > > > > Function name "scheduleHeartbeatExpiration" is a bit misleading, it is > > really "mark the current heartbeat as completed before expiration if there > > is any and schedule the next heartbeat". So how about spliting it into two > > functions, "finishCurrentHeartbeat" and "expectNextHeartbeat", and upon > > consumer creation we only need to call the latter one.
I'm assuming here that you want to the split to be something like this: ```java private def finishCurrentHeartbeat(group: Group, consumer: Consumer) { consumer.latestHeartbeat = SystemTime.milliseconds val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId) // TODO: can we fix DelayedOperationPurgatory to remove keys in watchersForKey with empty watchers list? heartbeatPurgatory.checkAndComplete(consumerKey) } private def scheduleNextHeartbeatExpiration(group: Group, consumer: Consumer) { val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId) val heartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, heartbeatDeadline, consumer.sessionTimeoutMs) heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey)) } ``` I'd rather not do the split. It adds some confusion because now you have to think about which of the two you'd need to call in a given scenario, and in what order. Both the finishCurrentHeartbeat and the scheduleNextHeartbeatExpiration rely on latestHeartbeat being correctly updated, so I'd rather just put all the related logic in one place. - Onur ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/#review81743 ----------------------------------------------------------- On May 8, 2015, 5:55 p.m., Onur Karaman wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/33088/ > ----------------------------------------------------------- > > (Updated May 8, 2015, 5:55 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1334 > https://issues.apache.org/jira/browse/KAFKA-1334 > > > Repository: kafka > > > Description > ------- > > add heartbeat to coordinator > > todo: > - see how it performs under real load > - add error code handling on the consumer side > > > Diffs > ----- > > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > e55ab11df4db0b0084f841a74cbcf819caf780d5 > clients/src/main/java/org/apache/kafka/common/protocol/Errors.java > 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala > 456b602245e111880e1b8b361319cabff38ee0e9 > core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala > 2f5797064d4131ecfc9d2750d9345a9fa3972a9a > core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala > PRE-CREATION > core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala > 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 > core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala > f5bd5dc802bc9fb8d175a0813308948e88c2a8b1 > core/src/main/scala/kafka/coordinator/DelayedRebalance.scala > 60fbdae164fda74eab859f4ff25b7beff7fce757 > core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION > core/src/main/scala/kafka/coordinator/GroupRegistry.scala > 94ef5829b3a616c90018af1db7627bfe42e259e5 > core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala > 821e26e97eaa97b5f4520474fff0fedbf406c82a > core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION > core/src/main/scala/kafka/server/DelayedOperationKey.scala > b673e43b0ba401b2e22f27aef550e3ab0ef4323c > core/src/main/scala/kafka/server/KafkaApis.scala > 417960dd1ab407ebebad8fdb0e97415db3e91a2f > core/src/main/scala/kafka/server/KafkaServer.scala > b7d2a2842e17411a823b93bdedc84657cbd62be1 > core/src/main/scala/kafka/server/OffsetManager.scala > 18680ce100f10035175cc0263ba7787ab0f6a17a > core/src/test/scala/integration/kafka/api/ConsumerTest.scala > ffbdf5dc106e2a59563768280074696c76491337 > core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala > 2bbd4c96f8c70f22d12e593941e2c26c39352900 > core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala > PRE-CREATION > core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION > core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala > PRE-CREATION > > Diff: https://reviews.apache.org/r/33088/diff/ > > > Testing > ------- > > > Thanks, > > Onur Karaman > >