Hi,      the follow code:
    val text = env.socketTextStream(hostName, port)    val words1 = text.map { 
x =>      val res = x.split(",")      (res.apply(0)->res.apply(1))    }        
val words2 = env.fromElements(("a","w1"),("a","w2"),("c","w3"),("d","w4"))    
val joinedStream = words1      .coGroup(words2)      .where(_._1)      
.equalTo(_._1)      .window(GlobalWindows.create())      
.trigger(CountTrigger.of(1))
      val res = joinedStream.apply(new InnerJoinFunction).print()
    env.execute()

class InnerJoinFunction extends 
CoGroupFunction[(String,String),(String,String),(String,String)]{       
override def coGroup(T1: java.lang.Iterable[(String,String)],         T2: 
java.lang.Iterable[(String,String)],         out: Collector[(String, String)]): 
Unit = {        println("****************************")        
println("T1="+T1+"T2="+T2)      import scala.collection.JavaConverters._      
val scalaT2 = T2.asScala.toList      if(!T1.asScala.isEmpty && 
scalaT2.nonEmpty){          val transaction = T1.asScala.last           
println("T2 last="+transaction)          for(snapshot <- scalaT2){            
out.collect(transaction._1,transaction._2+snapshot._2)          }      }    }  
}--------------------------------

the code have no proplem,and can run,the follow is the result:(input "a,1" then 
input "a,2")
****************************T1=[(a,1)]T2=[(a,w2), (a,w1)]T2 last=(a,1)2> 
(a,1w2)2> (a,1w1)****************************T1=[(a,1), (a,2)]T2=[(a,w2), 
(a,w1)]T2 last=(a,2)2> (a,2w2)2> (a,2w1)
--------------------------------------------------the T1 is increase,and T2 is 
not change.i worry,when  input so many,the T1 will out of storage.so i want to 
write my "GlobalWindows.create()", to achieve T1 will store the only one,from 
input(or read from kafka),and the history of T1 will clear(input a,1 T1 is 
[(a,1)],then input a,2,T1 is [(a,2)],not T1=[(a,1), (a,2)]),but T2 will not 
change.
i rewrite the "GlobalWindows",but it do not work,i read the code,find must 
rewrite the "GlobalWindow",and must modify "the class Serializer extends 
TypeSerializer<MyGlobalWindow>",but when i run,it can not into there,why? some 
can tell me?

Reply via email to