I am trying to get Kafka working with Python for sending and receiving messages.
I am using kafka-python (https://github.com/mumrah/kafka-python) to produce and consume messages. The messages that I am sending are JSON strings in Python. kafka = KafkaClient(kafka_domain, 9092) producer = SimpleProducer(kafka, async=True) producer.send_messages(pixel_topic,payload1) consumer = SimpleConsumer(kafka, "my-group1",pixel_topic) for message in consumer: print '------',message Here are the errors I get in Python: ------ OffsetAndMessage(offset=14, message=Message(magic=0, attributes=0, key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1, "country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id": "cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt": "2014-02-21T06:41:42", "wp": 100, "bp": 0.001, "domain": "news.google.com", "tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3, "os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}')) ------ OffsetAndMessage(offset=15, message=Message(magic=0, attributes=0, key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1, "country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id": "cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt": "2014-02-25T00:43:56", "wp": 100, "bp": 0.001, "domain": "news.google.com", "tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3, "os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}')) ------ OffsetAndMessage(offset=16, message=Message(magic=0, attributes=0, key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1, "country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id": "cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt": "2014-02-25T00:44:25", "wp": 100, "bp": 0.001, "domain": "news.google.com", "tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3, "os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}')) ------ OffsetAndMessage(offset=17, 
message=Message(magic=0, attributes=0, key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1, "country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id": "cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt": "2014-02-25T00:46:04", "wp": 100, "bp": 0.001, "domain": "news.google.com", "tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3, "os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}')) ------ OffsetAndMessage(offset=9, message=Message(magic=0, attributes=0, key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1, "country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id": "cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt": "2014-02-21T06:41:37", "wp": 100, "bp": 0.001, "domain": "news.google.com", "tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3, "os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}')) ------ OffsetAndMessage(offset=10, message=Message(magic=0, attributes=0, key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1, "country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id": "cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt": "2014-02-21T06:41:39", "wp": 100, "bp": 0.001, "domain": "news.google.com", "tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3, "os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}')) ------ OffsetAndMessage(offset=11, message=Message(magic=0, attributes=0, key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1, "country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id": "cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt": "2014-02-21T06:41:41", "wp": 100, "bp": 0.001, "domain": "news.google.com", "tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3, "os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}')) No 
handlers could be found for logger "kafka" Exception in thread Thread-1: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 504, in run self.__target(*self.__args, **self.__kwargs) File "/usr/local/lib/python2.7/dist-packages/kafka/util.py", line 99, in _timer self.fn(*self.args, **self.kwargs) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer.py", line 152, in commit resps = self.client.send_offset_commit_request(self.group, reqs) File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 346, in send_offset_commit_request resps = self._send_broker_aware_request(payloads, encoder, decoder) File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 160, in _send_broker_aware_request raise FailedPayloadsError(failed_payloads) FailedPayloadsError: [OffsetCommitRequest(topic='topic-pixel', partition=0, offset=18, metadata=None), OffsetCommitRequest(topic='topic-pixel', partition=1, offset=12, metadata=None)] Here are my Kafka broker logs: ERROR Closing socket for /222.127.xx.xxx because of error (kafka.network.Processor) kafka.common.KafkaException: Wrong request type 8 at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:53) at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49) at kafka.network.Processor.read(SocketServer.scala:353) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:744) [2014-02-25 00:45:31,351] INFO Closing socket connection to /222.127.xxx.xxx. 
(kafka.network.Processor) [2014-02-25 00:46:14,688] ERROR Closing socket for /222.127.xxx.xxx because of error (kafka.network.Processor) kafka.common.KafkaException: Wrong request type 8 at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:53) at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49) at kafka.network.Processor.read(SocketServer.scala:353) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:744) [2014-02-25 00:47:09,957] INFO Closing socket connection to /222.127.178.107. (kafka.network.Processor) I am using