Hello,

I have a topic with 4 partitions and I'm using the assign() method of 
KafkaConsumer for receiving from a specified partition.
For testing, I tried to specify a not existing partition (number 5) but I had a 
strange trace and behavior instead of an error/exception related to not 
existing partition.
After the assing() call I have the following ...
...
2016-04-21 15:31:26,355 [                      Thread-5] KafkaConsumer          
        DEBUG Subscribed to partition(s): my_topic-5
...
and then a lot of retries to get partition information offset ...

2016-04-21 15:31:26,482 [                      Thread-5] ConsumerCoordinator    
        DEBUG No committed offset for partition my_topic-5
2016-04-21 15:31:26,482 [                      Thread-5] Fetcher                
        DEBUG Resetting offset for partition my_topic-5 to latest offset.
2016-04-21 15:31:26,483 [                      Thread-5] Fetcher                
        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for 
metadata refresh
2016-04-21 15:31:26,571 [                      Thread-5] NetworkClient          
        DEBUG Sending metadata request ClientRequest(expectResponse=true, 
callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=4,client_id=consumer-1},
 body={topics=[my_topic]}), isInitiatedByNetworkClient, 
createdTimeMs=1461245486571, sendTimeMs=0) to node 0
2016-04-21 15:31:26,573 [                      Thread-5] Metadata               
        DEBUG Updated cluster metadata version 3 to Cluster(nodes = [Node(0, 
localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, 
partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = 
my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], 
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = 
[0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], 
isr = [0,]])
2016-04-21 15:31:26,573 [                      Thread-5] Fetcher                
        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for 
metadata refresh
2016-04-21 15:31:26,672 [                      Thread-5] NetworkClient          
        DEBUG Sending metadata request ClientRequest(expectResponse=true, 
callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=5,client_id=consumer-1},
 body={topics=[my_topic]}), isInitiatedByNetworkClient, 
createdTimeMs=1461245486672, sendTimeMs=0) to node 0
2016-04-21 15:31:26,674 [                      Thread-5] Metadata               
        DEBUG Updated cluster metadata version 4 to Cluster(nodes = [Node(0, 
localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, 
partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = 
my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], 
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = 
[0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], 
isr = [0,]])
2016-04-21 15:31:26,674 [                      Thread-5] Fetcher                
        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for 
metadata refresh
2016-04-21 15:31:26,773 [                      Thread-5] NetworkClient          
        DEBUG Sending metadata request ClientRequest(expectResponse=true, 
callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=6,client_id=consumer-1},
 body={topics=[my_topic]}), isInitiatedByNetworkClient, 
createdTimeMs=1461245486773, sendTimeMs=0) to node 0
2016-04-21 15:31:26,775 [                      Thread-5] Metadata               
        DEBUG Updated cluster metadata version 5 to Cluster(nodes = [Node(0, 
localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, 
partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = 
my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], 
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = 
[0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], 
isr = [0,]])
2016-04-21 15:31:26,775 [                      Thread-5] Fetcher                
        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for 
metadata refresh
2016-04-21 15:31:26,874 [                      Thread-5] NetworkClient          
        DEBUG Sending metadata request ClientRequest(expectResponse=true, 
callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=7,client_id=consumer-1},
 body={topics=[my_topic]}), isInitiatedByNetworkClient, 
createdTimeMs=1461245486874, sendTimeMs=0) to node 0
2016-04-21 15:31:26,876 [                      Thread-5] Metadata               
        DEBUG Updated cluster metadata version 6 to Cluster(nodes = [Node(0, 
localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, 
partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = 
my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], 
Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = 
[0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], 
isr = [0,]])

Why don't an error is returned for a not existing partition ? 

Do I have to check by myself calling partitionsFor() method before trying the 
assign() ?

Thanks,

Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor 
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience                                      

Reply via email to