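Hi, I have the following code:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val text = env.socketTextStream(hostName, port)
// Parse each "key,value" line into a (key, value) pair
val words1 = text.map { x =>
  val res = x.split(",")
  res(0) -> res(1)
}
val words2 = env.fromElements(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))
val joinedStream = words1
  .coGroup(words2)
  .where(_._1)
  .equalTo(_._1)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(1))
val res = joinedStream.apply(new InnerJoinFunction).print()
env.execute()
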
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector

class InnerJoinFunction extends CoGroupFunction[(String, String), (String, String), (String, String)] {
  override def coGroup(T1: java.lang.Iterable[(String, String)],
                       T2: java.lang.Iterable[(String, String)],
                       out: Collector[(String, String)]): Unit = {
    println("****************************")
    println("T1=" + T1 + "T2=" + T2)
    import scala.collection.JavaConverters._
    val scalaT2 = T2.asScala.toList
    if (T1.asScala.nonEmpty && scalaT2.nonEmpty) {
      // Join only the most recent T1 element with every T2 element
      val transaction = T1.asScala.last
      println("T2 last=" + transaction) // the label says T2, but this is the last element of T1
      for (snapshot <- scalaT2) {
        out.collect((transaction._1, transaction._2 + snapshot._2))
      }
    }
  }
}
--------------------------------
The code has no problem and runs. This is the result (input "a,1", then input "a,2"):

****************************
T1=[(a,1)]T2=[(a,w2), (a,w1)]
T2 last=(a,1)
2> (a,1w2)
2> (a,1w1)
****************************
T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
T2 last=(a,2)
2> (a,2w2)
2> (a,2w1)
--------------------------------------------------
T1 keeps growing while T2 does not change. I am worried that with a lot of input, T1 will run out of storage. So I want to write my own version of GlobalWindows.create() so that T1 stores only the latest element from the input (or read from Kafka) and the history of T1 is cleared: after input a,1, T1 is [(a,1)]; after input a,2, T1 is [(a,2)], not [(a,1), (a,2)]. T2 must not change.

I rewrote GlobalWindows, but it does not work. Reading the code, I found that I must also rewrite GlobalWindow and modify "the class Serializer extends TypeSerializer<MyGlobalWindow>", but when I run the job, execution never reaches that code. Why? Can someone tell me?
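
To make the desired behavior concrete, here is a minimal plain-Scala sketch of the semantics described above: only the latest left-side value per key is kept, and it is joined against an unchanged right side. This is not Flink code and does not show how to achieve it with windows; LatestJoin, update and join are hypothetical names used only for illustration.

```scala
// Plain-Scala illustration of the desired semantics; no Flink involved.
// LatestJoin, update and join are hypothetical names for this sketch only.
object LatestJoin {
  type Pair = (String, String)

  // Keep only the most recent left-side value per key; older values are overwritten.
  def update(latest: Map[String, String], in: Pair): Map[String, String] =
    latest + in

  // Join the latest left-side value for `key` with every right-side element of the same key.
  def join(latest: Map[String, String], key: String, right: Seq[Pair]): Seq[Pair] =
    latest.get(key).toList.flatMap { v =>
      right.collect { case (k, w) if k == key => (key, v + w) }
    }

  def main(args: Array[String]): Unit = {
    val right = Seq(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))

    val afterFirst = update(Map.empty, ("a", "1"))
    println(join(afterFirst, "a", right))  // List((a,1w1), (a,1w2))

    val afterSecond = update(afterFirst, ("a", "2"))
    println(afterSecond)                   // Map(a -> 2): the history (a,1) is gone
    println(join(afterSecond, "a", right)) // List((a,2w1), (a,2w2))
  }
}
```

The left side here holds one value per key no matter how many inputs arrive, which is the storage behavior I am after for T1.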