@Jeff Evans @Sean Owen
Both of these postings are examples of the same object-oriented concept: extracting a child object from a parent object. The difference is that when a Muslim asked, he was told by Jeff Evans "we are not here to handhold you", "do a simple Google search" and "They're not being paid to handhold you and quickly answer to your every whim."

By comparison, when the good Dr Mich Talebzadeh asked the same thing, there was no humiliation and there were no offensive comments. No comments at all.

Hi,

Thank you all. I am just thinking of passing that date, 06/04/2020 12:03:43, and getting the correct format from the module. In effect, this date format, yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ, as the pattern. In other words, rather than new Date(), pass "06/04/2020 12:03:43" as a string.

Regards,
Dr Mich Talebzadeh

    val fixedStr = "2020-06-04T12:03:43";
    val dt = new DateTime(fixedStr);
    val jdkDate = dt.toDate();
    val pattern3 = "dd yyyy MM HH:mm:ss.SSSSSSSSSZ";
    val simpleDateFormat3 = (new SimpleDateFormat(pattern3, new Locale("en", "UK")));
    val date3 = simpleDateFormat3.format(jdkDate);
    System.out.println(date3);

On Sat, 28 Mar 2020, 15:50 Jeff Evans, <[hidden email]> wrote:

Dude, you really need to chill. Have you ever worked with a large open source project before? It seems not. Even so, insinuating there are tons of bugs that were left uncovered until you came along (despite the fact that the project is used by millions across many different organizations) is ludicrous. Learn a little bit of humility.

If you're new to something, assume you have made a mistake rather than that there is a bug. Lurk a bit more, or even do a simple Google search, and you will realize Sean is a very senior committer (i.e. expert) in Spark, and has been for many years. He, and everyone else participating in these lists, is doing it voluntarily on their own time. They're not being paid to handhold you and quickly answer to your every whim.
As you can see from the code:

STEP 1: I create an object, a static data frame, which holds all the information about the data source (CSV files).

STEP 2: Then I create a variable called staticSchema and assign it the schema of the original static data frame.

STEP 3: Then I create another variable, val streamingDataFrame, from spark.readStream, and into the .schema function I pass the object staticSchema, which is meant to hold the information about the CSV files, including the .load(path) function etc.

So when I create val streamingDataFrame and pass it .schema(staticSchema), the variable streamingDataFrame should have all the information. I should only have to call .option("maxFilesPerTrigger", 1) and not .format("csv").option("header","true").load("/data/retail-data/by-day/*.csv"). Otherwise, what is the point of passing .schema(staticSchema) to streamingDataFrame?

You can replicate it using the complete code below.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{window,column,desc,col}

    object RetailData {

      def main(args: Array[String]): Unit = {

        // create spark session
        val spark = SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail Data").getOrCreate();

        // set spark runtime configuration
        spark.conf.set("spark.sql.shuffle.partitions","5")
        spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")

        // create a static frame
        val staticDataFrame = spark.read.format("csv")
          .option("header","true")
          .option("inferschema","true")
          .load("/data/retail-data/by-day/*.csv")

        staticDataFrame.createOrReplaceTempView("retail_data")
        val staticSchema = staticDataFrame.schema

        staticDataFrame
          .selectExpr(
            "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
          .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
          .sum("total_cost")
          .sort(desc("sum(total_cost)"))
          .show(2)

        val streamingDataFrame = spark.readStream
          .schema(staticSchema)
          .format("csv")
          .option("maxFilesPerTrigger", 1)
          .option("header","true")
          .load("/data/retail-data/by-day/*.csv")

        println(streamingDataFrame.isStreaming)

        // lazy operation so we will need to call a streaming action to start the action
        val purchaseByCustomerPerHour = streamingDataFrame
          .selectExpr(
            "CustomerId",
            "(UnitPrice * Quantity) as total_cost",
            "InvoiceDate")
          .groupBy(
            col("CustomerId"), window(col("InvoiceDate"), "1 day"))
          .sum("total_cost")

        // stream action to write to console
        purchaseByCustomerPerHour.writeStream
          .format("console")
          .queryName("customer_purchases")
          .outputMode("complete")
          .start()

      } // main

    } // object
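One way to check what staticSchema actually carries is to print it. The sketch below builds a stand-in StructType by hand, so it runs without the CSV files or a cluster. The column names are taken from the selectExpr calls above; the column types and the object name SchemaContents are assumptions for illustration, since the real types come from inferschema. In Spark, a StructType describes columns only (name, data type, nullability), with no format, options or path, which may be why the streaming reader still needs .format, .option("header", ...) and .load.

```scala
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType, TimestampType}

object SchemaContents {

  // Hypothetical stand-in for staticDataFrame.schema.
  // Column types are assumed here; inferschema may choose different ones.
  val staticSchema: StructType = StructType(Seq(
    StructField("CustomerId", DoubleType, nullable = true),
    StructField("UnitPrice", DoubleType, nullable = true),
    StructField("Quantity", IntegerType, nullable = true),
    StructField("InvoiceDate", TimestampType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    // Each field is a column name, a data type and a nullable flag.
    staticSchema.fields.foreach { f =>
      println(s"${f.name}: ${f.dataType.simpleString} (nullable = ${f.nullable})")
    }

    // Same tree layout that DataFrame.printSchema() prints.
    println(staticSchema.treeString)
  }
}
```

In the program above, calling staticDataFrame.printSchema() right after the val staticSchema line would show the same kind of listing for the real data.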