My data looks like this:
{ "ts2" : "2018/05/01 00:02:50.041", "serviceGroupId" : "123", "userId" : "avv-0", "stream" : "", "lastUserActivity" : "00:02:50", "lastUserActivityCount" : "0" }
{ "ts2" : "2018/05/01 00:09:02.079", "serviceGroupId" : "123", "userId" : "avv-0", "stream" : "", "lastUserActivity" : "00:09:02", "lastUserActivityCount" : "0" }
{ "ts2" : "2018/05/01 00:09:02.086", "serviceGroupId" : "123", "userId" : "avv-2", "stream" : "", "lastUserActivity" : "00:09:02", "lastUserActivityCount" : "0" }
...

And my aggregation is:

    val sdvTuneInsAgg1 = df
      .withWatermark("ts2", "10 seconds")
      .groupBy(window(col("ts2"), "10 seconds"))
      .agg(count("*") as "count")
      .as[CountMetric1]

The only anomaly is that the current date is 2018/05/24, but the ts2 values in the records carry older dates (2018/05/01). Will the aggregation / count still work in this scenario?
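The question hinges on what the watermark is measured against. Spark's event-time watermark is computed from the data itself (the maximum event time seen minus the delay threshold), not from the machine's wall clock, so a 2018/05/01 event is not "late" just because today is 2018/05/24. The sketch below is a plain-Scala model of that rule, not Spark code: it assumes epoch-aligned 10-second tumbling windows, a 10-second delay, ts2 strings interpreted as UTC, and it ignores micro-batch boundaries. The object and method names (`WatermarkModel`, `run`, `toMillis`, `fromMillis`) are hypothetical.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Plain-Scala model (no Spark) of event-time watermarking for a
// 10-second tumbling window with a 10-second watermark delay.
object WatermarkModel {
  private val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS")

  val windowMs = 10000L
  val delayMs = 10000L

  // Parse a ts2 string to epoch millis (assumption: the strings are UTC).
  def toMillis(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  // Render epoch millis back in the ts2 format (UTC).
  def fromMillis(ms: Long): String =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC).format(fmt)

  // Tumbling windows aligned to the epoch: start = t - (t % windowMs).
  def windowStart(t: Long): Long = t - (t % windowMs)

  // Returns (watermark, finalized window counts, still-open window counts),
  // with windows keyed by their start time in epoch millis.
  def run(events: Seq[String]): (Long, Map[Long, Int], Map[Long, Int]) = {
    val times = events.map(toMillis)
    // The watermark depends only on the largest event time seen,
    // never on the current wall-clock date.
    val watermark = times.max - delayMs
    val counts = times.groupBy(windowStart).map { case (s, ts) => s -> ts.size }
    // A window is final once its (exclusive) end is at or before the watermark.
    val (done, pending) = counts.partition { case (s, _) => s + windowMs <= watermark }
    (watermark, done, pending)
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "2018/05/01 00:02:50.041",
      "2018/05/01 00:09:02.079",
      "2018/05/01 00:09:02.086")
    val (wm, done, pending) = run(sample)
    println(s"watermark = ${fromMillis(wm)}")
    done.toSeq.sortBy(_._1).foreach { case (s, n) => println(s"final   ${fromMillis(s)} count=$n") }
    pending.toSeq.sortBy(_._1).foreach { case (s, n) => println(s"pending ${fromMillis(s)} count=$n") }
  }
}
```

With the three sample records, the modelled watermark is 2018/05/01 00:08:52.086: the 00:02:50 window is final with a count of 1, while the 00:09:00 window (count 2) stays open until newer events push the watermark past 00:09:10. In append mode this means the most recent windows of a finite, historical dataset are only emitted once later data arrives. Separately, in Spark itself ts2 would need to be a timestamp column rather than a string (for example via `to_timestamp(col("ts2"), "yyyy/MM/dd HH:mm:ss.SSS")`) before `withWatermark` and `window` can use it.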