> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 75
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line75>
> >
> >     It seems that (a) implies (b).

Not really - within a consumer instance we need each topic to have the same 
number of streams. However, the number of streams can be different on another 
consumer instance. The intent of the constraint is that any partition can be 
assigned to any of the available thread ids on a given consumer instance.
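
To make the constraint concrete, here is a minimal sketch of the check. It is 
not the code in the patch: the object name, the Subscriptions type and the 
shape of the input map are all assumptions made for illustration.

```scala
object StreamCountCheck {
  // Hypothetical input shape: consumer id -> (topic -> number of streams).
  type Subscriptions = Map[String, Map[String, Int]]

  // True if, within each consumer instance, every topic has the same stream
  // count. Different consumer instances may still use different counts.
  def isValid(subs: Subscriptions): Boolean =
    subs.values.forall(topicCounts => topicCounts.values.toSet.size <= 1)
}
```

With this sketch, c1 using 2 streams for every topic and c2 using 4 streams 
for every topic is valid, while c1 using 2 streams for one topic and 3 for 
another is not.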


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 83
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line83>
> >
> >     Are those fields needed in the constructor? The same set of fields is 
> > already passed into AssignmentContext.

They are not needed - I will remove them.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, lines 104-120
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line104>
> >
> >     One of the implications of this is that the distribution is going to be 
> > sensitive to # topics. In the case when #topics == #consumerThreads, 
> > partitions from the same topic will be assigned to the same consumerThread. 
> > In general, if #topics is a multiple of #consumerThreads, a similar issue 
> > can happen.
> >     
> >     So, I am wondering if this is better than just sorting the consumer ids 
> > in some random order like hashcode. For debugging purpose, we can log the 
> > consumer ids in that order.

As discussed, we could, but I'm not sure that we should. I tried it earlier for 
other reasons, so I have that code as well. It makes the final assignment much 
harder to predict, though.

That said, most operations folks want a uniform distribution, so I can go back 
to using the sorted version.
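
For reference, a minimal sketch of what ordering the consumer thread ids by 
hashcode before dealing out partitions could look like. This is not the code 
in the patch: TopicPartition, assign and the choice to keep partitions in 
(topic, partition) order are assumptions made for illustration.

```scala
object RoundRobinSketch {
  // Hypothetical stand-in for a topic-partition pair.
  final case class TopicPartition(topic: String, partition: Int)

  // Deals partitions out to consumer thread ids in round-robin fashion.
  def assign(partitions: Seq[TopicPartition],
             threadIds: Seq[String]): Map[TopicPartition, String] = {
    require(threadIds.nonEmpty, "need at least one consumer thread id")
    // Order thread ids by hashCode; break ties on the id itself so the
    // order is the same on every consumer instance.
    val orderedThreads = threadIds.distinct.sortBy(id => (id.hashCode, id))
    // Assumption: partitions are walked in (topic, partition) order.
    val orderedPartitions = partitions.distinct.sortBy(tp => (tp.topic, tp.partition))
    orderedPartitions.zipWithIndex.map { case (tp, i) =>
      tp -> orderedThreads(i % orderedThreads.size)
    }.toMap
  }
}
```

Because both inputs are sorted before the assignment, every consumer instance 
that sees the same partitions and thread ids computes the same result, and 
logging orderedThreads would show the order used.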


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, lines 105-106
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line105>
> >
> >     Since RoundRobin assigns partitions across topics, should we just log 
> > all partitions and all consumers once in the right order?

We should probably avoid logging everything in one line (although I agree that 
logging the sort order would have been useful). There could be tens of 
thousands of partitions, so it would be an extremely long line. I would 
prefer to stick with this (one line per topic).


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 154
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line154>
> >
> >     Could we do partitionsForTopic(topic) here too?

Yes.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, lines 
> > 85-86
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line85>
> >
> >     Do we need a while loop here? Can this just be 2 + 
> > random.nextInt(consumerCount - 1)?

I will remove the consistency assertion across consumers.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, lines 
> > 260-263
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line260>
> >
> >     Shouldn't we assert count==1? Do we need to also make sure every 
> > original partition is in assignment?

Yes.

Also, the check for every partition being in the assignment is above this 
(givenPartitions == assignedPartitions)
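
A minimal sketch of the two checks together (completeness and count == 1). 
This is not the test code in the patch: the object name, the method name and 
the input shape (one partition -> thread id map per consumer) are assumptions 
made for illustration.

```scala
object AssignmentChecks {
  // expected:    every partition that should be assigned.
  // assignments: one (partition -> thread id) map per consumer instance.
  def isCompleteAndUnique[P, T](expected: Set[P],
                                assignments: Seq[Map[P, T]]): Boolean = {
    // Count how many consumers claim each partition.
    val ownerCounts: Map[P, Int] =
      assignments.flatMap(_.keys).groupBy(identity).map { case (p, claims) =>
        p -> claims.size
      }
    // Completeness: the assigned set equals the expected set.
    // Uniqueness: every partition is claimed exactly once.
    expected == ownerCounts.keySet && ownerCounts.values.forall(_ == 1)
  }
}
```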


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, line 
> > 289
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line289>
> >
> >     Should we rename this to partitionsPerStream?

Sure - will rename to partitionCountPerStream


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 88
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line88>
> >
> >     Instead of passing in forConsumers, would it be simpler to just return 
> > the assignment for each consumer in a map? The unit test can also be made 
> > simpler since we just need to test the coverage and uniformity.

We could, but it seems a bit unwieldy to do that - i.e., having to maintain yet 
another map of map... and in the consumer connector we only want the assignment 
for that consumer id. Also, I ended up removing the "conflict" test from the 
unit test so it is simpler.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 53
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line53>
> >
> >     To avoid duplicating the ZK read, we can probably change all the vals 
> > in the constructor to def. Then, we can initialize some private vals during 
> > initialization.

Actually, I reverted to the old form because we ended up not needing the 
current form in the unit test.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, lines 
> > 88-124
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line88>
> >
> >     This test is pretty complicated. I am wondering if we need to test the 
> > case that c1 and cx give consistent assignment. If the inputs to the 
> > assignor are the same and the assignor is deterministic, the assignment is 
> > always going to be consistent. We probably just need to test completeness 
> > and uniqueness.

See above.


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23655/#review51182
-----------------------------------------------------------


On Aug. 21, 2014, 1:10 a.m., Joel Koshy wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23655/
> -----------------------------------------------------------
> 
> (Updated Aug. 21, 2014, 1:10 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-687
>     https://issues.apache.org/jira/browse/KAFKA-687
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> v4
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
> 1cf2f62ba02e4aa66bfa7575865e5d57baf82212 
>   core/src/main/scala/kafka/consumer/PartitionAssignor.scala PRE-CREATION 
>   core/src/main/scala/kafka/consumer/TopicCount.scala 
> 8b0ae5785e08272d0ea12483beae597f2eac4343 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> acfd064bdba2b031f8869011da79649efd80946f 
>   core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala 
> 00df4621fd724826a1e79d849c762ac1c4f49868 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> dcdc1ce2b02c996294e19cf480736106aaf29511 
>   core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/23655/diff/
> 
> 
> Testing
> -------
> 
> * I ran the unit tests (including the new one) as well as the mirror maker 
> system test suite with roundrobin. While this is being reviewed I will run 
> the system tests with symmetric.
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>
