Hi,
I have made some progress on the topic mentioned above:
1) I am feeding a CSV file into a Kafka topic.
2) I am reading the Kafka topic with readStream, as TD's article suggests.
3) I then try to do a show() on the streaming dataframe by using
queryName('XYZ') in the writeStream and running a SQL query on top of it,
but that doesn't show anything.
4) Once the above problems are resolved, I want to perform a
stream-stream join.
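For step 4, a stream-stream join might be sketched roughly as below. This is only a sketch under several assumptions: it needs Spark 2.3 or later (the spark-submit path further down points at 2.2.1), both streams are assumed to be already parsed into columns, and the column names left_id, right_id, left_ts and right_ts as well as the 20 minute delay are hypothetical. Note that rows arriving later than the watermark delay may be dropped, so the delay has to cover the expected lag between the two topics.

```python
def join_condition(left_key, right_key, left_ts, right_ts, max_delay="20 minutes"):
    """Spark SQL text: keys match and the right row is within max_delay of the left row."""
    return (
        "{lk} = {rk} AND "
        "{rt} >= {lt} - interval {d} AND "
        "{rt} <= {lt} + interval {d}"
    ).format(lk=left_key, rk=right_key, lt=left_ts, rt=right_ts, d=max_delay)


def join_streams(left_df, right_df, max_delay="20 minutes"):
    """Inner stream-stream join of two streaming DataFrames (Spark 2.3+).

    Assumes left_df has columns left_id/left_ts and right_df has
    right_id/right_ts (hypothetical names, not from the original code).
    """
    # Imported here so join_condition can be used without Spark installed.
    from pyspark.sql.functions import expr

    # Watermarks tell Spark how long to buffer rows while waiting for a match.
    left = left_df.withWatermark("left_ts", max_delay)
    right = right_df.withWatermark("right_ts", max_delay)
    cond = join_condition("left_id", "right_id", "left_ts", "right_ts", max_delay)
    return left.join(right, expr(cond))
```

The time-range part of the condition is what lets Spark eventually discard old buffered rows; with only the key equality, the join state would keep growing.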
The CSV file I'm ingesting into Kafka contains:
id,first_name,last_name
1,Kellyann,Moyne
2,Morty,Blacker
3,Tobit,Robardley
4,Wilona,Kells
5,Reggy,Comizzoli
My test code -
from pyspark.sql import SparkSession
import time

class test:
    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1").load())

    # table2_stream = (spark.readStream.format("kafka")
    #                  .option("kafka.bootstrap.servers", "localhost:9092")
    #                  .option("subscribe", "test2").load())

    # joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    query = table1_stream.writeStream.format("console").queryName("table_A").start()
    # .format("memory")
    # spark.sql("select * from table_A").show()
    # time.sleep(10) # sleep 20 seconds
    # query.stop()
    query.awaitTermination()

# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
The output I'm getting (whereas I simply want to show() my dataframe) -
+----+--------------------+-----+---------+------+--------------------+-------------+
| key|               value|topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
|null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
|null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
|null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
|null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
|null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
+----+--------------------+-----+---------+------+--------------------+-------------+
18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
"id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
"runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
"name" : "table_A",
"timestamp" : "2018-03-15T10:18:07.218Z",
"numInputRows" : 6,
"inputRowsPerSecond" : 461.53846153846155,
"processedRowsPerSecond" : 14.634146341463415,
"durationMs" : {
"addBatch" : 241,
"getBatch" : 15,
"getOffset" : 2,
"queryPlanning" : 2,
"triggerExecution" : 410,
"walCommit" : 135
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[test1]]",
"startOffset" : {
"test1" : {
"0" : 5226
}
},
"endOffset" : {
"test1" : {
"0" : 5232
}
},
"numInputRows" : 6,
"inputRowsPerSecond" : 461.53846153846155,
"processedRowsPerSecond" : 14.634146341463415
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
}
}
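As a side check on the console output above: the value column appears to hold the raw bytes of each CSV line rather than parsed fields. A small plain-Python sketch (the helper name decode_value is made up for illustration) that decodes the visible, untruncated bytes of the first two rows:

```python
def decode_value(hex_dump):
    """Turn a console-sink byte dump such as '69 64 2C' back into text."""
    # bytes.fromhex ignores the spaces between the hex pairs.
    return bytes.fromhex(hex_dump).decode("utf-8")


# First five bytes of the rows at offsets 5226 and 5227 in the table above.
print(decode_value("69 64 2C 66 69"))  # id,fi
print(decode_value("31 2C 4B 65 6C"))  # 1,Kel
```

So the rows do carry the CSV content (including the header line as its own record); they only need to be cast to strings and split into columns before they look like the table.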
P.S. If I add the line below to the code, it doesn't print a DataFrame of
the actual table:
spark.sql("select * from table_A").show()
Any help?
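One possible direction, sketched here with hedging: a table named by queryName() is only queryable with spark.sql() when the sink format is "memory", not "console", and the Kafka value column has to be cast and split before it resembles the CSV. The helper names below (csv_select_exprs, start_memory_table) are hypothetical, and the column list is taken from the CSV header shown earlier.

```python
# Column names from the CSV header in this mail.
CSV_COLUMNS = ["id", "first_name", "last_name"]


def csv_select_exprs(columns, sep=","):
    """Build Spark SQL expressions that split the Kafka `value` bytes into named columns."""
    return [
        "split(CAST(value AS STRING), '{}')[{}] AS {}".format(sep, i, name)
        for i, name in enumerate(columns)
    ]


def start_memory_table(kafka_df, table_name, columns=CSV_COLUMNS):
    """Parse a DataFrame loaded with format("kafka") and expose it as an in-memory table."""
    parsed = kafka_df.selectExpr(*csv_select_exprs(columns))
    return (parsed.writeStream
            .format("memory")        # keeps the rows in a queryable in-memory table
            .queryName(table_name)   # becomes the table name for spark.sql
            .outputMode("append")
            .start())
```

With table1_stream passed as kafka_df and "table_A" as table_name, the select on table_A would have to run after the query has processed some data (for example after query.processAllAvailable()) and before query.awaitTermination(), since that call blocks. All columns come out as strings here, and the header line would show up as an ordinary row unless it is filtered out.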
Thanks,
Aakash.
On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <[email protected]>
wrote:
> Thanks to TD, the savior!
>
> Shall look into it.
>
> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
> [email protected]> wrote:
>
>> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>
>> This is a true stream-stream join, which will automatically buffer delayed
>> data and join it appropriately with SQL join semantics. Please check it
>> out :)
>>
>> TD
>>
>>
>>
>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <[email protected]>
>> wrote:
>>
>>> I misread it, and thought that your question was whether pyspark supports
>>> Kafka lol. Sorry!
>>>
>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <[email protected]> wrote:
>>>
>>>> Hey Dylan,
>>>>
>>>> Great!
>>>>
>>>> Can you reply to my initial mail and also to the latest one?
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've been using Kafka with pyspark since 2.1.
>>>>>
>>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm yet to.
>>>>>>
>>>>>> Just want to know, when will Spark 2.3 with the 0.10 Kafka Spark package
>>>>>> allow Python? I read somewhere that, as of now, Scala and Java are the
>>>>>> languages to be used.
>>>>>>
>>>>>> Please correct me if I am wrong.
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Did you try Spark 2.3 with structured streaming? Watermarking and
>>>>>>> plain SQL there might be really interesting for you.
>>>>>>> Aakash Basu <[email protected]> wrote on Wed, 14 Mar 2018
>>>>>>> at 14:57:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Info (Using): Spark Streaming Kafka 0.8 package*
>>>>>>>>
>>>>>>>> *Spark 2.2.1*
>>>>>>>> *Kafka 1.0.1*
>>>>>>>>
>>>>>>>> As of now, I am feeding paragraphs into the Kafka console producer, and my
>>>>>>>> Spark job, which is acting as a receiver, is printing the flattened words,
>>>>>>>> which is a complete RDD operation.
>>>>>>>>
>>>>>>>> *My goal is to continuously read two tables (which are being updated) as
>>>>>>>> two distinct Kafka topics, load them as two Spark DataFrames, join them
>>>>>>>> on a key and produce the output.* (I am from a Spark-SQL
>>>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>>>
>>>>>>>> *It may happen that the first topic receives new data 15 mins before
>>>>>>>> the second topic. In that scenario, how should I proceed? I should not
>>>>>>>> lose any data.*
>>>>>>>>
>>>>>>>> As of now, I simply want to pass paragraphs, read them as RDDs,
>>>>>>>> convert them to DFs and then join them to get the common keys as the
>>>>>>>> output (just for R&D).
>>>>>>>>
>>>>>>>> I started using Spark Streaming and Kafka just today.
>>>>>>>>
>>>>>>>> Please help!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Aakash.
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>
>