Hi Martin,

In your original example, what does this syntax mean exactly:

f1 [A, B, C] 1

Does it mean that f1 needs one A, one B and one C from the main stream? If yes, which ones, because there are multiple As and Bs and so on. Or does it mean that f1 can apply to an A or a B or a C?

If it's the first, then I think it's quite hard to find a partitioning such that both f1 and f2 and all A, B and C go to the same machine.

Best,
Aljoscha

> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Martin,
>
> So the problem is that you want to group those arguments in the Data Stream and pass them to the lambda function from the Control Stream at the same time. Am I right?
>
> If so, then you could give each lambda function an id as well and use these ids to tag the arguments they belong to. After that, keyBy can be used to group the arguments belonging to the same lambda function. Joining this stream with the Control Stream by function id would put the arguments and the function in the same instance.
>
> What do you think? Could this solution solve your problem?
>
> Best,
> Tony Wei
>
> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
> Thanks for your reply Tony,
>
> Yes, we are in the latter case, where the functions/lambdas come in the control stream. Think of them as strings containing the logic of the function. The values for each of the arguments to the function come from the data stream. That is why we need to co-locate the data stream messages for the corresponding keys with the control message that has the function to be applied.
>
> We have a way of interpreting the logic described in the string and executing it on the incoming values from the data stream. This is kicked off from within the Flink runtime (synchronously to a flatMap of the RichCoFlatMapFunction) but does not use Flink's predefined operators or functions.
>
> So yeah, I see your point about mapping the arguments, but the problem is not really that. The problem is making sure that the values from the data stream end up in the same task instance / keyed managed state as the actual control stream message. Once they are there, we can pass them in.
>
> Any other thoughts?
>
> M
>
> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Martin,
>
> About problem 2: how were those lambda functions created? Pre-defined functions/operators, or automatically generated based on the messages from the Control Stream?
>
> For the former, you could give each function an id and use flatMap to duplicate the data with multiple ids. Then you could use a filter function and send them to the corresponding operators.
>
> For the general case like the latter, since you have broadcasted the messages to all tasks, each sub-task could always build a mapping table from argument keys to lambda functions and use that map to process the data. But I was wondering if it is possible to generate a completely new function at runtime.
>
> Best,
> Tony Wei
>
> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
> Thanks for your reply Tony.
>
> So there are actually 2 problems to solve:
>
> 1. All control stream msgs need to be broadcasted to all tasks.
>
> 2. The data stream messages with the same keys as those specified in the control message need to go to the same task as well, so that all the values required for the lambda (i.e. functions f1, f2 ...) are there.
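Tony's id-tagging suggestion above could look roughly like the sketch below. To be clear, this is only a rough sketch and not code from the thread: the Argument and FunctionSpec types are hypothetical, and it assumes the mapping from argument key to interested function ids (keyToFunctionIds) is known up front, which side-steps part of problem 2. Every argument is duplicated once per interested function, both streams are keyed by function id, and a RichCoFlatMapFunction buffers arguments in keyed state until the function definition arrives.

import java.util.*;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class FunctionRoutingSketch {

    // Hypothetical data stream record: (key, value, time), e.g. ("A", "V1", 1).
    public static class Argument {
        public String key; public String value; public long time;
    }

    // Hypothetical control stream record: the lambda as a string plus the keys it needs.
    public static class FunctionSpec {
        public String functionId; public String body; public List<String> argKeys;
    }

    // keyToFunctionIds (e.g. A -> [f1, f2]) is assumed to be known up front;
    // that assumption is the big simplification in this sketch.
    public static DataStream<String> wire(DataStream<Argument> data,
                                          DataStream<FunctionSpec> control,
                                          final Map<String, List<String>> keyToFunctionIds) {

        // Duplicate every argument once per function that uses it, tagged with that function's id.
        DataStream<Tuple2<String, Argument>> tagged = data
            .flatMap(new FlatMapFunction<Argument, Tuple2<String, Argument>>() {
                @Override
                public void flatMap(Argument a, Collector<Tuple2<String, Argument>> out) {
                    for (String fnId : keyToFunctionIds.getOrDefault(a.key, Collections.<String>emptyList())) {
                        out.collect(Tuple2.of(fnId, a));
                    }
                }
            });

        // Key both streams by function id so a function and all of its arguments
        // land on the same parallel subtask and share the same keyed state.
        return tagged
            .keyBy(new KeySelector<Tuple2<String, Argument>, String>() {
                @Override public String getKey(Tuple2<String, Argument> t) { return t.f0; }
            })
            .connect(control.keyBy(new KeySelector<FunctionSpec, String>() {
                @Override public String getKey(FunctionSpec f) { return f.functionId; }
            }))
            .flatMap(new ApplyFunction());
    }

    // Buffers arguments in keyed state until the function definition has arrived.
    public static class ApplyFunction
            extends RichCoFlatMapFunction<Tuple2<String, Argument>, FunctionSpec, String> {

        private transient ValueState<FunctionSpec> function;
        private transient ListState<Argument> pendingArgs;

        @Override
        public void open(Configuration conf) {
            function = getRuntimeContext().getState(
                new ValueStateDescriptor<>("function", FunctionSpec.class));
            pendingArgs = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-args", Argument.class));
        }

        @Override
        public void flatMap1(Tuple2<String, Argument> tagged, Collector<String> out) throws Exception {
            if (function.value() == null) {
                pendingArgs.add(tagged.f1);        // function not here yet: buffer the value
            } else {
                out.collect(evaluate(function.value(), tagged.f1));
            }
        }

        @Override
        public void flatMap2(FunctionSpec spec, Collector<String> out) throws Exception {
            function.update(spec);
            Iterable<Argument> buffered = pendingArgs.get();   // flush anything buffered so far
            if (buffered != null) {
                for (Argument a : buffered) {
                    out.collect(evaluate(spec, a));
                }
            }
            pendingArgs.clear();
        }

        private String evaluate(FunctionSpec spec, Argument a) {
            // Placeholder for the thread's "interpret the string and execute it" step.
            return spec.functionId + "(" + a.key + "=" + a.value + ")";
        }
    }
}

If the key-to-function mapping only becomes known from the control stream itself, this tagging step would in turn need the broadcast discussed further down the thread.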
> In my understanding, side inputs (which are actually not available in the current release) would address problem 1.
>
> To address problem 1 I also tried dataStream.keyBy(key).connect(controlStream.broadcast()).flatMap(new RichCoFlatMapFunction), but I get a runtime exception telling me I still need to do a keyBy before the flatMap. So are the upcoming side inputs the only way to broadcast a control stream to all tasks of a CoFlatMap? Or is there another way?
>
> As for problem 2, I am still pending a reply. I would appreciate it if anyone has some suggestions.
>
> Thanks,
> M
>
> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Martin,
>
> Let me understand your question first. You have two streams, a Data Stream and a Control Stream, and you want to select data in the Data Stream based on the key set you get from the Control Stream.
>
> If I have not misunderstood your question, I think SideInput is what you want:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
> It lets you define one stream as a SideInput that can be assigned to the other stream; the data in the SideInput stream will then be broadcasted.
>
> So far, I have no idea if there is any solution that solves this without SideInput.
>
> Best,
> Tony Wei
>
> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
> Hi all,
>
> I am trying to implement the following using Flink:
>
> I have 2 input message streams:
>
> 1. Data Stream:
>
> KEY  VALUE  TIME
> .
> .
> .
> C    V6     6
> B    V6     6
> A    V5     5
> A    V4     4
> C    V3     3
> A    V3     3
> B    V3     3
> B    V2     2
> A    V1     1
>
> 2. Control Stream:
>
> Lambda  ArgumentKeys  TIME
> .
> .
> .
> f2      [A, C]        4
> f1      [A, B, C]     1
>
> I want to apply the lambdas coming in the control stream to the selection of keys that are coming in the data stream.
>
> Since we have 2 streams, I naturally thought of connecting them using .connect. For this I need to key both of them by a certain criterion. And here lies the problem: how can I make sure the messages with keys A, B, C specified in the control stream end up in the same task as the control message (f1, [A, B, C]) itself? Basically I don't know what to key by to achieve this.
>
> I suspect a custom partitioner is required that partitions the data stream based on the messages in the control stream? Is this even possible?
>
> Any suggestions welcomed!
>
> Thanks,
> M
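For problem 1 on its own (getting every control message to every parallel task without side inputs), one option that should work with the current API is to connect the un-keyed data stream with controlStream.broadcast() and keep the received functions in a plain operator field instead of keyed state, so no keyBy should be required. This is essentially the per-sub-task mapping table Tony describes for the general case. Again only a sketch, reusing the same hypothetical record shapes as earlier: the local map is not checkpointed, and nothing here co-locates the values for A, B and C, which is problem 2 further up the thread.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastControlSketch {

    // Same hypothetical record shapes as in the earlier sketch, repeated so this compiles on its own.
    public static class Argument {
        public String key; public String value; public long time;
    }
    public static class FunctionSpec {
        public String functionId; public String body; public List<String> argKeys;
    }

    public static DataStream<Tuple2<String, Argument>> wire(DataStream<Argument> data,
                                                            DataStream<FunctionSpec> control) {
        // broadcast() sends every control message to every parallel instance of the
        // co-operator, so no keyBy (and therefore no keyed state) is needed.
        return data
            .connect(control.broadcast())
            .flatMap(new CoFlatMapFunction<Argument, FunctionSpec, Tuple2<String, Argument>>() {

                // Plain instance field, one copy per subtask. It is not checkpointed,
                // so after a failure it only refills as new control messages arrive.
                private final Map<String, FunctionSpec> functionsById = new HashMap<>();

                @Override
                public void flatMap1(Argument arg, Collector<Tuple2<String, Argument>> out) {
                    // Tag the argument with every function interested in its key. Getting all
                    // of one function's arguments onto the same subtask is still problem 2.
                    for (FunctionSpec fn : functionsById.values()) {
                        if (fn.argKeys.contains(arg.key)) {
                            out.collect(Tuple2.of(fn.functionId, arg));
                        }
                    }
                }

                @Override
                public void flatMap2(FunctionSpec fn, Collector<Tuple2<String, Argument>> out) {
                    functionsById.put(fn.functionId, fn);
                }
            });
    }
}

The tagged output could then feed a keyed stage like the one sketched earlier, once the data records carry the function ids they belong to.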