code:

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "somenode:9092")
  .option("subscribe", "wikipedia")
  .load
  .select(col("value") cast StringType)
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()

while (true) {
  Thread.sleep(10000)
  println(query.lastProgress)
}
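(as an aside, not part of the original program: the same progress output can also be produced with a listener instead of the sleep/poll loop above. a rough sketch against the stock StreamingQueryListener API, reusing the spark session and query from the code:)

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Sketch only: register a listener so each completed trigger prints its
// progress, instead of polling query.lastProgress on a timer.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(event.progress)  // same JSON as query.lastProgress
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})

// block the driver until the query stops, rather than sleeping in a loop
query.awaitTermination()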
On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com> wrote:

> lets see the code...
>
> Alonso Isidoro Roman
> about.me/alonso.isidoro.roman
>
> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>
>> my little program prints out query.lastProgress every 10 seconds, and
>> this is what it shows:
>>
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>>     "getOffset" : 9,
>>     "triggerExecution" : 10
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>     "startOffset" : {
>>       "wikipedia" : {
>>         "2" : 0,
>>         "4" : 0,
>>         "1" : 0,
>>         "3" : 0,
>>         "0" : 0
>>       }
>>     },
>>     "endOffset" : {
>>       "wikipedia" : {
>>         "2" : 0,
>>         "4" : 0,
>>         "1" : 0,
>>         "3" : 0,
>>         "0" : 0
>>       }
>>     },
>>     "numInputRows" : 0,
>>     "inputRowsPerSecond" : 0.0,
>>     "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>   }
>> }
>>
>> [the five progress entries that follow, at timestamps 2017-01-26T22:54:55 through 22:55:35, are identical apart from the timestamp and the getOffset/triggerExecution durations: numInputRows stays 0 and every partition's startOffset and endOffset stays at 0]
>>
>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> hey,
>>> i am just getting started with kafka + spark structured streaming, so
>>> this is probably a pretty dumb mistake.
>>>
>>> i wrote a little program in spark to read messages from a kafka topic
>>> and display them in the console, using the kafka source and console sink.
>>> i run it in spark local mode.
>>>
>>> i hooked it up to a test topic that i send messages to using the kafka
>>> console producer, and everything works great. i type a message in the
>>> console producer, and it pops up in my spark program. very neat!
>>>
>>> next i point it at another topic instead, on which a kafka-connect
>>> program is writing lots of irc messages. i can see the kafka source
>>> connect to the topic successfully, the partitions are discovered etc.,
>>> and then... nothing. it just stays stuck at offset 0 for all partitions.
>>> at the same time, in another terminal, i can see messages coming in just
>>> fine using the kafka console consumer.
>>>
>>> i don't get it. why doesn't kafka want to consume from this topic in
>>> spark structured streaming?
>>>
>>> thanks! koert