This might be a way forward, but since side inputs are not available yet I will try keying the control stream by the argument keys in the first CoFlatMap, along the lines of the sketch below.
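Untested, and all of the type names (DataMsg, ControlMsg, ArgUpdate) are made up, but roughly what I have in mind for that first stage is: fan each control message out once per argument key so that it meets the data for that key in the same keyed task, cache the latest value per key plus the functions that reference it, and emit one update per (function, argument) pair:

    import org.apache.flink.api.common.state.{MapStateDescriptor, ValueStateDescriptor}
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    import scala.collection.JavaConverters._

    object FirstStage {

      // Made-up record types for the two input streams and the fan-out output.
      case class DataMsg(key: String, value: String, time: Long)
      case class ControlMsg(fnId: String, logic: String, argKeys: Seq[String], time: Long)
      case class ArgUpdate(fnId: String, logic: String, argKeys: Seq[String], argKey: String, value: String)

      def firstStage(data: DataStream[DataMsg], control: DataStream[ControlMsg]): DataStream[ArgUpdate] =
        data
          .keyBy(_.key)
          // fan each control message out once per argument key, then key by that key
          .connect(control.flatMap(c => c.argKeys.map(k => (k, c))).keyBy(_._1))
          .flatMap(new RichCoFlatMapFunction[DataMsg, (String, ControlMsg), ArgUpdate] {

            // Keyed state, scoped to one argument key: the latest value seen for it
            // and the control messages (functions) that reference it.
            private lazy val latest =
              getRuntimeContext.getState(new ValueStateDescriptor[String]("latest", classOf[String]))
            private lazy val fns =
              getRuntimeContext.getMapState(
                new MapStateDescriptor[String, ControlMsg]("fns", classOf[String], classOf[ControlMsg]))

            override def flatMap1(d: DataMsg, out: Collector[ArgUpdate]): Unit = {
              latest.update(d.value)
              fns.values().asScala.foreach { c =>
                out.collect(ArgUpdate(c.fnId, c.logic, c.argKeys, d.key, d.value))
              }
            }

            override def flatMap2(kc: (String, ControlMsg), out: Collector[ArgUpdate]): Unit = {
              val (argKey, c) = kc
              fns.put(c.fnId, c)
              if (latest.value() != null)
                out.collect(ArgUpdate(c.fnId, c.logic, c.argKeys, argKey, latest.value()))
            }
          })
    }

A second operator keyed by fnId would then only have to cache the latest value per argument and call our interpreter once every argument has a value, which is essentially the grouping Tony describes further down the thread.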
I'll see how it goes. Thanks guys,
M

On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com> wrote:

Hi Martin,

Yes, that is exactly what I thought. But the first step also needs to be fulfilled by SideInput. I'm not sure how to achieve this in the current release.

Best,
Tony Wei

Martin Eden <martineden...@gmail.com> wrote on Thursday, 31 August 2017 at 11:32 PM:

Hi Aljoscha, Tony,

Aljoscha:
Yes, it's the first option you mentioned. Yes, the stream has multiple values in flight for A, B, C. f1 needs to be applied each time a new value for either A, B or C comes in, so we need to use state to cache the latest values. Using the example data stream from my first msg, the emitted stream should be:

1. Data Stream:
KEY  VALUE  TIME
.
.
.
C    V6     6
B    V6     6
A    V5     5
A    V4     4
C    V3     3
A    V3     3
B    V3     3
B    V2     2
A    V1     1

2. Control Stream:
Lambda  ArgumentKeys  TIME
.
.
.
f2      [A, C]        4
f1      [A, B, C]     1

3. Expected emitted stream:
TIME  VALUE
.
.
.
6     f1(V5, V6, V3)
      f1(V5, V6, V6)
      f2(V5, V6)
5     f1(V5, V3, V3)
      f2(V5, V3)
4     f1(V4, V3, V3)
      f2(V4, V3)
3     f1(V3, V3, V3)
2     -
1     -

So essentially, as soon as the argument list fills up, we apply the function/lambda on every new message arriving in the data stream for any of its argument keys.

Tony:
Yes, we need to group by and pass to the lambda. Ok, so what you are proposing might work. So your solution assumes that we have to connect with the control stream twice? Once for the tagging, and a second time re-connecting the control stream with the tagged stream for the actual application of the function/lambda?

Thanks,
Alex

On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Martin,

In your original example, what does this syntax mean exactly:

f1 [A, B, C] 1

Does it mean that f1 needs one A, one B and one C from the main stream? If yes, which ones, because there are multiple As and Bs and so on. Or does it mean that f1 can apply to an A or a B or a C? If it's the first, then I think it's quite hard to find a partitioning such that f1, f2, and all A, B, and C go to the same machine.

Best,
Aljoscha

On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:

Hi Martin,

So the problem is that you want to group those arguments in the Data Stream and pass them to the lambda function from the Control Stream at the same time. Am I right?

If so, you could give each lambda function an id as well and use these ids to tag the arguments that belong to it. After that, keyBy could be used to group the arguments belonging to the same lambda function. Joining this stream with the Control Stream by function id would bring the arguments and the function into the same instance.
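For the second step, assuming the tagging has already happened, something along these lines could work (just a sketch, not tested, and the type names Tagged and ControlMsg are made up):

    import org.apache.flink.api.common.state.{MapStateDescriptor, ValueStateDescriptor}
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object SecondStep {

      // An argument value already tagged with the id of the function it belongs to.
      case class Tagged(fnId: String, argKey: String, value: String)
      case class ControlMsg(fnId: String, logic: String, argKeys: Seq[String], time: Long)

      def applyLambdas(tagged: DataStream[Tagged], control: DataStream[ControlMsg]): DataStream[String] =
        tagged
          .keyBy(_.fnId)
          .connect(control.keyBy(_.fnId))
          .flatMap(new RichCoFlatMapFunction[Tagged, ControlMsg, String] {

            // Keyed state, scoped to one function id: the control message itself
            // and the latest value seen for each of its argument keys.
            private lazy val fn =
              getRuntimeContext.getState(new ValueStateDescriptor[ControlMsg]("fn", classOf[ControlMsg]))
            private lazy val args =
              getRuntimeContext.getMapState(
                new MapStateDescriptor[String, String]("args", classOf[String], classOf[String]))

            override def flatMap1(t: Tagged, out: Collector[String]): Unit = {
              args.put(t.argKey, t.value)
              evaluate(out)
            }

            override def flatMap2(c: ControlMsg, out: Collector[String]): Unit = {
              fn.update(c)
              evaluate(out)
            }

            // Fire once every argument key of the function has at least one cached value.
            private def evaluate(out: Collector[String]): Unit = {
              val c = fn.value()
              if (c != null && c.argKeys.forall(args.contains)) {
                val values = c.argKeys.map(args.get)
                // placeholder output; this is where the lambda in c.logic would be applied
                out.collect(s"${c.fnId}(${values.mkString(", ")})")
              }
            }
          })
    }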
What do you think? Could this solution solve your problem?

Best,
Tony Wei

2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:

Thanks for your reply Tony,

Yes, we are in the latter case, where the functions/lambdas come in the control stream. Think of them as strings containing the logic of the function. The values for each of the arguments to the function come from the data stream. That is why we need to co-locate the data stream messages for the corresponding keys with the control message that has the function to be applied.

We have a way of interpreting the logic described in the string and executing it on the incoming values from the data stream. This is kicked off from within the Flink runtime (synchronously, inside a flatMap of the RichCoFlatMapFunction) but it does not use Flink's predefined operators or functions.

So yes, I see your point about mapping the arguments, but the problem is not really that; the problem is making sure that the data stream values for the keys named in the control message end up in the same task instance / keyed managed state as the control message itself. Once they are there we can pass them in.

Any other thoughts?

M

On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com> wrote:

Hi Martin,

About problem 2: how were those lambda functions created? Are they pre-defined functions / operators, or automatically generated based on the message from the Control Stream?

For the former, you could give each function an id, use flatMap to duplicate the data with multiple ids, and then use a filter function to send them to the corresponding operators.

For the general case, like the latter, since you have broadcast the messages to all tasks, you could always build a mapping table from argument keys to lambda functions in each sub-task and use that map to process the data.
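The mapping table itself could just be a plain field in each parallel sub-task, something like this (only a sketch, untested, with made-up type names; note that the field is not checkpointed state):

    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
    import org.apache.flink.util.Collector
    import scala.collection.mutable

    case class DataMsg(key: String, value: String, time: Long)
    case class ControlMsg(fnId: String, logic: String, argKeys: Seq[String], time: Long)

    class MappingTableFn extends RichCoFlatMapFunction[DataMsg, ControlMsg, String] {

      // argument key -> lambdas (control messages) that reference it, rebuilt
      // independently in every sub-task from the broadcast control stream
      private val table = mutable.Map.empty[String, mutable.Map[String, ControlMsg]]

      override def flatMap2(c: ControlMsg, out: Collector[String]): Unit =
        c.argKeys.foreach { k =>
          table.getOrElseUpdate(k, mutable.Map.empty[String, ControlMsg]).put(c.fnId, c)
        }

      override def flatMap1(d: DataMsg, out: Collector[String]): Unit =
        table.getOrElse(d.key, mutable.Map.empty[String, ControlMsg]).values.foreach { c =>
          out.collect(s"${c.fnId} needs ${d.key}=${d.value}")   // hand off to your interpreter here
        }
    }

    // usage: dataStream.connect(controlStream.broadcast).flatMap(new MappingTableFn)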
But I was wondering if it is possible to generate a completely new function at runtime.

Best,
Tony Wei

2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:

Thanks for your reply Tony.

So there are actually 2 problems to solve:

1. All control stream msgs need to be broadcast to all tasks.

2. The data stream messages with the same keys as those specified in the control message need to go to the same task as well, so that all the values required for the lambda (i.e. functions f1, f2 ...) are there.

In my understanding side inputs (which are actually not available in the current release) would address problem 1.

To address problem 1 I also tried

    dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new RichCoFlatMapFunction)

but I get a runtime exception telling me I still need to do a keyBy before the flatMap. So are the upcoming side inputs the only way to broadcast a control stream to all tasks of a coFlatMap? Or is there another way?

As for problem 2, I am still pending a reply. Would appreciate it if anyone has some suggestions.

Thanks,
M

On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com> wrote:

Hi Martin,

Let me understand your question first. You have two streams, a Data Stream and a Control Stream, and you want to select data in the Data Stream based on the key set received from the Control Stream.

If I am not misunderstanding your question, I think SideInput is what you want:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData

It lets you define one stream as a side input that can be attached to the other stream; the data in the side input stream will then be broadcast.

So far, I have no idea if there is any solution to this without SideInput.

Best,
Tony Wei

2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:

Hi all,

I am trying to implement the following using Flink.

I have 2 input message streams:

1. Data Stream:
KEY  VALUE  TIME
.
.
.
C    V6     6
B    V6     6
A    V5     5
A    V4     4
C    V3     3
A    V3     3
B    V3     3
B    V2     2
A    V1     1

2. Control Stream:
Lambda  ArgumentKeys  TIME
.
.
.
f2      [A, C]        4
f1      [A, B, C]     1

I want to apply the lambdas coming in the control stream to the selection of keys that are coming in the data stream.

Since we have 2 streams, I naturally thought of connecting them using .connect. For this I need to key both of them by a certain criterion. And here lies the problem: how can I make sure that the messages with keys A, B, C specified in the control stream end up in the same task as the control message (f1, [A, B, C]) itself? Basically I don't know what to key by to achieve this.

I suspect a custom partitioner is required that partitions the data stream based on the messages in the control stream? Is this even possible?

Any suggestions welcomed!

Thanks,
M