code:
      import org.apache.spark.sql.functions.col
      import org.apache.spark.sql.streaming.OutputMode
      import org.apache.spark.sql.types.StringType

      // kafka source, cast the raw value bytes to string, write to console sink
      val query = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "somenode:9092")
        .option("subscribe", "wikipedia")
        .load()
        .select(col("value").cast(StringType))
        .writeStream
        .format("console")
        .outputMode(OutputMode.Append)
        .start()

      while (true) {
        Thread.sleep(10000)
        println(query.lastProgress)
      }
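
(side note: instead of polling in a sleep loop, spark 2.1 can push the same
progress via a StreamingQueryListener; a minimal untested sketch, reusing
the query val from above:)

      import org.apache.spark.sql.streaming.StreamingQueryListener
      import org.apache.spark.sql.streaming.StreamingQueryListener._

      // fires once per completed trigger instead of polling every 10 seconds
      spark.streams.addListener(new StreamingQueryListener {
        override def onQueryStarted(event: QueryStartedEvent): Unit = ()
        override def onQueryProgress(event: QueryProgressEvent): Unit =
          println(event.progress) // same json as query.lastProgress
        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      })
      query.awaitTermination()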

On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com>
wrote:

> lets see the code...
>
> Alonso Isidoro Roman
> about.me/alonso.isidoro.roman
>
> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>
>> my little program prints out query.lastProgress every 10 seconds, and
>> this is what it shows:
>>
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>>     "getOffset" : 9,
>>     "triggerExecution" : 10
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>     "startOffset" : {
>>       "wikipedia" : {
>>         "2" : 0,
>>         "4" : 0,
>>         "1" : 0,
>>         "3" : 0,
>>         "0" : 0
>>       }
>>     },
>>     "endOffset" : {
>>       "wikipedia" : {
>>         "2" : 0,
>>         "4" : 0,
>>         "1" : 0,
>>         "3" : 0,
>>         "0" : 0
>>       }
>>     },
>>     "numInputRows" : 0,
>>     "inputRowsPerSecond" : 0.0,
>>     "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>>     "description" : "org.apache.spark.sql.executio
>> n.streaming.ConsoleSink@4818d2d9"
>>   }
>> }
>>
>> (the same report then repeats every 10 seconds, at 22:54:55, 22:55:05,
>> 22:55:15, 22:55:25 and 22:55:35; only "timestamp" and "durationMs" change,
>> while startOffset and endOffset stay at 0 for every partition)
>>
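>> as a sanity check i still want to run something like this (untested
>> sketch, plain kafka 0.10 consumer api, nothing spark-specific) to see
>> whether the broker even reports non-zero end offsets for the topic:
>>
>>   import java.util.Properties
>>   import scala.collection.JavaConverters._
>>   import org.apache.kafka.clients.consumer.KafkaConsumer
>>   import org.apache.kafka.common.TopicPartition
>>
>>   val props = new Properties()
>>   props.put("bootstrap.servers", "somenode:9092")
>>   props.put("key.deserializer",
>>     "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>   props.put("value.deserializer",
>>     "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>
>>   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
>>   // manual assignment, so no consumer group coordination is involved
>>   val partitions = consumer.partitionsFor("wikipedia").asScala
>>     .map(pi => new TopicPartition(pi.topic, pi.partition)).asJava
>>   consumer.assign(partitions)
>>   consumer.seekToEnd(partitions)
>>   partitions.asScala.foreach { tp =>
>>     println(s"$tp end offset: ${consumer.position(tp)}")
>>   }
>>   consumer.close()
>>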
>>
>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> hey,
>>> i am just getting started with kafka + spark structured streaming. so
>>> this is probably a pretty dumb mistake.
>>>
>>> i wrote a little program in spark to read messages from a kafka topic
>>> and display them in the console, using the kafka source and console sink.
>>> i run it in spark local mode.
>>>
>>> i hooked it up to a test topic that i send messages to using the kafka
>>> console producer, and everything works great. i type a message in the
>>> console producer, and it pops up in my spark program. very neat!
>>>
>>> next i point it at another topic instead, one that a kafka-connect
>>> program is writing lots of irc messages to. i can see it connect to the
>>> topic successfully, the partitions are discovered etc., and then...
>>> nothing. it just stays stuck at offset 0 for all partitions. at the same
>>> time, in another terminal, i can see messages coming in just fine using
>>> the kafka console consumer.
>>>
>>> i don't get it. why doesn't spark structured streaming want to consume
>>> from this topic?
>>>
>>> thanks! koert
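>>>
>>> ps: one thing i still need to rule out myself (just a guess, not a
>>> confirmed fix): the kafka source defaults startingOffsets to "latest",
>>> so it only picks up messages that arrive after the query starts. reading
>>> everything already in the topic would look something like:
>>>
>>>   spark.readStream
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", "somenode:9092")
>>>     .option("subscribe", "wikipedia")
>>>     .option("startingOffsets", "earliest") // default is "latest"
>>>     .load()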
>>>
>>>
>>>
>>
>
