Hi all, I want to create a union of two DStreams. In one of them *an RDD is created every second*; the other has its RDDs generated by reduceByKeyAndWindow with the *window duration set to 60 seconds* (the slide duration is also 60 seconds).
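To make the timing mismatch concrete, here is a minimal plain-Java sketch with no Spark dependency. It only models the requirement as I understand it from the error message further down: all streams passed to union must share one slide duration. The class `SlideDurationCheck` and the helper `sameSlideDuration` are hypothetical names for illustration, not Spark APIs.

```java
import java.util.Arrays;
import java.util.List;

public class SlideDurationCheck {
    // Hypothetical helper, not a Spark API: returns true only when every
    // stream in the union has the same slide duration (in milliseconds).
    static boolean sameSlideDuration(List<Long> slideDurationsMs) {
        return slideDurationsMs.stream().distinct().count() == 1;
    }

    public static void main(String[] args) {
        long perSecondStreamMs = 1_000L;  // input DStream: one RDD per second
        long windowedStreamMs = 60_000L;  // windowed DStream: slides every 60 seconds

        // Mixing the two slide durations fails the check.
        System.out.println(sameSlideDuration(Arrays.asList(perSecondStreamMs, windowedStreamMs)));

        // Two streams that both slide every 60 seconds pass it.
        System.out.println(sameSlideDuration(Arrays.asList(windowedStreamMs, windowedStreamMs)));
    }
}
```

Under this assumption, the two streams would only be accepted together if both produced RDDs at the same interval.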
The main idea is to analyze each minute of data and emit the union of the input data (per second) and the transformed data (per minute).

The code is:

    JavaPairDStream<String, String> windowedGridCounts = GridtoPair.reduceByKeyAndWindow(
        new Function2<String, String, String>() {
            @Override
            public String call(String i1, String i2) {
                long id1 = MsgIdAddandRemove.getMessageId(i1);
                long id2 = MsgIdAddandRemove.getMessageId(i2);
                Float v1 = Float.parseFloat(MsgIdAddandRemove.getMessageContent(i1));
                Float v2 = Float.parseFloat(MsgIdAddandRemove.getMessageContent(i2));
                String res = String.valueOf(v1 + v2);
                // Attach the larger of the two message ids to the summed value.
                if (id1 > id2) {
                    return MsgIdAddandRemove.addMessageId(res, id1);
                } else {
                    return MsgIdAddandRemove.addMessageId(res, id2);
                }
            }
        },
        Durations.seconds(60), Durations.seconds(60));

    JavaDStream<String> UnionStream = tollPercent.union(underPay).union(taxSumPercent);

I am getting the following error:

    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Some of the DStreams have different slide durations
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.streaming.dstream.UnionDStream.<init>(UnionDStream.scala:33)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$union$1.apply(DStream.scala:849)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$union$1.apply(DStream.scala:849)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)

--
Thanks & Regards,
Anshu Shukla