With regards,
Sagar Grover
Phone - 7022175584

On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
Awesome, thanks for detailing!

Was thinking the same: we have to split by comma for CSV while casting inside.

Cool! Shall try it and revert back tomorrow.

Thanks a ton!

On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:

To remain generic, the KafkaSource can only offer the lowest common denominator for a schema (topic, partition, offset, key, value, timestamp, timestampType). As such, you can't just feed it a StructType. When you are using a producer or consumer directly with Kafka, serialization and deserialization is often an orthogonal and implicit transform. However, in Spark, serialization and deserialization is an explicit transform (e.g., you define it in your query plan).

To make this more granular, imagine your source is registered as a temp view named "foo":

SELECT
  split(cast(value as string), ',')[0] as id,
  split(cast(value as string), ',')[1] as name
FROM foo;

Assuming you were providing the following messages to Kafka:

1,aakash
2,tathagata
3,chris

You could make the query plan less repetitive. I don't believe Spark offers from_csv out of the box as an expression (although CSV is well supported as a data source). You could implement an expression by reusing a lot of the supporting CSV classes, which may result in a better user experience vs. explicitly using split and array indices, etc. In this simple example, casting the binary to a string just works because there is a common understanding of strings encoded as bytes between Spark and Kafka by default.

-Chris

------------------------------
From: Aakash Basu <aakash.spark....@gmail.com>
Sent: Thursday, March 15, 2018 10:48:45 AM
To: Bowden, Chris
Cc: Tathagata Das; Dylan Guedes; Georg Heiler; user
Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query

Hey Chris,

You got it right. I'm reading a csv file from local as mentioned above, with a console producer on the Kafka side.

So, as it is csv data with headers, shall I then use from_csv on the Spark side and provide a StructType to shape it up with a schema, and then cast it to string as TD suggested?

I'm getting all of your points at a very high level. A little more granularity would help.

In the slide TD just shared (PFA), I'm confused at the point where he casts the value as string. Logically, the value consists of the entire data set, so suppose I have a table with many columns: how can I provide a single alias as he did in the groupBy? I missed it there itself. Another question is, do I have to cast in the groupBy itself? Can't I do it directly in a select query? The last one: if the steps are followed, can I then run a SQL query on top of the columns separately?

Thanks,
Aakash.

On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:

You need to tell Spark about the structure of the data; it doesn't know ahead of time whether you put avro, json, protobuf, etc. in Kafka for the message format. If the messages are in json, Spark provides from_json out of the box. For a very simple POC you can happily cast the value to a string, etc. if you are prototyping and pushing messages by hand with a console producer on the Kafka side.
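A minimal PySpark sketch of the cast-and-split approach described above, using the DataFrame API instead of SQL. The topic name "foo", the id/name columns, and the app name are just the example values from this thread, not anything prescribed by Spark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

spark = SparkSession.builder.appName("kafka_csv_poc").getOrCreate()

# Kafka only exposes the generic schema; value arrives as binary.
raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "foo")
       .load())

# Explicit deserialization in the query plan: cast the bytes to a string,
# then split the CSV payload into named columns.
parts = split(col("value").cast("string"), ",")
parsed = raw.select(parts.getItem(0).alias("id"),
                    parts.getItem(1).alias("name"))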
________________________________________
From: Aakash Basu <aakash.spark....@gmail.com>
Sent: Thursday, March 15, 2018 7:52:28 AM
To: Tathagata Das
Cc: Dylan Guedes; Georg Heiler; user
Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query

Hi,

And if I run this below piece of code -

from pyspark.sql import SparkSession
import time

class test:

    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1")
                     .load())

    table2_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test2")
                     .load())

    joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    # query = table1_stream.writeStream.format("console").start().awaitTermination()
    # .queryName("table_A").format("memory")
    # spark.sql("select * from table_A").show()
    time.sleep(10)  # sleep 10 seconds
    # query.stop()
    # query


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py

I get the below error (in Spark 2.3.0) -

Traceback (most recent call last):
  File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module>
    class test:
  File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test
    joined_Stream = table1_stream.join(table2_stream, "Id")
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];'

It seems, as per the documentation, that the key and value are deserialized as byte arrays.

I am badly stuck at this step; there isn't much material online with steps to proceed on this, either.

Any help, guys?

Thanks,
Aakash.

On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:

Any help on the above?

On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:

Hi,

I progressed a bit on the above mentioned topic -

1) I am feeding a CSV file into the Kafka topic.
2) Feeding the Kafka topic as readStream, as TD's article suggests.
3) Then simply trying to do a show on the streaming dataframe, using queryName('XYZ') in the writeStream and writing a SQL query on top of it, but that doesn't show anything.
4) Once all the above problems are resolved, I want to perform a stream-stream join.
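For step 4 above, and for the AnalysisException on the USING column `Id` earlier in this thread, here is a hedged sketch of what the join could look like once both raw Kafka streams are parsed into real columns first. parse_csv_stream is a hypothetical helper, and the column names assume the id,first_name,last_name CSV layout shown below:

from pyspark.sql.functions import col, split

def parse_csv_stream(kafka_df):
    # Cast the binary Kafka value to string and split the CSV payload
    # into the three columns used in this thread.
    parts = split(col("value").cast("string"), ",")
    return kafka_df.select(parts.getItem(0).alias("Id"),
                           parts.getItem(1).alias("first_name"),
                           parts.getItem(2).alias("last_name"))

# Parse both raw streams before joining; the raw Kafka schema only has
# key/value/topic/partition/offset/timestamp/timestampType, which is why
# the USING column `Id` could not be resolved on either side.
table1_parsed = parse_csv_stream(table1_stream)
table2_parsed = parse_csv_stream(table2_stream)
joined_Stream = table1_parsed.join(table2_parsed, "Id")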
The CSV file I'm ingesting into Kafka has -

id,first_name,last_name
1,Kellyann,Moyne
2,Morty,Blacker
3,Tobit,Robardley
4,Wilona,Kells
5,Reggy,Comizzoli

My test code -

from pyspark.sql import SparkSession
import time

class test:

    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1")
                     .load())

    # table2_stream = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())

    # joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    query = table1_stream.writeStream.format("console").queryName("table_A").start()
    # .format("memory")
    # spark.sql("select * from table_A").show()
    # time.sleep(10)  # sleep 10 seconds
    # query.stop()
    query.awaitTermination()


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py

The output I'm getting (whereas I simply want to show() my dataframe) -

+----+--------------------+-----+---------+------+--------------------+-------------+
| key|               value|topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
|null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
|null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
|null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
|null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
|null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
+----+--------------------+-----+---------+------+--------------------+-------------+

18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
  "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
  "name" : "table_A",
  "timestamp" : "2018-03-15T10:18:07.218Z",
  "numInputRows" : 6,
  "inputRowsPerSecond" : 461.53846153846155,
  "processedRowsPerSecond" : 14.634146341463415,
  "durationMs" : {
    "addBatch" : 241,
    "getBatch" : 15,
    "getOffset" : 2,
    "queryPlanning" : 2,
    "triggerExecution" : 410,
    "walCommit" : 135
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 5226
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 5232
      }
    },
    "numInputRows" : 6,
    "inputRowsPerSecond" : 461.53846153846155,
    "processedRowsPerSecond" : 14.634146341463415
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
  }
}

P.S. - If I add the below piece in the code, it doesn't print a DF of the actual table.

spark.sql("select * from table_A").show()

Any help?

Thanks,
Aakash.

On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:

Thanks to TD, the savior!

Shall look into it.
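Regarding the P.S. further up (spark.sql("select * from table_A").show() printing nothing): the console sink does not register a temp view, so a name-based SQL query has nothing to read. A hedged sketch of a memory-sink variant that would, assuming the same table1_stream as in the test code above; the sleep duration and column selection are illustrative:

import time

# The memory sink materializes each micro-batch into an in-memory table
# named after queryName, which spark.sql() can then query (debugging only).
query = (table1_stream
         .selectExpr("CAST(value AS STRING) AS value")
         .writeStream
         .format("memory")
         .queryName("table_A")
         .start())

time.sleep(10)  # give the first micro-batch a moment to arrive
spark.sql("select * from table_A").show(truncate=False)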
On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html

This is a true stream-stream join which will automatically buffer delayed data and appropriately join stuff with SQL join semantics. Please check it out :)

TD

On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com> wrote:

I misread it, and thought that your question was whether pyspark supports Kafka lol. Sorry!

On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:

Hey Dylan,

Great!

Can you revert back to my initial and also the latest mail?

Thanks,
Aakash.

On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:

Hi,

I've been using Kafka with pyspark since 2.1.

On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:

Hi,

I'm yet to.

Just want to know, when does the Spark 2.3 Kafka 0.10 Spark package allow Python? I read somewhere that, as of now, Scala and Java are the languages to be used.

Please correct me if I am wrong.

Thanks,
Aakash.

On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:

Did you try Spark 2.3 with Structured Streaming? There, watermarking and plain SQL might be really interesting for you.

Aakash Basu <aakash.spark....@gmail.com> wrote on Wed, 14 March 2018 at 14:57:

Hi,

Info (Using):
Spark Streaming Kafka 0.8 package
Spark 2.2.1
Kafka 1.0.1

As of now, I am feeding paragraphs into the Kafka console producer, and my Spark job, which is acting as a receiver, is printing the flattened words, which is a complete RDD operation.

My motive is to read two tables continuously (being updated) as two distinct Kafka topics, read them as two Spark Dataframes, and join them based on a key to produce the output. (I am from a Spark-SQL background, pardon my Spark-SQL-ish writing.)

It may happen that the first topic receives new data 15 mins prior to the second topic; in that scenario, how do I proceed? I should not lose any data.

As of now, I want to simply pass paragraphs, read them as RDDs, convert to DFs and then join to get the common keys as the output. (Just for R&D.)

Started using Spark Streaming and Kafka today itself.

Please help!

Thanks,
Aakash.
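For the 15-minute skew described in the original question above, a hedged sketch of the watermarked stream-stream join pattern from the linked Spark 2.3 blog post. All DataFrame and column names here (stream1_df, stream2_df, id1/id2, ts1/ts2) and the chosen intervals are illustrative assumptions, not from the thread:

from pyspark.sql.functions import expr

# stream1_df and stream2_df are assumed to be streaming DataFrames already
# parsed into real columns, each carrying a join key and an event-time column.
left = stream1_df.withWatermark("ts1", "30 minutes")
right = stream2_df.withWatermark("ts2", "30 minutes")

# The watermark lets Spark buffer delayed rows, so a topic arriving ~15
# minutes later than the other is still matched; the time-range condition
# bounds how much state Spark has to keep.
joined = left.join(
    right,
    expr("id1 = id2 AND ts2 BETWEEN ts1 AND ts1 + interval 20 minutes"))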