On Thu, Aug 14, 2014 at 10:39:15AM -0700, Chen Wang wrote:
> Suppose I have a topic with 5 partitions, and partitions 0 and 1 have a lag of 0,
> while the other 3 all have lag. In this case, should I start 3
> threads or 5 threads to read from this topic again to achieve the best
> performance?
> I am currently using 3 threads in this case, but it seems that each thread
> still tries to get hold of partitions 0 and 1 first (which seems unnecessary
> in my case).

The default partition assignment is described here:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredicttheresultsoftheconsumerrebalance?

I'm not sure what you mean by "each thread tries to get hold of
partitions 0 and 1 first". By threads, you mean the stream count on a
single consumer instance, right? Since these threads are in the same
consumer group, the partitions should be spread uniformly across the
streams of that instance. For example, five partitions and five threads
on one consumer instance would give you the most parallelism (provided
you have enough CPU cores to support that).
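To make the default assignment concrete, here is a minimal sketch of the range-style strategy the FAQ describes, as plain Java with no Kafka dependency. It assumes partition ids and consumer thread ids are each sorted, and that every thread receives a contiguous block, with the first `partitions % threads` threads taking one extra partition. The class and method names are hypothetical, and this is an illustration rather than Kafka's own code. Note that per-partition lag plays no part in the computation: under this scheme, with 3 threads and 5 partitions the first thread always ends up with partitions 0 and 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of range-style partition assignment; not Kafka's implementation.
public class RangeAssignmentSketch {

    /**
     * Splits partitions 0..numPartitions-1 into contiguous ranges, one per
     * consumer thread (threads assumed already sorted by id). The first
     * numPartitions % numThreads threads each get one extra partition;
     * threads beyond numPartitions get an empty list.
     */
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        int perThread = numPartitions / numThreads;
        int withExtra = numPartitions % numThreads;
        List<List<Integer>> result = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            // Start offset: t full shares plus one for each earlier thread that got an extra.
            int start = perThread * t + Math.min(t, withExtra);
            int count = perThread + (t < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("3 threads: " + assign(5, 3)); // [[0, 1], [2, 3], [4]]
        System.out.println("5 threads: " + assign(5, 5)); // [[0], [1], [2], [3], [4]]
        System.out.println("6 threads: " + assign(5, 6)); // [[0], [1], [2], [3], [4], []]
    }
}
```

Matching the stream count to the partition count gives each thread exactly one partition; any thread beyond that receives nothing and sits idle.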


Also, wrt your question below:
> Are these two methods fundamentally the same, or is the latter one
> preferred?

I assume you mean 'n' consumer instances (in the same group) with one
stream each versus one consumer instance with 'n' streams. If so, the
latter is preferred and easier to work with. You would use the former
approach when the consumer instances are on different machines.
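A minimal sketch of the "one consumer instance with 'n' streams" pattern, assuming each stream is drained by its own task on a fixed-size thread pool. To keep it self-contained and runnable, an `Iterator<String>` stands in for a `KafkaStream`; the class name `StreamPerThreadSketch` and the method `consumeAll` are hypothetical. Unlike these finite iterators, a real consumer iterator normally blocks waiting for more messages rather than running out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: Iterator<String> is a stand-in for a KafkaStream's iterator.
public class StreamPerThreadSketch {

    /** Drains each stream on its own pool thread and returns the total messages handled. */
    static int consumeAll(List<Iterator<String>> streams) throws InterruptedException {
        // One thread per stream, mirroring newFixedThreadPool(numOfPartitions).
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        AtomicInteger handled = new AtomicInteger();
        for (Iterator<String> stream : streams) {
            // One task per stream: in this sketch no iterator is shared between threads.
            executor.submit(() -> {
                while (stream.hasNext()) {
                    stream.next(); // real code would process the message here
                    handled.incrementAndGet();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<Iterator<String>> streams = new ArrayList<>();
        for (int s = 0; s < 5; s++) {
            streams.add(List.of("m" + s + "a", "m" + s + "b").iterator());
        }
        System.out.println("handled " + consumeAll(streams) + " messages"); // handled 10 messages
    }
}
```

Because each iterator is touched by exactly one task, only the shared counter needs to be atomic. With the real API, the list of streams would come from `createMessageStreams` with the topic count set to 'n', as in the test code quoted below.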

Joel

> 
> Another question: I am currently using a signal thread to spawn a
> separate thread to read from Kafka for each signal. So if a topic has 5
> partitions, 5 signals will be sent, and 5 different threads will start
> polling the topic in the following manner:
> 
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(kafkaTopic, new Integer(1));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
>     .createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic);
> KafkaStream<byte[], byte[]> stream = streams.get(0);
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
> 
> As you can see, I specify Integer(1) to ensure there is only one stream in
> the polling thread.
> 
> But in my testing, I am using:
> 
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, new Integer(numOfPartitions));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
>     .createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> executor = Executors.newFixedThreadPool(numOfPartitions);
> for (final KafkaStream<byte[], byte[]> stream : streams) {
>     executor.submit(new ConsumerTest(stream, threadNumber, this.targetTopic));
>     threadNumber++;
> }
> 
> Are these two methods fundamentally the same, or is the latter one
> preferred?
> 
> Thanks much!
> 
> Chen
