In the end, the mistake I made was that I forgot to export the proper AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables on the machine where I was running the spark-shell.
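For anyone who hits the same thing, the fix is just to export the credentials in the shell before launching spark-shell (the values below are placeholders, of course):

export AWS_ACCESS_KEY_ID=<your-access-key-id>          # placeholder
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>  # placeholder
/root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2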
Nevertheless, thanks for answering, Tathagata Das.

Otávio.

2016-12-01 17:36 GMT-02:00 Tathagata Das <tathagata.das1...@gmail.com>:

> Can you confirm the following?
>
> 1. Are you sending new data to the Kafka topic AFTER starting the
> streaming query? Since you have specified `startingOffsets` as
> `latest`, data needs to be sent to the topic after the query starts for
> the query to receive it.
>
> 2. Are you able to read Kafka data using Kafka's console consumer, from
> the same machine running the query? That would clear up any confusion
> regarding connectivity.
>
> If both of those check out, I would look at the INFO- and DEBUG-level
> log4j logs to see what the query is doing: is it stuck at some point, or
> is it running continuously but not finding the latest offsets?
>
>
> On Thu, Dec 1, 2016 at 6:31 AM, Otávio Carvalho <otav...@gmail.com> wrote:
>
>> Hello hivemind,
>>
>> I am trying to connect my Spark 2.0.2 cluster to an Apache Kafka 0.10
>> cluster via spark-shell.
>>
>> The connection works fine, but it is not able to receive the messages
>> published to the topic.
>>
>> It doesn't throw any error, but it is not able to retrieve any message
>> (I am sure that messages are being published, because I am able to read
>> from the topic from the same machine).
>>
>> Here follows the spark-shell code/output:
>>
>> val ds1 = spark.readStream
>>   .format("kafka")
>>   .option("subscribe", "clickstream")
>>   .option("kafka.bootstrap.servers", "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
>>   .option("startingOffsets", "latest")
>>   .load
>>
>> // Exiting paste mode, now interpreting.
>>
>> ds1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
>>
>> scala> val counter = ds1.groupBy("value").count
>> counter: org.apache.spark.sql.DataFrame = [value: binary, count: bigint]
>>
>> scala> import org.apache.spark.sql.streaming.OutputMode.Complete
>> import org.apache.spark.sql.streaming.OutputMode.Complete
>>
>> val query = counter.writeStream
>>   .outputMode(Complete)
>>   .format("console")
>>   .start
>>
>> // Exiting paste mode, now interpreting.
>>
>> query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-1 [state = ACTIVE]
>>
>> scala> query.status
>> res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
>> Status of query 'query-1'
>>     Query id: 1
>>     Status timestamp: 1480602056895
>>     Input rate: 0.0 rows/sec
>>     Processing rate: 0.0 rows/sec
>>     Latency: - ms
>>     Trigger details:
>>         isTriggerActive: true
>>         statusMessage: Finding new data from sources
>>         timestamp.triggerStart: 1480602056894
>>         triggerId: -1
>>     Source statuses [1 source]:
>>         Source 1 - KafkaSource[Subscribe[clickstream]]
>>             Available offset: -
>>             Input rate: 0.0 rows/sec
>>             Processing rate: 0.0 rows/sec
>>             Trigger details:
>>                 triggerId: -1
>>     Sink status - org.apache.spark.sql.execution.streaming.ConsoleSink@54d5b6cb
>>         Committed offsets: [-]
>>
>> I am starting the spark-shell as follows:
>>
>> /root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2
>>
>> Thanks,
>> Otávio Carvalho.
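For reference, the connectivity check Tathagata suggests in point 2 can be done with the console consumer that ships with Kafka, roughly like this (assuming it is run from the Kafka installation directory on the same machine as the query; --from-beginning is there so you see messages already in the topic):

bin/kafka-console-consumer.sh \
  --bootstrap-server ec2-54-208-12-171.compute-1.amazonaws.com:9092 \
  --topic clickstream \
  --from-beginning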