On Thu, Aug 14, 2014 at 10:39:15AM -0700, Chen Wang wrote:
> Suppose I have a topic with 5 partitions, and partitions 0 and 1 have a lag
> of 0, while the other 3 all have lag. In this case, is it best to start 3
> threads or 5 threads to read from this topic again to achieve the best
> performance?
> I am currently using 3 threads in this case, but it seems that each thread
> still first tries to get hold of partitions 0 and 1 (which seems unnecessary
> in my case).
The default partition assignment is described here:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredicttheresultsoftheconsumerrebalance?

I'm not sure what you mean by "each thread tries to get hold of
partitions 0 and 1 first". By threads, you mean the stream count on a
single consumer instance, right? Since these threads are in the same
consumer group, the partitions should be spread uniformly across the
streams of that instance. That is, if there are five partitions and five
threads on one consumer instance, that would give you the most
parallelism (provided you have enough CPU cores to support it).

Also, with respect to your question below:

> Are these two methods fundamentally the same, or is the latter one
> preferred?

You mean 'n' consumer instances (in the same group) with one stream each
vs. one consumer instance with 'n' streams? If so, the latter is
preferred and easier to work with. You would use the former approach
when the consumer instances are on different machines.

Joel

>
> Another question is that I am currently using a signal thread to spawn
> different threads to read from Kafka.
> So if a topic has 5 partitions, 5 signals will be sent, and 5 different
> threads will start polling the topic in the following manner:
>
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(kafkaTopic, new Integer(1));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
>     .createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic);
> KafkaStream<byte[], byte[]> stream = streams.get(0);
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
> As you can see, I specify Integer(1) to ensure there is only one stream in
> the polling thread.
>
> But in my testing, I am using:
>
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, new Integer(numOfPartitions));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
>     .createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> executor = Executors.newFixedThreadPool(numOfPartitions);
> for (final KafkaStream<byte[], byte[]> stream : streams) {
>     executor.submit(new ConsumerTest(stream, threadNumber, this.targetTopic));
>     threadNumber++;
> }
>
> Are these two methods fundamentally the same, or is the latter one
> preferred?
>
> Thanks much!
>
> Chen
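For illustration, here is a minimal sketch of the range-style assignment described in the FAQ linked above. It assumes a single topic whose partitions are numbered 0..N-1 and consumer threads identified by sorted integer indices; the real consumer sorts string thread ids and works per topic, so this is a simplification and not Kafka's actual code. `RangeAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {

    // Hypothetical helper: hands out partitions 0..numPartitions-1 to
    // threads 0..numThreads-1 in contiguous ranges. Every thread gets
    // numPartitions / numThreads partitions, and the first
    // (numPartitions % numThreads) threads get one extra.
    public static Map<Integer, List<Integer>> assign(int numPartitions, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        int perThread = numPartitions / numThreads;
        int withExtra = numPartitions % numThreads;
        for (int t = 0; t < numThreads; t++) {
            // Start of this thread's range, shifted by the extras handed out so far.
            int start = perThread * t + Math.min(t, withExtra);
            int count = perThread + (t < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            // Threads beyond the partition count end up with an empty list (idle).
            result.put(t, parts);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(5, 3)); // 3 threads on 5 partitions
        System.out.println(assign(5, 5)); // one thread per partition
    }
}
```

Under these assumptions, 3 threads on 5 partitions yields `{0=[0, 1], 1=[2, 3], 2=[4]}`: the first thread in sort order owns partitions 0 and 1, so if those have no lag it sits idle while the three lagging partitions are shared by the other two threads, which may be what you are seeing. With 5 threads each stream owns exactly one partition.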