Hi,
will words2 always remain constant? If yes, you don't have to create a
stream out of it and coGroup it; you could simply pass the
collection to a Map/FlatMap function and do the join there, without
needing a window. By the way, are you aware that non-keyed global windows do not scale?
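Here is a minimal sketch of that idea that runs without Flink. It assumes words2 is a small constant list. The lookup map is built once, and each incoming words1 record is joined against it. There is no window and no buffered history. The names `StaticJoin` and `joinWithStatic` are hypothetical. In a Flink job, the same body would roughly sit inside a `flatMap` over `words1`, with the map captured in the function's closure.

```scala
object StaticJoin {
  // Constant side input, mirroring words2 from the question.
  val words2: List[(String, String)] =
    List(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))

  // Built once: key -> all words2 values for that key (order preserved).
  val lookup: Map[String, List[String]] =
    words2.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

  // Hypothetical helper: join one incoming (key, value) record with the table.
  // Nothing from earlier records is kept, so memory does not grow with input.
  def joinWithStatic(record: (String, String)): List[(String, String)] =
    lookup.getOrElse(record._1, Nil).map(v => (record._1, record._2 + v))

  def main(args: Array[String]): Unit = {
    println(joinWithStatic(("a", "1"))) // List((a,1w1), (a,1w2))
    println(joinWithStatic(("a", "2"))) // List((a,2w1), (a,2w2))
  }
}
```

In the Scala DataStream API, this would roughly become `words1.flatMap(r => StaticJoin.joinWithStatic(r))`. Each record produces its joined output immediately and is then discarded.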
If I understand your code correctly, you just want a stream joined with
only the last T1 element (the one your code prints as "T2 last"), right? I don't think you have to implement your own
"GlobalWindow" for that. Have you tried Flink's operator state
instead? That way, if the state grows, it can be written to disk.
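Here is a minimal sketch of the state-based idea that runs without Flink. It assumes both inputs are keyed by the first tuple field. The operator keeps only the latest T1 value per key, overwriting instead of appending, plus the T2 values, so T1 history cannot pile up. `LatestT1Join`, `onT1`, `onT2` and `storedT1` are hypothetical names, and the mutable maps only stand in for Flink keyed state.

In an actual job, the two methods would roughly correspond to `flatMap1`/`flatMap2` of a `RichCoFlatMapFunction` applied to `words1.connect(words2).keyBy(_._1, _._1)`. There, `latestT1` would be a `ValueState` and the T2 side a `ListState`. With a disk-backed state backend such as RocksDB, that state can grow beyond memory.

```scala
import scala.collection.mutable

// Hypothetical model of a keyed two-input operator (not Flink API).
// latestT1 stands in for keyed ValueState (one value per key);
// t2ByKey stands in for keyed ListState holding the words2 side.
class LatestT1Join {
  private val latestT1 = mutable.Map.empty[String, String]
  private val t2ByKey = mutable.Map.empty[String, Vector[String]]

  // Called for every words1 element: overwrite (not append) the stored T1,
  // then join it with whatever T2 values are known for the key.
  def onT1(key: String, value: String): Seq[(String, String)] = {
    latestT1(key) = value
    t2ByKey.getOrElse(key, Vector.empty).map(w => (key, value + w))
  }

  // Called for every words2 element: remember it and join it with the
  // latest T1 for the key, if one has arrived already.
  def onT2(key: String, value: String): Seq[(String, String)] = {
    t2ByKey(key) = t2ByKey.getOrElse(key, Vector.empty) :+ value
    latestT1.get(key).map(v => (key, v + value)).toList
  }

  // Inspect the single stored T1 value for a key.
  def storedT1(key: String): Option[String] = latestT1.get(key)
}
```

With the inputs from the question, the second record (a,2) replaces (a,1) instead of being appended. The operator emits (a,2w1) and (a,2w2) while storing a single T1 value for key a.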
Hope that helps.
Timo
On 06/09/16 at 10:05, rimin...@sina.cn wrote:
Hi,
the following code:
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// hostName and port are defined elsewhere; each line looks like "a,1"
val text = env.socketTextStream(hostName, port)
val words1 = text.map { x =>
  val res = x.split(",")
  (res(0), res(1))
}
val words2 =
  env.fromElements(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))
val joinedStream = words1
  .coGroup(words2)
  .where(_._1)
  .equalTo(_._1)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(1))
joinedStream.apply(new InnerJoinFunction).print()
env.execute()

class InnerJoinFunction extends
    CoGroupFunction[(String, String), (String, String), (String, String)] {
  override def coGroup(T1: java.lang.Iterable[(String, String)],
                       T2: java.lang.Iterable[(String, String)],
                       out: Collector[(String, String)]): Unit = {
    println("****************************")
    println("T1=" + T1 + "T2=" + T2)
    val scalaT2 = T2.asScala.toList
    if (T1.asScala.nonEmpty && scalaT2.nonEmpty) {
      // the most recent words1 element in the window
      val transaction = T1.asScala.last
      println("T2 last=" + transaction)
      for (snapshot <- scalaT2) {
        out.collect((transaction._1, transaction._2 + snapshot._2))
      }
    }
  }
}
--------------------------------
The code has no problems and runs. Here is the result (input
"a,1", then input "a,2"):
****************************
T1=[(a,1)]T2=[(a,w2), (a,w1)]
T2 last=(a,1)
2> (a,1w2)
2> (a,1w1)
****************************
T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
T2 last=(a,2)
2> (a,2w2)
2> (a,2w1)
--------------------------------------------------
T1 keeps growing while T2 does not change. I worry that with a lot of
input, T1 will run out of storage.
So I want to write my own "GlobalWindows.create()" so that T1 stores
only the latest element from the input (or from Kafka) and its history
is cleared: after input a,1, T1 is [(a,1)]; after input a,2, T1 should be
[(a,2)], not [(a,1), (a,2)]. T2, however, should not change.
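For context, the growth of T1 comes from the trigger. `CountTrigger.of(1)` returns FIRE, not FIRE_AND_PURGE. A `GlobalWindow` is also effectively never cleaned up by time, so every element stays in the window state.

The simplified model below replays one key's window. `WindowBufferModel` is a hypothetical name, and the model is not Flink's actual window operator. It also suggests why wrapping the trigger in `PurgingTrigger` would not help: purging clears both sides of the window, so the T2 elements are lost after the first firing.

```scala
import scala.collection.mutable

// Hypothetical model of one key's GlobalWindow in a windowed coGroup.
// It only illustrates FIRE (keep contents) vs. FIRE_AND_PURGE (clear contents).
class WindowBufferModel(purgeOnFire: Boolean) {
  private val t1 = mutable.ArrayBuffer.empty[String]
  private val t2 = mutable.ArrayBuffer.empty[String]

  // Every element causes a firing, like CountTrigger.of(1).
  // Returns the (T1, T2) contents the CoGroupFunction would receive.
  private def fire(): (List[String], List[String]) = {
    val seen = (t1.toList, t2.toList)
    if (purgeOnFire) { t1.clear(); t2.clear() }
    seen
  }

  def addT1(v: String): (List[String], List[String]) = { t1 += v; fire() }
  def addT2(v: String): (List[String], List[String]) = { t2 += v; fire() }
}
```

Without purging, the second firing sees T1 = [1, 2], which matches the output above. With purging, T1 holds only the new element, but T2 is already empty by then.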
I rewrote "GlobalWindows", but it does not work. Reading the code, I found
that I also have to rewrite "GlobalWindow" and modify "the class Serializer
extends TypeSerializer<MyGlobalWindow>", but when I run the job, execution
never reaches that code. Why? Can someone tell me?
--
Freundliche Grüße / Kind Regards
Timo Walther
Follow me: @twalthr
https://www.linkedin.com/in/twalthr