Hi Rimin, I have to admit that I don't really understand what you're trying to achieve. Could you try again to explain your problem?
Cheers, Till On Tue, Sep 6, 2016 at 3:53 PM, <rimin...@sina.cn> wrote: > think your anwser. > but i can not get your ideal."If all elements of "words2" have been > processed, the right side of your coGroup will always be empty no matter > what is incoming in your socketTextStream.",the mean i can not get. > the following is the ideal from me(it maybe error): > the coGroup will create new dataStream,T1 and T2,this must use > GlobalWindows to store all elements from T2,if use timeWindow or others,the > T2's element will not all store. > ------------------ > T1, T2 > ------------------ > and into apply function,get result, > when input first element,the T1 will add one element, > --------------------------------- > T1(+first), T2 > --------------------------------- > and into apply function,get result. > when input second element,the T1 will add one element, > --------------------------------------- > T1(+first+second), T2 > ----------------------------------------- > and into apply function,get result. > *************************************************** > but,in fact ,i want to get the datastream like this, > ------------------------------- > T1, T2 > -------------------------------- > when input first ,is follow: > -------------------------------- > T1(+first), T2 > --------------------------------- > when input second, is follow: > -------------------------------- > T1(+second), T2 > -------------------------------- > so the first must fired,this is my intention. > and i try to cut socket input datastream,use countWindow or timewindow,it > is not work,when use coGroup,the datastream is T1 and T2,they are a whole > ,so i think must window the coGroup. > ----- 原始邮件 ----- > 发件人:Timo Walther <twal...@apache.org> > 收件人:user@flink.apache.org > 主题:Re: 回复:Re: modify coGroup GlobalWindows_GlobalWindow > 日期:2016年09月06日 20点52分 > > I think you have to rethink your approach. In your example "words2" is a > stream but only with a fixed set of elements. If all elements of "words2" > have been processed, the right side of your coGroup will always be empty no > matter what is incoming in your socketTextStream. > It is not read in over and over again. Is that your intention? > > > Am 06/09/16 um 13:15 schrieb rimin...@sina.cn: > > i try read data into a list or List[Map] to store the T2,but i think if > use list or List[Map],there is not parallelization,so i want to use coGroup. > other hand,the coGroup function is join the T1 and T2,and must have window > and trigger method,the window is cut the T1 and T2, > the trigger is trigger the apply function when input to the trigger > threshold. > from the result,in apply(), i use my InnerJoinFunction,and output the T1 > and T2,we can see when input data,and trigger the apply,into the > InnerJoinFunction,the T1 and T2 will output, > the T1 is increase,and T2 is not change, so the window cut the T1 and T2 > do not achieve mine goal,so i want to write my "GlobalWindows.create()". > and Flink's operator state i have no ideal for it,and really do not know > how to use it.can you give me a example. > ----- 原始邮件 ----- > 发件人:Timo Walther <twal...@apache.org> <twal...@apache.org> > 收件人:user@flink.apache.org > 主题:Re: modify coGroup GlobalWindows GlobalWindow > 日期:2016年09月06日 17点52分 > > Hi, > > will words2 always remain constant? If yes, you don't have to create a > stream out of it and coGroup it, but you could simply pass the collection > to Map/FlatMap function and do the joining there without the need of a > window. Btw. you know that non-keyed global windows do not scale? > If I understand your code correctly, you just want to get a stream with > the last T2, right? I don't think you have to implement your own > "GlobalWindow" for that. Have you tried to use Flink's operator state for > that? So that if the state is growing it can be written to disk. > > Hope that helps. > > Timo > > Am 06/09/16 um 10:05 schrieb rimin...@sina.cn: > > Hi, > the follow code: > > val text = env.socketTextStream(hostName, port) > val words1 = text.map { x => > val res = x.split(",") > (res.apply(0)->res.apply(1)) > } > > val words2 = env.fromElements(("a","w1"),(" > a","w2"),("c","w3"),("d","w4")) > val joinedStream = words1 > .coGroup(words2) > .where(_._1) > .equalTo(_._1) > .window(GlobalWindows.create()) > .trigger(CountTrigger.of(1)) > > val res = joinedStream.apply(new InnerJoinFunction).print() > > env.execute() > > > class InnerJoinFunction extends CoGroupFunction[(String, > String),(String,String),(String,String)]{ > > override def coGroup(T1: java.lang.Iterable[(String,String)], > T2: java.lang.Iterable[(String,String)], > out: Collector[(String, String)]): Unit = { > println("****************************") > println("T1="+T1+"T2="+T2) > import scala.collection.JavaConverters._ > val scalaT2 = T2.asScala.toList > if(!T1.asScala.isEmpty && scalaT2.nonEmpty){ > val transaction = T1.asScala.last > println("T2 last="+transaction) > for(snapshot <- scalaT2){ > out.collect(transaction._1,transaction._2+snapshot._2) > } > } > } > } > -------------------------------- > the code have no proplem,and can run,the follow is the result:(input "a,1" > then input "a,2") > > **************************** > T1=[(a,1)]T2=[(a,w2), (a,w1)] > T2 last=(a,1) > 2> (a,1w2) > 2> (a,1w1) > **************************** > T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)] > T2 last=(a,2) > 2> (a,2w2) > 2> (a,2w1) > -------------------------------------------------- > the T1 is increase,and T2 is not change.i worry,when input so many,the T1 > will out of storage. > so i want to write my "GlobalWindows.create()", to achieve T1 will store > the only one,from input(or read from kafka),and the history of T1 will > clear(input a,1 T1 is [(a,1)],then input a,2,T1 is [(a,2)],not T1=[(a,1), > (a,2)]),but T2 will not change. > > i rewrite the "GlobalWindows",but it do not work,i read the code,find must > rewrite the "GlobalWindow",and must modify "the class Serializer extends > TypeSerializer<MyGlobalWindow>",but when i run,it can not into there,why? > some can tell me? > > > > -- > Freundliche Grüße / Kind Regards > > Timo Walther > > Follow me: @twalthrhttps://www.linkedin.com/in/twalthr > > > > -- > Freundliche Grüße / Kind Regards > > Timo Walther > > Follow me: @twalthrhttps://www.linkedin.com/in/twalthr > >