Hello,

I have a topic with 4 partitions and I'm using the assign() method of KafkaConsumer to receive from a specific partition. As a test, I specified a nonexistent partition (number 5), but instead of an error or exception about the missing partition I got a strange trace and behavior.

After the assign() call I see the following:

... ...
2016-04-21 15:31:26,355 [ Thread-5] KafkaConsumer DEBUG Subscribed to partition(s): my_topic-5
...

and then a lot of retries to get the partition offset information:

...
2016-04-21 15:31:26,482 [ Thread-5] ConsumerCoordinator DEBUG No committed offset for partition my_topic-5
2016-04-21 15:31:26,482 [ Thread-5] Fetcher DEBUG Resetting offset for partition my_topic-5 to latest offset.
2016-04-21 15:31:26,483 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,571 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=4,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486571, sendTimeMs=0) to node 0
2016-04-21 15:31:26,573 [ Thread-5] Metadata DEBUG Updated cluster metadata version 3 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,573 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,672 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=5,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486672, sendTimeMs=0) to node 0
2016-04-21 15:31:26,674 [ Thread-5] Metadata DEBUG Updated cluster metadata version 4 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,674 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,773 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=6,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486773, sendTimeMs=0) to node 0
2016-04-21 15:31:26,775 [ Thread-5] Metadata DEBUG Updated cluster metadata version 5 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,775 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,874 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=7,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486874, sendTimeMs=0) to node 0
2016-04-21 15:31:26,876 [ Thread-5] Metadata DEBUG Updated cluster metadata version 6 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])

Why isn't an error returned for a nonexistent partition? Do I have to check it myself by calling the partitionsFor() method before calling assign()?

Thanks,
Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience
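
To make the partitionsFor() question concrete, here is a minimal sketch of the kind of pre-check I have in mind. The class PartitionCheck and the method requireExistingPartition are hypothetical names of mine, not part of the Kafka client. To keep the sketch runnable without a broker, the partition lookup is passed in as a plain function; with the real consumer I assume it could be built from consumer.partitionsFor(topic), mapping each PartitionInfo to its partition() number.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class PartitionCheck {

    // Hypothetical helper (not a Kafka API): fails fast when the requested
    // partition is not among the partitions reported for the topic.
    // The lookup may return null if the topic itself is unknown.
    public static int requireExistingPartition(String topic, int partition,
            Function<String, List<Integer>> partitionLookup) {
        List<Integer> known = partitionLookup.apply(topic);
        if (known == null || !known.contains(partition)) {
            throw new IllegalArgumentException("Partition " + topic + "-" + partition
                    + " does not exist; known partitions: " + known);
        }
        return partition;
    }

    public static void main(String[] args) {
        // Stand-in for the cluster metadata in the log above: my_topic has
        // partitions 0..3. With the real client this could instead be
        // (assumption): t -> consumer.partitionsFor(t).stream()
        //         .map(PartitionInfo::partition).collect(Collectors.toList())
        Function<String, List<Integer>> lookup = t -> Arrays.asList(0, 1, 2, 3);

        // An existing partition passes the check and could then be given to assign().
        System.out.println("ok: my_topic-" + requireExistingPartition("my_topic", 2, lookup));

        // The nonexistent partition is rejected before assign() is ever called.
        try {
            requireExistingPartition("my_topic", 5, lookup);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a check like this, the consumer would fail immediately instead of looping on metadata refreshes, but I would prefer not to need it if assign() is supposed to report the problem itself.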