Hi, I am trying to consume from Kafka topics following Approach 1
(createStream) in http://spark.apache.org/docs/latest/streaming-kafka-integration.html.
I am not able to write the stream to a local text file using the
saveAsTextFiles() function. Below is the code:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
zkQuorum, topic = 'localhost:9092', 'python-kafka'
kafka_stream = KafkaUtils.createStream(ssc, zkQuorum, None, {topic: 1})
lines = kafka_stream.map(lambda x: x[1])
kafka_stream.saveAsTextFiles('file:///home/puneett/')
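For comparison, here is a minimal sketch of how the receiver-based job could be wired, under a few assumptions. It points createStream at ZooKeeper (localhost:2182, the address the console-consumer command below uses) rather than the broker port 9092. It also passes a consumer group id instead of None and saves the mapped `lines` DStream. Most importantly, it calls ssc.start() and ssc.awaitTermination(). Output operations such as saveAsTextFiles only define the computation, so nothing is received or written until the context is started. The group id `python-kafka-spark`, the output prefix, and the helpers `batch_dir` and `run_job` are hypothetical names for illustration. `batch_dir` only reproduces the documented per-batch naming rule of saveAsTextFiles, `prefix-TIME_IN_MS[.suffix]`. Under that rule, a prefix of `file:///home/puneett/` produces directories such as `/home/puneett/-<time in ms>` rather than a single file.

```python
# Assumed settings, taken from the console-consumer command in this post.
ZK_QUORUM = "localhost:2182"      # ZooKeeper address, not the Kafka broker port 9092
GROUP_ID = "python-kafka-spark"   # hypothetical consumer group id (must not be None)
TOPICS = {"python-kafka": 1}      # topic -> number of receiver threads
OUTPUT_PREFIX = "file:///home/puneett/kafka-out/batch"  # hypothetical output prefix


def batch_dir(prefix, batch_time_ms, suffix=None):
    """Directory name saveAsTextFiles uses for one batch: prefix-TIME_IN_MS[.suffix]."""
    name = "%s-%d" % (prefix, batch_time_ms)
    return name if suffix is None else name + "." + suffix


def run_job():
    # Imported lazily so batch_dir can be used without Spark installed.
    # pyspark.streaming.kafka exists only in Spark 1.x/2.x (removed in Spark 3).
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="kafka-to-text")
    ssc = StreamingContext(sc, 1)  # 1-second batches
    kafka_stream = KafkaUtils.createStream(ssc, ZK_QUORUM, GROUP_ID, TOPICS)
    lines = kafka_stream.map(lambda kv: kv[1])  # keep only the message value
    lines.saveAsTextFiles(OUTPUT_PREFIX, "txt")  # one directory per batch
    # The transformations and output operations above only run once started.
    ssc.start()
    ssc.awaitTermination()
```

To try it, call `run_job()` from a script submitted with spark-submit, together with the Kafka integration package for your Spark version (see the integration page linked above). Inside the pyspark shell, reuse the existing `sc` instead of creating a new SparkContext. When running locally, the master should be `local[2]` or higher: the receiver occupies one thread, and at least one more is needed to process the received data.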
When I run the console consumer, I get the following output:
[puneett@gb-slo-svb-0255 ~]$ /nfs/science/shared/kafka/kafka/bin/kafka-console-consumer.sh \
    --topic python-kafka --property schema.registry.url="http://localhost:9092" \
    --zookeeper localhost:2182 --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/core/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There
Can someone please suggest what I am doing wrong?
Regards,
Puneet