Can you confirm the following?

1. Are you sending new data to the Kafka topic AFTER starting the streaming query? Since you have specified `startingOffsets` as `latest`, data needs to be sent to the topic after the query starts for the query to receive it.

2. Are you able to read Kafka data using Kafka's console consumer from the same machine running the query? That would clear up any confusion regarding connectivity.
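For reference, a quick way to check both points, assuming the stock Kafka 0.10 console scripts are available on the machine running the query (topic name and broker address taken from your shell session below):

# Verify connectivity: consume from the topic on the machine running the query
bin/kafka-console-consumer.sh \
  --bootstrap-server ec2-54-208-12-171.compute-1.amazonaws.com:9092 \
  --topic clickstream

# In another terminal, publish a test message AFTER the streaming query has started
echo "test-click" | bin/kafka-console-producer.sh \
  --broker-list ec2-54-208-12-171.compute-1.amazonaws.com:9092 \
  --topic clickstream

If the console consumer sees the test message but the streaming query does not, the problem is in the query setup rather than in connectivity.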
If the above are cleared, I would look at INFO- and DEBUG-level log4j logs to see what the query is doing: is it stuck at some point, or is it running continuously but not finding the latest offsets? (A log4j snippet for turning up the relevant log levels follows the quoted message below.)

On Thu, Dec 1, 2016 at 6:31 AM, Otávio Carvalho <otav...@gmail.com> wrote:

> Hello hivemind,
>
> I am trying to connect my Spark 2.0.2 cluster to an Apache Kafka 0.10
> cluster via spark-shell.
>
> The connection works fine, but it is not able to receive the messages
> published to the topic.
>
> It doesn't throw any error, but it is not able to retrieve any message (I
> am sure that messages are being published because I am able to read from
> the topic from the same machine).
>
> Here follows the spark-shell code/output:
>
> val ds1 = spark.readStream
>   .format("kafka")
>   .option("subscribe", "clickstream")
>   .option("kafka.bootstrap.servers", "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
>   .option("startingOffsets", "latest")
>   .load
>
> // Exiting paste mode, now interpreting.
>
> ds1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
>
> scala> val counter = ds1.groupBy("value").count
> counter: org.apache.spark.sql.DataFrame = [value: binary, count: bigint]
>
> scala> import org.apache.spark.sql.streaming.OutputMode.Complete
> import org.apache.spark.sql.streaming.OutputMode.Complete
>
> val query = counter.writeStream
>   .outputMode(Complete)
>   .format("console")
>   .start
>
> // Exiting paste mode, now interpreting.
>
> query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-1 [state = ACTIVE]
>
> scala> query.status
> res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
> Status of query 'query-1'
>     Query id: 1
>     Status timestamp: 1480602056895
>     Input rate: 0.0 rows/sec
>     Processing rate: 0.0 rows/sec
>     Latency: - ms
>     Trigger details:
>         isTriggerActive: true
>         statusMessage: Finding new data from sources
>         timestamp.triggerStart: 1480602056894
>         triggerId: -1
>     Source statuses [1 source]:
>         Source 1 - KafkaSource[Subscribe[clickstream]]
>             Available offset: -
>             Input rate: 0.0 rows/sec
>             Processing rate: 0.0 rows/sec
>             Trigger details:
>                 triggerId: -1
>     Sink status - org.apache.spark.sql.execution.streaming.ConsoleSink@54d5b6cb
>         Committed offsets: [-]
>
> I am starting the spark-shell as follows:
> /root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2
>
> Thanks,
> Otávio Carvalho
>
> --
> Otávio Carvalho
> Consultant Developer
> Email ocarv...@thoughtworks.com
> Telephone +55 53 91565742
> ThoughtWorks
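For the log check mentioned above, a minimal sketch, assuming the default conf/log4j.properties layout on the driver and that the Kafka source classes live under org.apache.spark.sql.kafka010 (as in Spark 2.0.x):

# In conf/log4j.properties on the driver:
# raise verbosity only for the Kafka source and the streaming engine,
# rather than for all of Spark
log4j.logger.org.apache.spark.sql.kafka010=DEBUG
log4j.logger.org.apache.spark.sql.execution.streaming=INFO

Then watch the KafkaSource lines in the driver log: if offsets are fetched once and never advance, the query is connected but no new data is arriving after it started; if no offsets appear at all, the source never reached the broker.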