Hi Martin,

In your original example, what does this syntax mean exactly:

f1            [A, B, C]            1

Does it mean that f1 needs one A, one B and one C from the main stream? If yes, 
which ones, because there are multiple As and Bs and so on. Or does it mean 
that f1 can apply to an A or a B or a C? If it's the first, then I think it's 
quite hard to find a partitioning such that both f1, f2, and all A, B, and C go 
to the same machine.


> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Martin,
> So the problem is that you want to group those arguments in Data Stream and 
> pass them to the lambda function from Control Stream at the same time. Am I 
> right?
> If right, then you could give each lambda function an id as well. Use these 
> ids to tag those arguments to which they belong.
> After that, keyBy function could be used to group those arguments belonging 
> to the same lambda function. Joining this stream with Control Stream by 
> function id could make arguments and function be in the same instance.
> What do you think? Could this solution solve your problem?
> Best,
> Tony Wei
> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com 
> <mailto:martineden...@gmail.com>>:
> Thanks for your reply Tony,
> Yes we are in the latter case, where the functions/lambdas come in the 
> control stream. Think of them as strings containing the logic of the 
> function. The values for each of the arguments to the function come from the 
> data stream. That is why we need to co-locate the data stream messages for 
> the corresponding keys with the control message that has the function to be 
> applied.
> We have a way of interpreting the logic described in the string and executing 
> it on the incoming values from the data stream. This is kicked off from 
> within the Flink runtime (synchronous to a flatMap of the 
> RichCoFlatMapFunction) but is not using Flink predefined operators or 
> functions.
> So yeah I see your point about mapping the arguments but the problem is not 
> really that, the problem is making sure that the values in the control stream 
> are in the same instance of the task/ keyed managed state as a the actual 
> control stream message. Once they are we can pass them in.
> Any other thoughts?
> M
> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com 
> <mailto:tony19920...@gmail.com>> wrote:
> Hi Martin,
> About problem 2. How were those lambda functions created? Pre-defined 
> functions / operators or automatically generated based on the message from 
> Control Stream?
> For the former, you could give each function one id and user flapMap to 
> duplicate data with multiple ids. Then, you could use filter function and 
> send them to the corresponding operators.
> For the general case like the latter, because you had broadcasted the 
> messages to all tasks, it could always build a mapping table from argument 
> keys to lambda functions in each sub-task and use the map to process the 
> data. But I was wondering if it is possible to generate a completely new 
> function in the runtime.
> Best,
> Tony Wei
> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com 
> <mailto:martineden...@gmail.com>>:
> Thanks for your reply Tony.
> So there are actually 2 problems to solve:
> 1. All control stream msgs need to be broadcasted to all tasks.
> 2. The data stream messages with the same keys as those specified in the 
> control message need to go to the same task as well, so that all the values 
> required for the lambda (i.e. functions f1, f2 ...) are there.
> In my understanding side inputs (which are actually not available in the 
> current release) would address problem 1.
> To address problem 1 I also tried 
> dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new 
> RichCoFlatMapFunction) but I get a runtime exception telling me I still need 
> to do a keyBy before the flatMap. So are the upcoming side inputs the only 
> way to broadcast a control stream to all tasks of a coFlatMap? Or is there 
> another way?
> As for problem 2, I am still pending a reply. Would appreciate if anyone has 
> some suggestions.
> Thanks,
> M
> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com 
> <mailto:tony19920...@gmail.com>> wrote:
> Hi Martin,
> Let me understand your question first.
> You have two Stream: Data Stream and Control Stream and you want to select 
> data in Data Stream based on the key set got from Control Stream.
> If I were not misunderstanding your question, I think SideInput is what you 
> want.
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData>
> It lets you to define one stream as a SideInput and can be assigned to the 
> other stream, then the data in SideInput stream will be broadcasted.
> So far, I have no idea if there is any solution to solve this without 
> SideInput.
> Best,
> Tony Wei
> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com 
> <mailto:martineden...@gmail.com>>:
> Hi all,
> I am trying to implement the following using Flink:
> I have 2 input message streams:
> 1. Data Stream:
> .
> .
> .
> C      V6        6
> B      V6        6
> A      V5        5
> A      V4        4
> C      V3        3
> A      V3        3
> B      V3        3
> B      V2        2
> A      V1        1
> 2. Control Stream:
> Lambda  ArgumentKeys TIME
> .
> .
> .
> f2            [A, C]                 4
> f1            [A, B, C]            1
> I want to apply the lambdas coming in the control stream to the selection of 
> keys that are coming in the data stream.
> Since we have 2 streams I naturally thought of connecting them using 
> .connect. For this I need to key both of them by a certain criteria. And here 
> lies the problem, how can I make sure the messages with keys A,B,C specified 
> in the control stream end up in the same task as well as the control message 
> (f1, [A, B, C]) itself. Basically I don't know how to key by to achieve this.
> I suspect a custom partitioner is required that partitions the data stream 
> based on the messages in the control stream? Is this even possible?
> Any suggestions welcomed!
> Thanks,
> M

Reply via email to