Hi Martin,

So the problem is that you want to group those arguments in Data Stream and
pass them to the lambda function from Control Stream at the same time. Am I
right?

If right, then you could give each lambda function an id as well. Use these
ids to tag those arguments to which they belong.
After that, keyBy function could be used to group those arguments belonging
to the same lambda function. Joining this stream with Control Stream by
function id could make arguments and function be in the same instance.

What do you think? Could this solution solve your problem?

Best,
Tony Wei

2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:

> Thanks for your reply Tony,
>
> Yes we are in the latter case, where the functions/lambdas come in the
> control stream. Think of them as strings containing the logic of the
> function. The values for each of the arguments to the function come from
> the data stream. That is why we need to co-locate the data stream messages
> for the corresponding keys with the control message that has the function
> to be applied.
>
> We have a way of interpreting the logic described in the string and
> executing it on the incoming values from the data stream. This is kicked
> off from within the Flink runtime (synchronous to a flatMap of the
> RichCoFlatMapFunction) but is not using Flink predefined operators or
> functions.
>
> So yeah I see your point about mapping the arguments but the problem is
> not really that, the problem is making sure that the values in the control
> stream are in the same instance of the task/ keyed managed state as a the
> actual control stream message. Once they are we can pass them in.
>
> Any other thoughts?
>
> M
>
>
>
>
>
>
>
> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi Martin,
>>
>> About problem 2. How were those lambda functions created? Pre-defined
>> functions / operators or automatically generated based on the message from
>> Control Stream?
>>
>> For the former, you could give each function one id and user flapMap to
>> duplicate data with multiple ids. Then, you could use filter function and
>> send them to the corresponding operators.
>>
>> For the general case like the latter, because you had broadcasted the
>> messages to all tasks, it could always build a mapping table from argument
>> keys to lambda functions in each sub-task and use the map to process the
>> data. But I was wondering if it is possible to generate a completely new
>> function in the runtime.
>>
>> Best,
>> Tony Wei
>>
>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>
>>> Thanks for your reply Tony.
>>>
>>> So there are actually 2 problems to solve:
>>>
>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>
>>> 2. The data stream messages with the same keys as those specified in the
>>> control message need to go to the same task as well, so that all the values
>>> required for the lambda (i.e. functions f1, f2 ...) are there.
>>>
>>> In my understanding side inputs (which are actually not available in the
>>> current release) would address problem 1.
>>>
>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I get a
>>> runtime exception telling me I still need to do a keyBy before the flatMap.
>>> So are the upcoming side inputs the only way to broadcast a control stream
>>> to all tasks of a coFlatMap? Or is there another way?
>>>
>>> As for problem 2, I am still pending a reply. Would appreciate if anyone
>>> has some suggestions.
>>>
>>> Thanks,
>>> M
>>>
>>>
>>>
>>>
>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com>
>>> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> Let me understand your question first.
>>>> You have two Stream: Data Stream and Control Stream and you want to
>>>> select data in Data Stream based on the key set got from Control Stream.
>>>>
>>>> If I were not misunderstanding your question, I think SideInput is what
>>>> you want.
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Si
>>>> de+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamA
>>>> PI-StoringSide-InputData
>>>> It lets you to define one stream as a SideInput and can be assigned to
>>>> the other stream, then the data in SideInput stream will be broadcasted.
>>>>
>>>> So far, I have no idea if there is any solution to solve this without
>>>> SideInput.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am trying to implement the following using Flink:
>>>>>
>>>>> I have 2 input message streams:
>>>>>
>>>>> 1. Data Stream:
>>>>> KEY VALUE TIME
>>>>> .
>>>>> .
>>>>> .
>>>>> C      V6        6
>>>>> B      V6        6
>>>>> A      V5        5
>>>>> A      V4        4
>>>>> C      V3        3
>>>>> A      V3        3
>>>>> B      V3        3
>>>>> B      V2        2
>>>>> A      V1        1
>>>>>
>>>>> 2. Control Stream:
>>>>> Lambda  ArgumentKeys TIME
>>>>> .
>>>>> .
>>>>> .
>>>>> f2            [A, C]                 4
>>>>> f1            [A, B, C]            1
>>>>>
>>>>> I want to apply the lambdas coming in the control stream to the
>>>>> selection of keys that are coming in the data stream.
>>>>>
>>>>> Since we have 2 streams I naturally thought of connecting them using
>>>>> .connect. For this I need to key both of them by a certain criteria. And
>>>>> here lies the problem, how can I make sure the messages with keys A,B,C
>>>>> specified in the control stream end up in the same task as well as the
>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to key
>>>>> by to achieve this.
>>>>>
>>>>> I suspect a custom partitioner is required that partitions the data
>>>>> stream based on the messages in the control stream? Is this even possible?
>>>>>
>>>>> Any suggestions welcomed!
>>>>>
>>>>> Thanks,
>>>>> M
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to