PS I see this in one of the kafka log files Thanks
[2015-10-09 13:41:52,873] WARN [ReplicaFetcherThread-0-21441], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 58; ClientId: ReplicaFetcherThread-0-21441; ReplicaId: 26680; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic-debug-production,0] -> PartitionFetchInfo(0,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.server.ReplicaFetcherThread) [2015-10-09 13:41:52,873] INFO Reconnect due to socket error: java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer) [2015-10-09 13:42:22,905] WARN [ReplicaFetcherThread-0-21441], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 59; ClientId: ReplicaFetcherThread-0-21441; ReplicaId: 26680; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic-debug-production,0] -> PartitionFetchInfo(0,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.server.ReplicaFetcherThread) [2015-10-09 13:42:22,906] INFO Reconnect due to socket error: java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer) [2015-10-09 13:42:52,928] WARN [ReplicaFetcherThread-0-21441], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 60; ClientId: ReplicaFetcherThread-0-21441; ReplicaId: 26680; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic-debug-production,0] -> PartitionFetchInfo(0,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.server.ReplicaFetcherThread) [2015-10-09 13:42:52,929] INFO Reconnect due to socket error: java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer) [2015-10-09 13:43:22,937] WARN [ReplicaFetcherThread-0-21441], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 61; ClientId: ReplicaFetcherThread-0-21441; ReplicaId: 26680; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic-debug-production,0] -> PartitionFetchInfo(0,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.server.ReplicaFetcherThread) [2015-10-09 13:43:22,937] INFO Reconnect due to socket error: java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer) On Sat, Oct 10, 2015 at 1:24 AM, David Montgomery <davidmontgom...@gmail.com > wrote: > Is this a total failure on kafka or python kakfa-python? > > I am using ubuntu 14.04 on digital ocean and zk 3.4.6 > Using UFW all zk and kafka servers have access > I added my ip address from home to kafka and zk servers using UFW so there > should be no filewall issues > > > Below is my kafka server.properties from my chef template > > broker.id=<%=@broker_id%> > advertised.host.name=<%=@ipaddress%> > port=9092 > host.name=<%=@ipaddress%> > num.network.threads=2 > num.io.threads=2 > socket.send.buffer.bytes=1048576 > socket.receive.buffer.bytes=1048576 > socket.request.max.bytes=104857600 > log.dirs=/tmp/kafka-logs > num.partitions=2 > log.flush.interval.messages=10000 > log.flush.interval.ms=1000 > log.retention.hours=168 > log.segment.bytes=536870912 > log.cleanup.interval.mins=1 > zookeeper.connect=<%=@zookeeper%> > zookeeper.connection.timeout.ms=1000000 > default.replication.factor=2 > delete.topic.enable=true > unclean.leader.election.enable=true > > I just rebuilt my zk cluster and kafka cluster so this is a cold start. > > so from a cold start I ran my python code to create a topic. Got a > leadererror and ran again > > I then ran > > bin/kafka-topics.sh --zookeeper 1.zk.do.production.sf.ftest.com:2181, > 2.zk.do.production.sf.test.com:2181,3.zk.do.production.sf.test.com:2181 > --describe --topic topic-debug-production > > Topic:topic-debug-production PartitionCount:2 ReplicationFactor:2 Configs: > Topic: topic-debug-production Partition: 0 Leader: 21441 Replicas: > 21441,26680 Isr: 21441,26680 > Topic: topic-debug-production Partition: 1 Leader: 26680 Replicas: > 26680,21441 Isr: 26680 > > > So when I run the python code again I get the below: > > > import logging > logging.basicConfig( > > format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', > level=logging.DEBUG > ) > from kafka import SimpleProducer > from kafka import KafkaClient > kafka_host_list = '104.236.xxx.xxx:9092,104.236.xxx.xxx:9092' > kafka = KafkaClient(kafka_host_list) > producer = SimpleProducer(kafka, async=False) > test_topic = 'topic-debug-production' > test_payload = 'test' > response = producer.send_messages(test_topic,test_payload) > print response > > > 2015-10-10 > 00:17:37,599.599.545001984:kafka.client:140388282590976:DEBUG:20143:Request > 1: [] > 2015-10-10 > 00:17:37,600.600.193977356:kafka.conn:140388282590976:DEBUG:20143:Reinitializing > socket connection for 104.236.167.64:9092 > 2015-10-10 > 00:17:37,778.778.523921967:kafka.conn:140388282590976:DEBUG:20143:About to > send 30 bytes to Kafka, request 1 > 2015-10-10 > 00:17:37,779.779.033899307:kafka.conn:140388282590976:DEBUG:20143:Reading > response 1 from Kafka > 2015-10-10 > 00:17:37,779.779.129981995:kafka.conn:140388282590976:DEBUG:20143:About to > read 4 bytes from Kafka > 2015-10-10 > 00:17:37,955.955.996990204:kafka.conn:140388282590976:DEBUG:20143:Read 4/4 > bytes from Kafka > 2015-10-10 > 00:17:37,956.956.175088882:kafka.conn:140388282590976:DEBUG:20143:About to > read 12 bytes from Kafka > 2015-10-10 > 00:17:37,956.956.24089241:kafka.conn:140388282590976:DEBUG:20143:Read 12/12 > bytes from Kafka > 2015-10-10 > 00:17:37,956.956.319093704:kafka.client:140388282590976:DEBUG:20143:Response > 1: MetadataResponse(brokers=[], topics=[]) > 2015-10-10 > 00:17:37,956.956.377983093:kafka.client:140388282590976:DEBUG:20143:Updating > broker metadata: [] > 2015-10-10 > 00:17:37,956.956.424951553:kafka.client:140388282590976:DEBUG:20143:Updating > topic metadata: [] > 2015-10-10 > 00:17:37,956.956.562995911:kafka.client:140388282590976:DEBUG:20143:Request > 2: ['topic-debug-production'] > 2015-10-10 > 00:17:37,956.956.639051437:kafka.conn:140388282590976:DEBUG:20143:About to > send 54 bytes to Kafka, request 2 > 2015-10-10 > 00:17:37,956.956.777095795:kafka.conn:140388282590976:DEBUG:20143:Reading > response 2 from Kafka > 2015-10-10 > 00:17:37,956.956.829071045:kafka.conn:140388282590976:DEBUG:20143:About to > read 4 bytes from Kafka > 2015-10-10 > 00:17:38,142.142.988920212:kafka.conn:140388282590976:DEBUG:20143:Read 4/4 > bytes from Kafka > 2015-10-10 > 00:17:38,143.143.167972565:kafka.conn:140388282590976:DEBUG:20143:About to > read 42 bytes from Kafka > 2015-10-10 > 00:17:38,143.143.234968185:kafka.conn:140388282590976:DEBUG:20143:Read > 42/42 bytes from Kafka > 2015-10-10 > 00:17:38,143.143.313884735:kafka.client:140388282590976:DEBUG:20143:Response > 2: MetadataResponse(brokers=[], > topics=[TopicMetadata(topic='topic-debug-production', error=5, > partitions=[])]) > 2015-10-10 > 00:17:38,143.143.378973007:kafka.client:140388282590976:DEBUG:20143:Updating > broker metadata: [] > 2015-10-10 > 00:17:38,143.143.425941467:kafka.client:140388282590976:DEBUG:20143:Updating > topic metadata: [TopicMetadata(topic='topic-debug-production', error=5, > partitions=[])] > Traceback (most recent call last): > File > "/home/ubuntu/workspace/feed-tests/tests/post_druid_feeds_kafka_write.py", > line 13, in <module> > response = producer.send_messages(test_topic,test_payload) > File > "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.5_dev-py2.7.egg/kafka/producer/simple.py", > line 52, in send_messages > partition = self._next_partition(topic) > File > "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.5_dev-py2.7.egg/kafka/producer/simple.py", > line 36, in _next_partition > self.client.load_metadata_for_topics(topic) > File > "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.5_dev-py2.7.egg/kafka/client.py", > line 383, in load_metadata_for_topics > kafka.common.check_error(topic_metadata) > File > "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.5_dev-py2.7.egg/kafka/common.py", > line 233, in check_error > raise error_class(response) > kafka.common.LeaderNotAvailableError: > TopicMetadata(topic='topic-debug-production', error=5, partitions=[]) > > > > > > > > > > > > > > On Fri, Oct 9, 2015 at 11:53 PM, Prabhjot Bharaj <prabhbha...@gmail.com> > wrote: > >> Hi, >> >> Your metadata response is empty. Here is an output for my kafka cluster: >> >> *>>> kafka = >> >> KafkaClient(['96.a.a.a:9092','96.b.b.b:9092','96.c.c.c:9092','96.d.d.d:9092','96.e.e.e:9092'])* >> >> *2015-10-09 >> 21:16:53,575.575.392961502:kafka.client:140735282348800:DEBUG:8705:Request >> 1: []* >> >> *2015-10-09 >> >> 21:16:53,580.580.756902695:kafka.conn:140735282348800:DEBUG:8705:Reinitializing >> socket connection for 96.b.b.b:9092* >> >> *2015-10-09 >> 21:16:53,981.981.142044067:kafka.conn:140735282348800:DEBUG:8705:About to >> send 30 bytes to Kafka, request 1* >> >> *2015-10-09 >> 21:16:53,981.981.415987015:kafka.conn:140735282348800:DEBUG:8705:Reading >> response 1 from Kafka* >> >> *2015-10-09 >> 21:16:53,981.981.508016586:kafka.conn:140735282348800:DEBUG:8705:About to >> read 4 bytes from Kafka* >> >> *2015-10-09 >> 21:16:54,596.596.358060837:kafka.conn:140735282348800:DEBUG:8705:Read 4/4 >> bytes from Kafka* >> >> *2015-10-09 >> 21:16:54,596.596.518039703:kafka.conn:140735282348800:DEBUG:8705:About to >> read 389 bytes from Kafka* >> >> *2015-10-09 >> 21:16:54,596.596.612930298:kafka.conn:140735282348800:DEBUG:8705:Read >> 389/389 bytes from Kafka* >> >> *2015-10-09 >> >> 21:16:54,596.596.745014191:kafka.client:140735282348800:DEBUG:8705:Response >> 1: MetadataResponse(brokers=[BrokerMetadata(nodeId=1, host='96.a.a.a.a', >> port=9092), BrokerMetadata(nodeId=6, host='96.d.d.d', port=9092), >> BrokerMetadata(nodeId=2, host='96.b.b.b', port=9092), >> BrokerMetadata(nodeId=7, host='96.e.e.e', port=9092), >> BrokerMetadata(nodeId=4, host='96.c.c.c', port=9092)], >> topics=[TopicMetadata(topic='part_1_repl_3', error=0, >> partitions=[PartitionMetadata(topic='part_1_repl_3', partition=0, >> leader=2, >> replicas=(2, 6, 7), isr=(2, 6, 7), error=0)]), >> TopicMetadata(topic='part_1_repl_3_4', error=0, >> partitions=[PartitionMetadata(topic='part_1_repl_3_4', partition=0, >> leader=6, replicas=(6,), isr=(6,), error=0)])])* >> >> *2015-10-09 >> 21:16:54,596.596.867084503:kafka.client:140735282348800:INFO:8705:Updating >> broker metadata: [BrokerMetadata(nodeId=1, host='96.a.a.a.a', port=9092), >> BrokerMetadata(nodeId=6, host='96.d.d.d', port=9092), >> BrokerMetadata(nodeId=2, host='96.b.b.b', port=9092), >> BrokerMetadata(nodeId=7, host='96.e.e.e', port=9092), >> BrokerMetadata(nodeId=4, host='96.c.c.c', port=9092)] * >> >> *2015-10-09 >> 21:16:54,596.596.991062164:kafka.client:140735282348800:INFO:8705:Updating >> topic metadata: [TopicMetadata(topic='part_1_repl_3', error=0, >> partitions=[PartitionMetadata(topic='part_1_repl_3', partition=0, >> leader=2, >> replicas=(2, 6, 7), isr=(2, 6, 7), error=0)]), >> TopicMetadata(topic='part_1_repl_3_4', error=0, >> partitions=[PartitionMetadata(topic='part_1_repl_3_4', partition=0, >> leader=6, replicas=(6,), isr=(6,), error=0)])]* >> >> Is your cluster up ? >> >> Are you behind some firewall ? >> >> By the way, I'm using kafka-python-0.9.4 from my mac >> >> Regards, >> >> Prabhjot >> >> On Fri, Oct 9, 2015 at 7:32 PM, David Montgomery < >> davidmontgom...@gmail.com> >> wrote: >> >> > ps here is the output the I add logging to the code >> > >> > import logging >> > logging.basicConfig( >> > >> > >> > >> format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', >> > level=logging.DEBUG >> > ) >> > >> > 2015-10-09 >> > >> 20:59:33,656.656.464099884:kafka.client:139860093155072:DEBUG:25285:Request >> > 1: [] >> > 2015-10-09 >> > >> > >> 20:59:33,657.657.119989395:kafka.conn:139860093155072:DEBUG:25285:Reinitializing >> > socket connection for 45.55.xxx.xxx:9092 >> > 2015-10-09 >> > 20:59:33,928.928.045034409:kafka.conn:139860093155072:DEBUG:25285:About >> to >> > send 30 bytes to Kafka, request 1 >> > 2015-10-09 >> > >> 20:59:33,928.928.556919098:kafka.conn:139860093155072:DEBUG:25285:Reading >> > response 1 from Kafka >> > 2015-10-09 >> > 20:59:33,928.928.649902344:kafka.conn:139860093155072:DEBUG:25285:About >> to >> > read 4 bytes from Kafka >> > 2015-10-09 >> > 20:59:34,184.184.187889099:kafka.conn:139860093155072:DEBUG:25285:Read >> 4/4 >> > bytes from Kafka >> > 2015-10-09 >> > 20:59:34,184.184.477090836:kafka.conn:139860093155072:DEBUG:25285:About >> to >> > read 12 bytes from Kafka >> > 2015-10-09 >> > 20:59:34,184.184.587001801:kafka.conn:139860093155072:DEBUG:25285:Read >> > 12/12 bytes from Kafka >> > 2015-10-09 >> > >> > >> 20:59:34,184.184.746026993:kafka.client:139860093155072:DEBUG:25285:Response >> > 1: MetadataResponse(brokers=[], topics=[]) >> > 2015-10-09 >> > >> > >> 20:59:34,184.184.829950333:kafka.client:139860093155072:DEBUG:25285:Updating >> > broker metadata: [] >> > 2015-10-09 >> > >> 20:59:34,184.184.88407135:kafka.client:139860093155072:DEBUG:25285:Updating >> > topic metadata: [] >> > 2015-10-09 >> > >> 20:59:34,185.185.06193161:kafka.client:139860093155072:DEBUG:25285:Request >> > 2: ['topic-test-production'] >> > 2015-10-09 >> > 20:59:34,185.185.146093369:kafka.conn:139860093155072:DEBUG:25285:About >> to >> > send 53 bytes to Kafka, request 2 >> > 2015-10-09 >> > >> 20:59:34,185.185.309886932:kafka.conn:139860093155072:DEBUG:25285:Reading >> > response 2 from Kafka >> > 2015-10-09 >> > 20:59:34,185.185.367107391:kafka.conn:139860093155072:DEBUG:25285:About >> to >> > read 4 bytes from Kafka >> > 2015-10-09 >> > 20:59:34,445.445.18995285:kafka.conn:139860093155072:DEBUG:25285:Read >> 4/4 >> > bytes from Kafka >> > 2015-10-09 >> > 20:59:34,445.445.672988892:kafka.conn:139860093155072:DEBUG:25285:About >> to >> > read 41 bytes from Kafka >> > 2015-10-09 >> > 20:59:34,445.445.739984512:kafka.conn:139860093155072:DEBUG:25285:Read >> > 41/41 bytes from Kafka >> > 2015-10-09 >> > >> > >> 20:59:34,445.445.817947388:kafka.client:139860093155072:DEBUG:25285:Response >> > 2: MetadataResponse(brokers=[], >> > topics=[TopicMetadata(topic='topic-test-production', error=5, >> > partitions=[])]) >> > 2015-10-09 >> > >> > >> 20:59:34,445.445.895910263:kafka.client:139860093155072:DEBUG:25285:Updating >> > broker metadata: [] >> > 2015-10-09 >> > >> 20:59:34,445.445.94502449:kafka.client:139860093155072:DEBUG:25285:Updating >> > topic metadata: [TopicMetadata(topic='topic-test-production', error=5, >> > partitions=[])] >> > >> > >> > On Fri, Oct 9, 2015 at 7:00 PM, David Montgomery < >> > davidmontgom...@gmail.com> >> > wrote: >> > >> > > This is the python error >> > > >> > > kafka.common.LeaderNotAvailableError: >> > > TopicMetadata(topic='topic-test-production', error=5, partitions=[]) >> > > >> > > >> > > On Fri, Oct 9, 2015 at 6:55 PM, David Montgomery < >> > > davidmontgom...@gmail.com> wrote: >> > > >> > >> Well i masked my true domain but the hostnames are valid and do >> work. I >> > >> do see the topics creates in ZK >> > >> >> > >> >> > >> brokers/topics/topic-test-production/partisions/0/state >> > >> {"controller_epoch":10,"leader":26665,"version":1,"leader_epoch":2,"isr":[26665]} >> > >> >> > >> brokers/topics/topic-test-production/partisions/1/state >> > >> {"controller_epoch":10,"leader":26665,"version":1,"leader_epoch":3,"isr":[26665]} >> > >> for example >> > {"version":1,"partitions":{"1":[2028,26665],"0":[26665,2028]}} >> > >> >> > >> But now I do get the below on describe: >> > >> roduction PartitionCount:2 ReplicationFactor:2 Configs: >> > >> Topic: topic-test-production Partition: 0 Leader: 26665 Replicas: >> > >> 26665,2028 Isr: 26665 >> > >> Topic: topic-test-production Partition: 1 Leader: 26665 Replicas: >> > >> 2028,26665 Isr: 26665 >> > >> >> > >> But in kafka-python I still get this error >> > >> kafka.common.LeaderNotAvailableError: >> > >> TopicMetadata(topic='topic-test-production', error=5, partitions=[]) >> > >> >> > >> >> > >> >> > >> On Fri, Oct 9, 2015 at 5:31 PM, Prabhjot Bharaj < >> prabhbha...@gmail.com> >> > >> wrote: >> > >> >> > >>> Hi, >> > >>> >> > >>> Is this a valid zookeeper connection string >> > >>> >> > >>> >> > >>> >> > >> 1.zk.do.production.test:2181,2.zk.do.production.test:2181,3.zk.do.production.test:2181 >> > >>> >> > >>> Is 2.zk.do.production.test or 3.zk.do.production.test a valid >> > hostname?? >> > >>> >> > >>> Regards, >> > >>> Prabhjot >> > >>> On Oct 9, 2015 1:37 PM, "David Montgomery" < >> davidmontgom...@gmail.com> >> > >>> wrote: >> > >>> >> > >>> > PS I added one more server now 3 kafka servers. still kafka is >> not >> > >>> > working. Is there a known bug? >> > >>> > >> > >>> > On Fri, Oct 9, 2015 at 3:23 PM, David Montgomery < >> > >>> > davidmontgom...@gmail.com> >> > >>> > wrote: >> > >>> > >> > >>> > > i, >> > >>> > > >> > >>> > > I have 3 ZK servers and 2 kafka servers with 2 partitions. >> > >>> > > >> > >>> > > I am using kafka_2.11-0.8.2.1 >> > >>> > > >> > >>> > > Here is how I am creating a topic: >> > >>> > > >> > >>> > > bin/kafka-topics.sh --zookeeper >> > >>> > > >> > >>> > >> > >>> >> > >> 1.zk.do.production.test:2181,2.zk.do.production.test:2181,3.zk.do.production.test:2181 >> > >>> > > --create --topic topic-test-production --partitions 2 >> > >>> > --replication-factor 2 >> > >>> > > >> > >>> > > here is what I describe: >> > >>> > > >> > >>> > > bin/kafka-topics.sh >> > >>> > > --zookeeper >> > >>> > >> > >>> >> > >> .zk.do.production.test:2181,2.zk.do.production.test:2181,3.zk.do.production.test:2181 >> > >>> > --describe >> > >>> > > --topic topic-test-production >> > >>> > > >> > >>> > > Topic:topic-test-production PartitionCount:2 ReplicationFactor:2 >> > >>> Configs: >> > >>> > > Topic: topic-test-production Partition: 0 Leader: none Replicas: >> > >>> > > 26665,2028 Isr: >> > >>> > > Topic: topic-test-production Partition: 1 Leader: none Replicas: >> > >>> > > 2028,26665 Isr: >> > >>> > > >> > >>> > > When i try to write to kafka in python I get this error. >> > >>> > > >> > >>> > > kafka.common.LeaderNotAvailableError: >> > >>> > > TopicMetadata(topic='topic-test-production', error=5, >> > partitions=[]) >> > >>> > > >> > >>> > > What is wrong with kafka? >> > >>> > > >> > >>> > > Thanks >> > >>> > > >> > >>> > > >> > >>> > > >> > >>> > > >> > >>> > > >> > >>> > > >> > >>> > >> > >>> >> > >> >> > >> >> > > >> > >> >> >> >> -- >> --------------------------------------------------------- >> "There are only 10 types of people in the world: Those who understand >> binary, and those who don't" >> > >