> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 120-130
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line120>
> >
> >     We may not be able to remove the readlock here. The issue is that this 
> > method accesses not only leaderReplicaIdOpt, but other internal data 
> > structures like assignedReplicaMap. Without the lock, the read from the Map 
> > could fail even it's being concurrently modified.
> >     
> >     In general, we can get away with the lock only if we want to read a 
> > single internal value. Perhaps we can introduce another function 
> > isLeaderLocal() that returns a boolean. This method will only need to 
> > access leaderReplicaIdOpt. Then all callers will first call 
> > leaderReplicaIfLocal and hold onto the leader replica. They can then use 
> > isLeaderLocal to see if the leader has changed subsequently.

Would this be the same as what we did now? In getReplica(localBrokerId), if the 
replica map has changed and the id is no longer in the map, it will return 
None; if the replica map has changed and the id is no longer the leader, it is 
just the same as when we callled leaderReplicaIfLocal() to get the leader, and 
then immediately the leader changed?


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 268-269
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line268>
> >
> >     Instead of using negation, could we do 
> > leaderHW.precedes(replica.logEndOffset)?
> >     
> >     Also, could we move && to the previous line?

We can not simply leaderHW.precedes(replica.logEndOffset) since we need to 
consider ">=", will use messagesDiff instead.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Replica.scala, lines 33-36
> > <https://reviews.apache.org/r/23767/diff/2/?file=637562#file637562line33>
> >
> >     Should we rename highWatermarkValue and logEndOffsetValue to 
> > highWatermarkMetadata and logEndOffsetMetadata?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 372-373
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line372>
> >
> >     Move && to previous line?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 324-325
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line324>
> >
> >     Perhaps we could create TopicPartionRequestKey just once?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 296-297
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line296>
> >
> >     This seems to be an existing problem. If ack=-1, a safer check is HW >= 
> > requiredOffset. This way, we will be sure that if ISR expands, the acked 
> > message is guaranteed to be in the replicas newly added to ISR.
> >     
> >     The following is an example that shows the issue with the existing 
> > check. Suppose that all replicas in ISR are at offset 10, but HW is still 
> > at 8 and we call checkEnoughReplicasReachOffset on offset 9.  The check 
> > will be satisfied and the message is considered committed. We will be 
> > updating HW to 10 pretty soon. However, before that happens, another 
> > replica whose LEO is at 8 can be added to ISR. This replica won't have 
> > message 9, which is acked as committed.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 194-195
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line194>
> >
> >     It's possible that we get an UnknownOffsetMetadata during the 
> > conversion. In this case, we probably should set HW to logEndOffset.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/Log.scala, lines 173-174
> > <https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line173>
> >
> >     It would be a bit confusing to reason about the consistency btw 
> > nextOffset and nextOffsetMetadata since they are not updated atomically. 
> > Could we just keep nextOffsetMetadata?

Good point. Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/Log.scala, lines 429-431
> > <https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line429>
> >
> >     Perhaps we can add a bit details to the comment. So, we are in the 
> > situation that the startOffset is in range, but we can't find a single 
> > message whose offset is >= startOffset. One possibility seems to be that 
> > all messages after startOffset have been deleted due to compaction. Is that 
> > the only case? Let's describe all situations when this can happen.

Great catch. I was originally thinking this can only happen when the regular 
consumer's fetch request is issued with a max offset set to the HW, and all 
messages beyond start offset is also beyond max offset. But later I realized in 
this case LogSegment.read() will not actually return null. So the only case 
will be compaction, and in this case we cannot return unknown offset metadata, 
but instead we can return log end offset metadata.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 27-38
> > <https://reviews.apache.org/r/23767/diff/2/?file=637569#file637569line27>
> >
> >     In DelayedProduce, we don't send the response immediately if one 
> > partition has an error. Should we do the same thing for DelayedFetch? Will 
> > that make the logic simpler?

Sure we can, and it will not simply the logic though. I did this to follow the 
old we have before which is when a partition is no longer hosting the leader or 
not known already we return the fetch request immediately with whatever is 
available. Let me know if you have a strong preference.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 61-63
> > <https://reviews.apache.org/r/23767/diff/2/?file=637569#file637569line61>
> >
> >     Will this case ever happen? If so, could we add a comment how this can 
> > happen?

Comment added.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/FetchRequestPurgatory.scala, lines 41-42
> > <https://reviews.apache.org/r/23767/diff/2/?file=637572#file637572line41>
> >
> >     This can be private.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 448-449
> > <https://reviews.apache.org/r/23767/diff/2/?file=637573#file637573line448>
> >
> >     If ack is >1, it won't be -1. So "but no = -1" is redundant.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 25-26
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line25>
> >
> >     Would it be better to name it offsetOrdering?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 36-37
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line36>
> >
> >     The ordering doesn't match that in the signature.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 41-42
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line41>
> >
> >     Should we just use one constructor with defaults?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 40-44
> > <https://reviews.apache.org/r/23767/diff/2/?file=637569#file637569line40>
> >
> >     All those should probably be private.

I cannot since other wise "illegal combination of modifiers: private and 
override for: value ... ".


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 60-61
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line60>
> >
> >     Do we need the space after !?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala, lines 44-45
> > <https://reviews.apache.org/r/23767/diff/2/?file=637576#file637576line44>
> >
> >     This can be private.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 262-263
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line262>
> >
> >     TopicAndPartition -> (PartitionData, OffsetMetadta)
> >     
> >     It probably will be clearer if we use a case Class 
> > PartitionDataAndOffsetMetadata, instead of using a pair.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 298-299
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line298>
> >
> >     Perhaps it will be clearer if we return a 
> > FetchResponseAndOffsetMetadata instead of a pair?

As above.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 592-593
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line592>
> >
> >     Would updateReplicaLEOAndHW be enough?

It should really be updateReplicaLEOAndPartitionHW.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 603-604
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line603>
> >
> >     Do we need to log (topic, partition) twice?

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestKey.scala, lines 25-26
> > <https://reviews.apache.org/r/23767/diff/2/?file=637579#file637579line25>
> >
> >     Should we rename this to DelayedRequestKey?

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 152-153
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line152>
> >
> >     key doesn't seem to be used.

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 48-49
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line48>
> >
> >     These comments may need to be changed according to the comments below.

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 93-94
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line93>
> >
> >     I actually think it's more intuitive to return true if the request is 
> > satisfied by the caller. Then, we can assign a meaningful return val in the 
> > caller.
> >     
> >     val isSatisfiedByMe = 
> > producerRequestPurgatory.checkAndMaybeWatch(request)
> >     
> >     Also, We need to explain this method better. How about the following 
> > comment?
> >     
> >     Potentially add the request for watch on all keys. Return true iff the 
> > request is satisfied and the satisfaction is done by the caller.

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 155-156
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line155>
> >
> >     Could we name this checkAndMaybeAdd and add the comment?

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala, lines 88-89
> > <https://reviews.apache.org/r/23767/diff/2/?file=637593#file637593line88>
> >
> >     Can this just be created as Map(a -> b)?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 287-288
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line287>
> >
> >     Not sure if we need to copy since inSyncReplicas is immutable. We 
> > probably just need to do a reference assignment.

Ah you are right.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48283
-----------------------------------------------------------


On July 21, 2014, 7:53 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 21, 2014, 7:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Rebased on KAFKA-1462: 1. LogSegment.read() will also return fetch info, even 
> if the corresponding message set is empty; 2. Purgatory checking satisfactory 
> in checkAndMaybeWatch synchronously, and will only return false if this 
> thread successfully set the satisfactory bit to true; 3. Remove the read lock 
> on Partition's reading of the leaderOpt and epoch and making them volatile 
> instead since these two functions are just single read; 4. Fix some minor 
> issues in TestEndToEndLatency; 5. Other minor fixes
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/FileMessageSet.scala 
> b2652ddbe2f857028d5980e29a008b2c614694a3 
>   core/src/main/scala/kafka/log/Log.scala 
> b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
>   core/src/main/scala/kafka/log/LogSegment.scala 
> 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
>   core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 
> 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 
> 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala 
> cec1caecc51507ae339ebf8f3b8a028b12a1a056 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
> d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
> 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 
> 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
> 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>

Reply via email to