I am looking on how to get kafka to work with python and sending messages.

I am using python kafka to produce and consume messages.
https://github.com/mumrah/kafka-python



 the messages that I am sending is a json string in python.

kafka = KafkaClient(kafka_domain, 9092)
producer = SimpleProducer(kafka, async=True)
producer.send_messages(pixel_topic,payload1)

consumer = SimpleConsumer(kafka, "my-group1",pixel_topic)
    for message in consumer:
        print '------',message


Here is the errors I get in python

------ OffsetAndMessage(offset=14, message=Message(magic=0, attributes=0,
key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1,
"country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id":
"cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt":
"2014-02-21T06:41:42", "wp": 100, "bp": 0.001, "domain": "news.google.com",
"tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3,
"os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}'))
------ OffsetAndMessage(offset=15, message=Message(magic=0, attributes=0,
key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1,
"country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id":
"cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt":
"2014-02-25T00:43:56", "wp": 100, "bp": 0.001, "domain": "news.google.com",
"tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3,
"os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}'))
------ OffsetAndMessage(offset=16, message=Message(magic=0, attributes=0,
key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1,
"country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id":
"cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt":
"2014-02-25T00:44:25", "wp": 100, "bp": 0.001, "domain": "news.google.com",
"tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3,
"os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}'))
------ OffsetAndMessage(offset=17, message=Message(magic=0, attributes=0,
key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1,
"country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id":
"cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt":
"2014-02-25T00:46:04", "wp": 100, "bp": 0.001, "domain": "news.google.com",
"tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3,
"os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}'))
------ OffsetAndMessage(offset=9, message=Message(magic=0, attributes=0,
key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1,
"country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id":
"cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt":
"2014-02-21T06:41:37", "wp": 100, "bp": 0.001, "domain": "news.google.com",
"tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3,
"os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}'))
------ OffsetAndMessage(offset=10, message=Message(magic=0, attributes=0,
key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1,
"country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id":
"cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt":
"2014-02-21T06:41:39", "wp": 100, "bp": 0.001, "domain": "news.google.com",
"tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3,
"os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}'))
------ OffsetAndMessage(offset=11, message=Message(magic=0, attributes=0,
key=None, value='{"enode": 1, "city": "Sydney", "dl": "en", "wnode": 1,
"country": "AU", "creative_id": "crid2787744916", "pp": 0, "campaign_id":
"cid2504649263", "bt": "Microsoft Internet Explorer", "utcdt":
"2014-02-21T06:41:41", "wp": 100, "bp": 0.001, "domain": "news.google.com",
"tree_id": "c719f01a-9376-437c-9f13-520833177833", "ebp": 0.001, "vid1": 3,
"os": "Windows", "region": "AU-QLD", "adgroup_id": "agid8153515032"}'))
No handlers could be found for logger "kafka"
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kafka/util.py", line 99, in
_timer
    self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer.py", line
152, in commit
    resps = self.client.send_offset_commit_request(self.group, reqs)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 346,
in send_offset_commit_request
    resps = self._send_broker_aware_request(payloads, encoder, decoder)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 160,
in _send_broker_aware_request
    raise FailedPayloadsError(failed_payloads)
FailedPayloadsError: [OffsetCommitRequest(topic='topic-pixel', partition=0,
offset=18, metadata=None), OffsetCommitRequest(topic='topic-pixel',
partition=1, offset=12, metadata=None)]



Here is my kafka logs....

ERROR Closing socket for /222.127.xx.xxx because of error
(kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 8
    at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:53)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:353)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:744)
[2014-02-25 00:45:31,351] INFO Closing socket connection to
/222.127.xxx.xxx. (kafka.network.Processor)
[2014-02-25 00:46:14,688] ERROR Closing socket for /222.127.xxx.xxx because
of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 8
    at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:53)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:353)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:744)
[2014-02-25 00:47:09,957] INFO Closing socket connection to /222.127.178.107.
(kafka.network.Processor)



I am using

Reply via email to