In the end, the mistake I made was that I forgot to export the proper
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables on the
machine I was running the spark-shell on.
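
For reference, the fix amounted to something like the following (a sketch;
the key values are placeholders, and the spark-shell command is the one
from the message below):

export AWS_ACCESS_KEY_ID=<access-key-id>
export AWS_SECRET_ACCESS_KEY=<secret-access-key>
/root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2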

Nevertheless, thanks for answering, Tathagata Das.

Otávio.

2016-12-01 17:36 GMT-02:00 Tathagata Das <tathagata.das1...@gmail.com>:

> Can you confirm the following?
> 1. Are you sending new data to the Kafka topic AFTER starting the
> streaming query? Since you have specified `startingOffsets` as `latest`,
> data needs to be sent to the topic after the query starts for the query to
> receive it.
> 2. Are you able to read Kafka data using Kafka's console consumer from
> the same machine running the query? That would clear up any confusion
> regarding connectivity; a sketch of the command follows below.
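>
> A minimal console-consumer invocation (a sketch, assuming Kafka 0.10's
> bundled scripts and the broker address quoted below) would be:
>
> bin/kafka-console-consumer.sh \
>   --bootstrap-server ec2-54-208-12-171.compute-1.amazonaws.com:9092 \
>   --topic clickstream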
>
> If the above are cleared, I would look at INFO- and DEBUG-level log4j logs
> to see what the query is doing: is it stuck at some point, or is it
> continuously running but not finding the latest offsets?
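>
> A quick way to raise the log level from within the shell (a sketch; note
> that DEBUG output is very verbose) is:
>
> scala> sc.setLogLevel("DEBUG")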
>
>
> On Thu, Dec 1, 2016 at 6:31 AM, Otávio Carvalho <otav...@gmail.com> wrote:
>
>> Hello hivemind,
>>
>> I am trying to connect my Spark 2.0.2 cluster to an Apache Kafka 0.10
>> cluster via spark-shell.
>>
>> The connection works fine, but the query is not able to receive the
>> messages published to the topic.
>>
>> It doesn't throw any error, but it is not able to retrieve any message (I
>> am sure that messages are being published, because I am able to read from
>> the topic from the same machine).
>>
>> Here follows the spark-shell code/output:
>>
>> val ds1 = spark.readStream
>>   .format("kafka")
>>   .option("subscribe", "clickstream")
>>   .option("kafka.bootstrap.servers",
>>     "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
>>   .option("startingOffsets", "latest")
>>   .load
>>
>> // Exiting paste mode, now interpreting.
>>
>> ds1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
>>
>> scala> val counter = ds1.groupBy("value").count
>> counter: org.apache.spark.sql.DataFrame = [value: binary, count: bigint]
>>
>> scala> import org.apache.spark.sql.streaming.OutputMode.Complete
>> import org.apache.spark.sql.streaming.OutputMode.Complete
>>
>> val query = counter.writeStream
>>   .outputMode(Complete)
>>   .format("console")
>>   .start
>>
>> // Exiting paste mode, now interpreting.
>>
>> query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-1 [state = ACTIVE]
>>
>> scala> query.status
>> res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
>> Status of query 'query-1'
>>     Query id: 1
>>     Status timestamp: 1480602056895
>>     Input rate: 0.0 rows/sec
>>     Processing rate 0.0 rows/sec
>>     Latency: - ms
>>     Trigger details:
>>         isTriggerActive: true
>>         statusMessage: Finding new data from sources
>>         timestamp.triggerStart: 1480602056894
>>         triggerId: -1
>>     Source statuses [1 source]:
>>         Source 1 - KafkaSource[Subscribe[clickstream]]
>>             Available offset: -
>>             Input rate: 0.0 rows/sec
>>             Processing rate: 0.0 rows/sec
>>             Trigger details:
>>                 triggerId: -1
>>     Sink status - org.apache.spark.sql.execution.streaming.ConsoleSink@54d5b6cb
>>         Committed offsets: [-]
>>
>> I am starting the spark-shell as follows:
>> /root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2
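>>
>> For reference, a variant of the read above (a sketch, assuming the goal
>> were also to consume messages already published to the topic) would set
>> the starting offsets to the beginning of the topic:
>>
>> val ds1 = spark.readStream
>>   .format("kafka")
>>   .option("subscribe", "clickstream")
>>   .option("kafka.bootstrap.servers",
>>     "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
>>   .option("startingOffsets", "earliest") // instead of "latest"
>>   .load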
>>
>> Thanks,
>> Otávio Carvalho.
>>
>> --
>> Otávio Carvalho
>> Consultant Developer
>> Email ocarv...@thoughtworks.com
>> Telephone +55 53 91565742
>>
