Hi Martin,

Yes, that is exactly what I thought. But the first step would also need something like side inputs, and I'm not sure how to achieve that in the current release.

Best,
Tony Wei
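Roughly, the two-pass wiring being discussed might look like the fragment below. The record shapes and names are assumptions made only for illustration: dataStream is taken to be Tuple2<key, value>, controlStream Tuple2<functionId, argumentKeys>, and TagWithFunctionIds / ApplyWhenArgumentsComplete are placeholder names for the two co-flatmap functions sketched further down in the thread, so this will not compile as-is.

    DataStream<Tuple3<String, String, String>> tagged =       // (functionId, argumentKey, value)
        dataStream
            .connect(controlStream.broadcast())                // pass 1: broadcast control messages and tag
            .flatMap(new TagWithFunctionIds());                // placeholder, sketched further down

    DataStream<String> results =
        tagged
            .keyBy(0)                                          // group by function id
            .connect(controlStream.keyBy(0))                   // pass 2: connect with the control stream again
            .flatMap(new ApplyWhenArgumentsComplete());        // placeholder, sketched further down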
Martin Eden <martineden...@gmail.com> wrote on Thu, Aug 31, 2017 at 11:32 PM:

> Hi Aljoscha, Tony,
>
> Aljoscha:
> Yes, it's the first option you mentioned.
> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be
> applied each time a new value for either A, B or C comes in, so we need to
> use state to cache the latest values. Using the example data stream in my
> first msg, the emitted stream should be:
>
> 1. Data Stream:
> KEY  VALUE  TIME
> .
> .
> .
> C    V6     6
> B    V6     6
> A    V5     5
> A    V4     4
> C    V3     3
> A    V3     3
> B    V3     3
> B    V2     2
> A    V1     1
>
> 2. Control Stream:
> Lambda  ArgumentKeys  TIME
> .
> .
> .
> f2      [A, C]        4
> f1      [A, B, C]     1
>
> 3. Expected emitted stream:
> TIME  VALUE
> .
> .
> .
> 6     f1(V5, V6, V3)
>       f1(V5, V6, V6)
>       f2(V5, V6)
> 5     f1(V5, V3, V3)
>       f2(V5, V3)
> 4     f1(V4, V3, V3)
>       f2(V4, V3)
> 3     f1(V3, V3, V3)
> 2     -
> 1     -
>
> So essentially, as soon as the argument list fills up, we apply the
> function/lambda on each new message arriving in the data stream for any of
> its argument keys.
>
> Tony:
> Yes, we need to group by and pass the values to the lambda.
> Ok, so what you are proposing might work. So your solution assumes that we
> have to connect with the control stream twice? Once for the tagging, and a
> second time re-connecting the control stream with the tagged stream for
> the actual application of the function/lambda?
>
> Thanks,
> Alex
>
> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Martin,
>>
>> In your original example, what does this syntax mean exactly:
>>
>> f1 [A, B, C] 1
>>
>> Does it mean that f1 needs one A, one B and one C from the main stream?
>> If yes, which ones, because there are multiple As and Bs and so on. Or
>> does it mean that f1 can apply to an A or a B or a C? If it's the first,
>> then I think it's quite hard to find a partitioning such that f1, f2, and
>> all A, B, and C go to the same machine.
>>
>> Best,
>> Aljoscha
>>
>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Martin,
>>
>> So the problem is that you want to group those arguments in the Data
>> Stream and, at the same time, pass them to the lambda function from the
>> Control Stream. Am I right?
>>
>> If so, then you could give each lambda function an id as well. Use these
>> ids to tag the arguments that belong to each function. After that, keyBy
>> can be used to group the arguments belonging to the same lambda function.
>> Joining this stream with the Control Stream by function id makes the
>> arguments and the function end up in the same instance.
>>
>> What do you think? Could this solution solve your problem?
>>
>> Best,
>> Tony Wei
>>
>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>
>>> Thanks for your reply Tony,
>>>
>>> Yes, we are in the latter case, where the functions/lambdas come in the
>>> control stream. Think of them as strings containing the logic of the
>>> function. The values for each of the arguments to the function come from
>>> the data stream. That is why we need to co-locate the data stream
>>> messages for the corresponding keys with the control message that has
>>> the function to be applied.
>>>
>>> We have a way of interpreting the logic described in the string and
>>> executing it on the incoming values from the data stream. This is kicked
>>> off from within the Flink runtime (synchronously to a flatMap call of
>>> the RichCoFlatMapFunction) but is not using Flink predefined operators
>>> or functions.
>>>
>>> So yeah, I see your point about mapping the arguments, but the problem
>>> is not really that; the problem is making sure that the values from the
>>> data stream end up in the same task instance / keyed managed state as
>>> the actual control stream message. Once they are there, we can pass them
>>> in.
>>>
>>> Any other thoughts?
>>>
>>> M
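One possible sketch of that second step, grouping by function id so that the control message and all of its tagged argument values land in the same keyed instance, is below. It assumes a tagged stream of Tuple3<functionId, argumentKey, value> and a control stream of Tuple2<functionId, argumentKeys> (the variable names tagged and controlStream are assumptions); the final string concatenation merely stands in for interpreting the lambda, and the whole fragment is illustrative rather than tested.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Both inputs are keyed by the function id (tuple field 0), so the keyed
    // state below is scoped to one function per key.
    DataStream<String> results =
        tagged.keyBy(0)
            .connect(controlStream.keyBy(0))
            .flatMap(new RichCoFlatMapFunction<Tuple3<String, String, String>,
                                               Tuple2<String, List<String>>,
                                               String>() {

                private transient ValueState<List<String>> argumentKeys;  // declared argument order
                private transient MapState<String, String> latestValues;  // argument key -> latest value

                @Override
                public void open(Configuration parameters) {
                    argumentKeys = getRuntimeContext().getState(new ValueStateDescriptor<>(
                        "argumentKeys", TypeInformation.of(new TypeHint<List<String>>() {})));
                    latestValues = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                        "latestValues", String.class, String.class));
                }

                @Override
                public void flatMap1(Tuple3<String, String, String> value,
                                     Collector<String> out) throws Exception {
                    // cache the latest value for this argument key, then try to apply the function
                    latestValues.put(value.f1, value.f2);
                    applyIfComplete(value.f0, out);
                }

                @Override
                public void flatMap2(Tuple2<String, List<String>> control,
                                     Collector<String> out) throws Exception {
                    // remember which argument keys this function needs
                    argumentKeys.update(control.f1);
                    applyIfComplete(control.f0, out);
                }

                private void applyIfComplete(String functionId, Collector<String> out) throws Exception {
                    List<String> keys = argumentKeys.value();
                    if (keys == null) {
                        return;                                // control message not seen yet
                    }
                    List<String> args = new ArrayList<>();
                    for (String key : keys) {
                        String latest = latestValues.get(key);
                        if (latest == null) {
                            return;                            // argument list not filled up yet
                        }
                        args.add(latest);
                    }
                    // stand-in for evaluating the lambda string on the cached values
                    out.collect(functionId + "(" + String.join(", ", args) + ")");
                }
            });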
>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> About problem 2: how are those lambda functions created? Are they
>>>> pre-defined functions / operators, or automatically generated based on
>>>> the messages from the Control Stream?
>>>>
>>>> For the former, you could give each function an id and use flatMap to
>>>> duplicate the data with multiple ids. Then you could use a filter
>>>> function and send them to the corresponding operators.
>>>>
>>>> For the general case like the latter, because you have broadcasted the
>>>> messages to all tasks, you could always build a mapping table from
>>>> argument keys to lambda functions in each sub-task and use that map to
>>>> process the data. But I was wondering if it is possible to generate a
>>>> completely new function at runtime.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>
>>>>> Thanks for your reply Tony.
>>>>>
>>>>> So there are actually 2 problems to solve:
>>>>>
>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>
>>>>> 2. The data stream messages with the same keys as those specified in
>>>>> the control message need to go to the same task as well, so that all
>>>>> the values required for the lambda (i.e. functions f1, f2 ...) are
>>>>> there.
>>>>>
>>>>> In my understanding, side inputs (which are actually not available in
>>>>> the current release) would address problem 1.
>>>>>
>>>>> To address problem 1 I also tried
>>>>> dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new
>>>>> RichCoFlatMapFunction), but I get a runtime exception telling me I
>>>>> still need to do a keyBy before the flatMap. So are the upcoming side
>>>>> inputs the only way to broadcast a control stream to all tasks of a
>>>>> coFlatMap? Or is there another way?
>>>>>
>>>>> As for problem 2, I am still awaiting a reply. I would appreciate it
>>>>> if anyone has some suggestions.
>>>>>
>>>>> Thanks,
>>>>> M
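One workaround that might serve as "another way" is sketched below, under the assumption that the data stream is Tuple2<key, value> and the control stream is Tuple2<functionId, argumentKeys>: leave both inputs unkeyed, broadcast the control stream, and keep the key-to-function mapping in a plain field of a CoFlatMapFunction. The error about needing a keyBy presumably comes from using keyed state while only one input is keyed; with no keyed state involved, no keyBy is required. The trade-off is that the plain field is not checkpointed and has to be rebuilt from the control stream after a restart.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    DataStream<Tuple3<String, String, String>> tagged =        // (functionId, argumentKey, value)
        dataStream
            .connect(controlStream.broadcast())                // every parallel instance sees every control message
            .flatMap(new CoFlatMapFunction<Tuple2<String, String>,
                                           Tuple2<String, List<String>>,
                                           Tuple3<String, String, String>>() {

                // argument key -> ids of the functions that need it (plain field, not Flink state)
                private final Map<String, Set<String>> functionsByKey = new HashMap<>();

                @Override
                public void flatMap1(Tuple2<String, String> data,
                                     Collector<Tuple3<String, String, String>> out) {
                    Set<String> ids = functionsByKey.get(data.f0);
                    if (ids != null) {
                        for (String functionId : ids) {
                            // duplicate the record once per interested function
                            out.collect(Tuple3.of(functionId, data.f0, data.f1));
                        }
                    }
                }

                @Override
                public void flatMap2(Tuple2<String, List<String>> control,
                                     Collector<Tuple3<String, String, String>> out) {
                    for (String argumentKey : control.f1) {
                        functionsByKey
                            .computeIfAbsent(argumentKey, k -> new HashSet<>())
                            .add(control.f0);
                    }
                }
            });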
>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> Let me understand your question first. You have two streams, a Data
>>>>>> Stream and a Control Stream, and you want to select data in the Data
>>>>>> Stream based on the key set received from the Control Stream.
>>>>>>
>>>>>> If I am not misunderstanding your question, I think SideInput is what
>>>>>> you want.
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>> It lets you define one stream as a side input that can be attached to
>>>>>> the other stream; the data in the side input stream is then
>>>>>> broadcasted.
>>>>>>
>>>>>> So far, I have no idea if there is any solution to solve this without
>>>>>> SideInput.
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I am trying to implement the following using Flink.
>>>>>>>
>>>>>>> I have 2 input message streams:
>>>>>>>
>>>>>>> 1. Data Stream:
>>>>>>> KEY  VALUE  TIME
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> C    V6     6
>>>>>>> B    V6     6
>>>>>>> A    V5     5
>>>>>>> A    V4     4
>>>>>>> C    V3     3
>>>>>>> A    V3     3
>>>>>>> B    V3     3
>>>>>>> B    V2     2
>>>>>>> A    V1     1
>>>>>>>
>>>>>>> 2. Control Stream:
>>>>>>> Lambda  ArgumentKeys  TIME
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> f2      [A, C]        4
>>>>>>> f1      [A, B, C]     1
>>>>>>>
>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>
>>>>>>> Since we have 2 streams, I naturally thought of connecting them using
>>>>>>> .connect. For this I need to key both of them by a certain criterion,
>>>>>>> and here lies the problem: how can I make sure the messages with keys
>>>>>>> A, B, C specified in the control stream end up in the same task as
>>>>>>> the control message (f1, [A, B, C]) itself? Basically, I don't know
>>>>>>> what to key by to achieve this.
>>>>>>>
>>>>>>> I suspect a custom partitioner is required that partitions the data
>>>>>>> stream based on the messages in the control stream? Is this even
>>>>>>> possible?
>>>>>>>
>>>>>>> Any suggestions welcome!
>>>>>>>
>>>>>>> Thanks,
>>>>>>> M
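On the custom partitioner idea: the DataStream API does expose partitionCustom, but a Partitioner is only ever handed the extracted key and the number of downstream partitions, so it cannot consult the control stream, which is presumably why the replies above move towards connect() with broadcast()/keyBy() instead. A minimal sketch of that hook, with the field position and the Tuple2<key, value> shape of the data stream assumed purely for illustration:

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<Tuple2<String, String>> repartitioned =
        dataStream.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                // the routing decision can only be a pure function of the key
                return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
            }
        }, 0);   // partition by tuple field 0, i.e. the record key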