-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/#review43423
-----------------------------------------------------------
The patch looks promising. The places where we call unblock for producers/consumers need some adjustment. The following is a summary.

1. To unblock produce requests:
   1.1 ack=-1: when the HW moves (not completely handled in the patch)
   1.2 ack>1: when a follower fetch request is received (not handled in the patch)

2. To unblock regular consumer requests:
   2.1 when the HW actually moves (not completely handled in the patch)

3. To unblock follower consumer requests:
   3.1 when a log append completes in the leader replica (handled in the patch)

To handle 1.1 and 2.1: we can probably register an onLeaderAndHWChangeCallback in Partition. The callback will be invoked in Partition.maybeIncrementLeaderHW() when the HW actually moves and will try to unblock both produce and consumer requests. We should also invoke onLeaderAndHWChangeCallback when a replica transitions from leader to follower.

To handle 1.2: we need to call unblockProducer() in KafkaApis.maybeUpdatePartitionHw().


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/21588/#comment77610>

    It seems that we can maintain both segmentBaseOffset and position incrementally. We can pass both pieces of information through ReplicaManager.recordFollowerPosition() while updating the logEndOffset in the follower replicas.

    Since we always update/read offset/position/baseOffset together, we can probably package them better. I was thinking of creating the case class below (a short illustrative sketch of its use appears further below). In each Replica (including the leader replica), we maintain a volatile var logEndOffset of type OffsetMetadata. Every time logEndOffset changes, we create a new instance of OffsetMetadata and update logEndOffset. This makes sure that any reader always reads the three values in OffsetMetadata corresponding to the same offset. The implementation in the patch has the issue that a reader may see values corresponding to different offsets, since they are maintained separately.

    case class OffsetMetadata(offset: Long,
                              segmentBaseOffset: Long,
                              relativePositionInSegment: Int)

    In the leader replica (perhaps it's better in Partition?), we also maintain a volatile var highWatermark of type OffsetMetadata.


core/src/main/scala/kafka/log/FileMessageSet.scala
<https://reviews.apache.org/r/21588/#comment77598>

    Log.read() could return an empty ByteBufferMessageSet. In this case, we still need to know the initial fetch position and baseOffset. So, instead of tracking position and baseOffset in FileMessageSet, we could probably return a FetchDataInfo that includes a MessageSet together with the position and baseOffset.


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/21588/#comment77574>

    We can probably just simplify the comment to "Add the produce request for watch if it's not satisfied".


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/21588/#comment77609>

    Perhaps we can optimize this a bit: only call unblock if the HW is actually updated. We can return whether the HW was updated from recordFollowerPosition().


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/21588/#comment77600>

    Typo: satisified.


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment77546>

    Either "the rest of" or "remaining".


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment77554>

    Should that be decrement?


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment77555>

    Should that be decrement?
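To make the consistency argument in the Partition.scala comment above concrete, here is a minimal sketch of how a Replica could publish the three values atomically. This is illustrative only: the field types and the stripped-down Replica shell are my assumptions, not code from the patch.

    // OffsetMetadata as proposed above; the field types are assumptions.
    case class OffsetMetadata(offset: Long,
                              segmentBaseOffset: Long,
                              relativePositionInSegment: Int)

    // Simplified stand-in for kafka.cluster.Replica, just to show the publication pattern.
    class Replica {
      // The whole OffsetMetadata instance is replaced on every update; @volatile
      // means a reader always sees one complete, internally consistent snapshot.
      @volatile private var logEndOffsetMetadata =
        OffsetMetadata(offset = 0L, segmentBaseOffset = 0L, relativePositionInSegment = 0)

      def logEndOffset: OffsetMetadata = logEndOffsetMetadata

      def logEndOffset_=(newLogEndOffset: OffsetMetadata): Unit = {
        // Never mutate the current instance in place; swap in a fresh one.
        logEndOffsetMetadata = newLogEndOffset
      }
    }

With something like this in place, ReplicaManager.recordFollowerPosition() could build a new OffsetMetadata from the follower's offset, base offset and position and assign it to replica.logEndOffset in a single step, so the three values never get out of sync for readers.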
core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment77612>

    The purgeSatisfied() here is redundant since it's already done in purgeSatisfied().


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment77559>


- Jun Rao


On May 16, 2014, 10:55 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21588/
> -----------------------------------------------------------
> 
> (Updated May 16, 2014, 10:55 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Change the watch() API to checkAndMaybeWatch(). In that function, the purgatory will try to add the delayed request to each keyed watchers list.
> 
> 1.a. When a watcher is trying to add the delayed request, it first checks whether the request is already satisfied, and only adds it if it is not yet satisfied.
> 
> 1.b. If one of the watchers fails to add the request because it is already satisfied, checkAndMaybeWatch() returns immediately.
> 
> 1.c. A request may still be watched in some other lists even though it is already satisfied and has been removed from the lists it was checked against by key. To fix this, also purge the watcher lists when the expiration reaper purges the delayed queue. Hence the logic now becomes "when the purgatory has too many requests, purge both the queue and the watcher lists".
> 
> 1.d. The purgatory size gauge is now the watcher lists' size plus the delayed queue size.
> 
> 2. Add the current log segment size to the append info.
> 
> 3. Replica maintains the following additional information:
> 
>    a) the current log active segment's size.
>    b) the current log active segment's base offset.
>    c) the current HW's corresponding segment physical position.
>    d) the current HW's corresponding segment's base offset.
> 
>    a) and b) are updated on log.append(); c) and d) are updated in maybeIncrementLeaderHW() by searching the log for the HW (I have thought about optimizing this but cannot come up with a better approach).
> 
> 4. The delayed fetch request now maintains, for each of the partitions it fetches:
> 
>    a) the fetch segment's physical position.
>    b) the fetch segment's base offset.
> 
> 5. The delayed fetch request's satisfaction criterion is now:
> 
>    Case A: this broker is no longer the leader for ANY of the partitions it tries to fetch.
>    Case B: the fetch offset is not located on the fetchable segment of the log.
>    Case C: the accumulated bytes from all the fetchable segments exceed the minimum bytes.
> 
>    For a follower fetch request, the fetchable segment is the log's active segment; for a consumer fetch request, it is the HW's corresponding segment.
> 
>    The checks for Cases B and C use the segment's physical position and base offset mentioned above.
> 
> 6. After an HW update, both delayed produce and fetch requests are checked for satisfaction; after log.append(), fetch requests are checked for satisfaction.
> 
>    In these cases:
> 
>    a) a broker becomes the leader
>    b) the leader shrinks the ISR
> 
>    the HW may also move, but we do not check delayed fetch requests (is this expected?)
> 
> 7. When checkAndMaybeWatch() returns false for a delayed produce/fetch request, respond immediately.
> 
> 8. Some other minor comment/logging changes.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3
>   core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
>   core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1
>   core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b
>   core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8
>   core/src/main/scala/kafka/server/RequestPurgatory.scala c064c5c4cf1191335572da8a2caf5f95dce902c1
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd
> 
> Diff: https://reviews.apache.org/r/21588/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
