> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 75
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line75>
> >
> > It seems that (a) implies (b).

Not really - within a consumer instance we need each topic to have the same number of streams. However, the number of streams can be different on another consumer instance. The intent of the constraint is that any partition can be assigned to any of the available thread-ids on a given consumer instance.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 83
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line83>
> >
> > Are those fields needed in the constructor? The same set of fields are
> > already passed into AssignmentContext.

Can be removed.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, lines 104-120
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line104>
> >
> > One of the implications of this is that the distribution is going to be
> > sensitive to # topics. In the case when #topics == #consumerThreads,
> > partitions from the same topic will be assigned to the same consumerThread.
> > In general, if #topics is a multiple of #consumerThreads, a similar issue
> > can happen.
> >
> > So, I am wondering if this is better than just sorting the consumer ids
> > in some random order like hashcode. For debugging purpose, we can log the
> > consumer ids in that order.

As discussed, we could but I'm not sure if we should. I tried it earlier for other reasons so I have that code as well. It makes it much harder to predict the final assignment though. That said, most operations folks want to have a uniform distribution so I can go back to using that sort version.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, lines 105-106
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line105>
> >
> > Since RoundRobin assigns partitions across topics, should we just log
> > all partitions and all consumers once in the right order?

We should probably avoid logging everything in one line (although I agree that logging the sort order would have been useful). There could be tens of thousands of partitions so it would be an extremely long line. So I would prefer to stick with this (one line per topic).


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 154
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line154>
> >
> > Could we do partitionsForTopic(topic) here too?

Yes.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, lines 85-86
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line85>
> >
> > Do we need a while loop here? Can this just be 2 +
> > random.nextInt(consumerCount - 1)?

I will remove the consistency assertion across consumers.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, lines 260-263
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line260>
> >
> > Shouldn't we assert count==1? Do we need to also make sure every
> > original partition is in assignment?

Yes. Also, the check for every partition being in the assignment is above this (givenPartitions == assignedPartitions).


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, line 289
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line289>
> >
> > Should we rename this to partitionsPerStream?

Sure - will rename to partitionCountPerStream.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 88
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line88>
> >
> > Instead of passing in forConsumers, would it be simpler to just return
> > the assignment for each consumer in a map?
> > The unit test can also be made simpler since we just need to test the
> > coverage and uniformity.

We could, but it seems a bit unwieldy to do that - i.e., having to maintain yet another map of map... and in the consumer connector we only want the assignment for that consumer id. Also, I ended up removing the "conflict" test from the unit test so it is simpler.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/PartitionAssignor.scala, line 53
> > <https://reviews.apache.org/r/23655/diff/4/?file=665824#file665824line53>
> >
> > To avoid duplicating the ZK read, we can probably change all the vals
> > in the constructor to def. Then, we can initialize some private vals during
> > initialization.

Actually, I reverted to the old form because we ended up not needing the current form in the unit test.


> On Aug. 22, 2014, 4:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala, lines 88-124
> > <https://reviews.apache.org/r/23655/diff/4/?file=665829#file665829line88>
> >
> > This test is pretty complicated. I am wondering if we need to test the
> > case that c1 and cx give consistent assignment. If the inputs to the
> > assignor are the same and the assignor is deterministic, the assignment is
> > always going to be consistent. We probably just need to test completeness
> > and uniqueness.

See above.


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23655/#review51182
-----------------------------------------------------------


On Aug. 21, 2014, 1:10 a.m., Joel Koshy wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23655/
> -----------------------------------------------------------
>
> (Updated Aug. 21, 2014, 1:10 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-687
>     https://issues.apache.org/jira/browse/KAFKA-687
>
>
> Repository: kafka
>
>
> Description
> -------
>
> v4
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/consumer/ConsumerConfig.scala 1cf2f62ba02e4aa66bfa7575865e5d57baf82212
>   core/src/main/scala/kafka/consumer/PartitionAssignor.scala PRE-CREATION
>   core/src/main/scala/kafka/consumer/TopicCount.scala 8b0ae5785e08272d0ea12483beae597f2eac4343
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala acfd064bdba2b031f8869011da79649efd80946f
>   core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala 00df4621fd724826a1e79d849c762ac1c4f49868
>   core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511
>   core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/23655/diff/
>
>
> Testing
> -------
>
> * I did the unit tests (including the new one) as well as mirror maker system
>   test suite with roundrobin. While this is being reviewed I will run the
>   system tests with symmetric
>
>
> Thanks,
>
> Joel Koshy
>
>
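
A minimal sketch of the #topics sensitivity raised in the comment on lines 104-120 above. It assumes the partitions are ordered by partition id and then by topic before being dealt out round-robin. That ordering, the `TopicPartition` case class, the `assign` and `topicsPerThread` helpers, and the topic and thread names are all hypothetical illustrations, not the code in this patch.

```scala
object RoundRobinSketch {
  // Hypothetical stand-in for a topic-partition pair; not Kafka's own class.
  final case class TopicPartition(topic: String, partition: Int)

  // Deal an already-ordered sequence of partitions to threads in round-robin order.
  def assign(ordered: Seq[TopicPartition],
             threads: Seq[String]): Map[String, Seq[TopicPartition]] =
    ordered.zipWithIndex
      .groupBy { case (_, i) => threads(i % threads.size) }
      .map { case (thread, dealt) => thread -> dealt.map(_._1) }

  // Set of topics each thread ends up with when partitions are ordered by
  // (partition id, topic) before dealing. This ordering is an assumption
  // made for the illustration.
  def topicsPerThread(numTopics: Int,
                      partitionsPerTopic: Int,
                      threads: Seq[String]): Map[String, Set[String]] = {
    val all = for (p <- 0 until partitionsPerTopic; t <- 0 until numTopics)
      yield TopicPartition(s"t$t", p)
    val ordered = all.sortBy(tp => (tp.partition, tp.topic))
    assign(ordered, threads).map { case (thread, tps) =>
      thread -> tps.map(_.topic).toSet
    }
  }

  def main(args: Array[String]): Unit = {
    val threads = Seq("c1-0", "c1-1", "c2-0")
    for (numTopics <- Seq(3, 4, 6)) {
      println(s"#topics = $numTopics, #consumerThreads = ${threads.size}")
      topicsPerThread(numTopics, 4, threads).toSeq.sortBy(_._1).foreach {
        case (thread, topics) =>
          val shown = topics.toSeq.sorted.mkString(", ")
          println(s"  $thread -> $shown")
      }
    }
  }
}
```

Under these assumptions and with three threads, three topics leave each thread with every partition of exactly one topic, six topics leave each thread with two whole topics, and four topics spread every topic across all three threads.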