Hi Martin,

Yes, that is exactly what I thought.
But the first step also needs to be fulfilled  by SideInput. I'm not sure
how to achieve this in the current release.

Best,
Tony Wei

Martin Eden <martineden...@gmail.com>於 2017年8月31日 週四,下午11:32寫道:

> Hi Aljoscha, Tony,
>
> Aljoscha:
> Yes it's the first option you mentioned.
> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be
> applied each time a new value for either A, B or C comes in. So we need to
> use state to cache the latest values. So using the example data stream in
> my first msg the emitted stream should be:
>
> 1. Data Stream:
> KEY VALUE TIME
> .
> .
> .
> C      V6        6
> B      V6        6
> A      V5        5
> A      V4        4
> C      V3        3
> A      V3        3
> B      V3        3
> B      V2        2
> A      V1        1
>
> 2. Control Stream:
> Lambda  ArgumentKeys TIME
> .
> .
> .
> f2            [A, C]                 4
> f1            [A, B, C]            1
>
> 3. Expected emitted stream:
> TIME    VALUE
> .
> .
> .
> 6          f1(V5, V6, V3)
>             f1(V5, V6, V6)
>             f2(V5, V6)
> 5          f1(V5, V3, V3)
>             f2(V5, V3)
> 4          f1(V4, V3, V3)
>             f2(V4, V3)
> 3          f1(V3, V3, V3)
> 2          -
> 1          -
>
> So essentially as soon as the argument list fills up then we apply the
> function/lambda at each new arriving message in the data stream for either
> argument key.
>
> Tony:
> Yes we need to group by and pass to the lambda.
> Ok, so what you are proposing might work. So your solution assumes that we
> have to connect with the control stream twice? Once for the tagging and
> another time re-connect-ing the control stream with the tagged stream for
> the actual application of the function/lambda?
>
> Thanks,
> Alex
>
>
>
> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Martin,
>>
>> In your original example, what does this syntax mean exactly:
>>
>> f1            [A, B, C]            1
>>
>> Does it mean that f1 needs one A, one B and one C from the main stream?
>> If yes, which ones, because there are multiple As and Bs and so on. Or does
>> it mean that f1 can apply to an A or a B or a C? If it's the first, then I
>> think it's quite hard to find a partitioning such that both f1, f2, and all
>> A, B, and C go to the same machine.
>>
>> Best,
>> Aljoscha
>>
>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Martin,
>>
>> So the problem is that you want to group those arguments in Data Stream
>> and pass them to the lambda function from Control Stream at the same time.
>> Am I right?
>>
>> If right, then you could give each lambda function an id as well. Use
>> these ids to tag those arguments to which they belong.
>> After that, keyBy function could be used to group those arguments
>> belonging to the same lambda function. Joining this stream with Control
>> Stream by function id could make arguments and function be in the same
>> instance.
>>
>> What do you think? Could this solution solve your problem?
>>
>> Best,
>> Tony Wei
>>
>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>
>>> Thanks for your reply Tony,
>>>
>>> Yes we are in the latter case, where the functions/lambdas come in the
>>> control stream. Think of them as strings containing the logic of the
>>> function. The values for each of the arguments to the function come from
>>> the data stream. That is why we need to co-locate the data stream messages
>>> for the corresponding keys with the control message that has the function
>>> to be applied.
>>>
>>> We have a way of interpreting the logic described in the string and
>>> executing it on the incoming values from the data stream. This is kicked
>>> off from within the Flink runtime (synchronous to a flatMap of the
>>> RichCoFlatMapFunction) but is not using Flink predefined operators or
>>> functions.
>>>
>>> So yeah I see your point about mapping the arguments but the problem is
>>> not really that, the problem is making sure that the values in the control
>>> stream are in the same instance of the task/ keyed managed state as a the
>>> actual control stream message. Once they are we can pass them in.
>>>
>>> Any other thoughts?
>>>
>>> M
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com>
>>> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> About problem 2. How were those lambda functions created? Pre-defined
>>>> functions / operators or automatically generated based on the message from
>>>> Control Stream?
>>>>
>>>> For the former, you could give each function one id and user flapMap to
>>>> duplicate data with multiple ids. Then, you could use filter function and
>>>> send them to the corresponding operators.
>>>>
>>>> For the general case like the latter, because you had broadcasted the
>>>> messages to all tasks, it could always build a mapping table from argument
>>>> keys to lambda functions in each sub-task and use the map to process the
>>>> data. But I was wondering if it is possible to generate a completely new
>>>> function in the runtime.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>
>>>>> Thanks for your reply Tony.
>>>>>
>>>>> So there are actually 2 problems to solve:
>>>>>
>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>
>>>>> 2. The data stream messages with the same keys as those specified in
>>>>> the control message need to go to the same task as well, so that all the
>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>
>>>>> In my understanding side inputs (which are actually not available in
>>>>> the current release) would address problem 1.
>>>>>
>>>>> To address problem 1 I also tried
>>>>> dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new
>>>>> RichCoFlatMapFunction) but I get a runtime exception telling me I
>>>>> still need to do a keyBy before the flatMap. So are the upcoming side
>>>>> inputs the only way to broadcast a control stream to all tasks of a
>>>>> coFlatMap? Or is there another way?
>>>>>
>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>> anyone has some suggestions.
>>>>>
>>>>> Thanks,
>>>>> M
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> Let me understand your question first.
>>>>>> You have two Stream: Data Stream and Control Stream and you want to
>>>>>> select data in Data Stream based on the key set got from Control Stream.
>>>>>>
>>>>>> If I were not misunderstanding your question, I think SideInput is
>>>>>> what you want.
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>> It lets you to define one stream as a SideInput and can be assigned
>>>>>> to the other stream, then the data in SideInput stream will be 
>>>>>> broadcasted.
>>>>>>
>>>>>> So far, I have no idea if there is any solution to solve this without
>>>>>> SideInput.
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I am trying to implement the following using Flink:
>>>>>>>
>>>>>>> I have 2 input message streams:
>>>>>>>
>>>>>>> 1. Data Stream:
>>>>>>> KEY VALUE TIME
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> C      V6        6
>>>>>>> B      V6        6
>>>>>>> A      V5        5
>>>>>>> A      V4        4
>>>>>>> C      V3        3
>>>>>>> A      V3        3
>>>>>>> B      V3        3
>>>>>>> B      V2        2
>>>>>>> A      V1        1
>>>>>>>
>>>>>>> 2. Control Stream:
>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> f2            [A, C]                 4
>>>>>>> f1            [A, B, C]            1
>>>>>>>
>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>
>>>>>>> Since we have 2 streams I naturally thought of connecting them using
>>>>>>> .connect. For this I need to key both of them by a certain criteria. And
>>>>>>> here lies the problem, how can I make sure the messages with keys A,B,C
>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to 
>>>>>>> key
>>>>>>> by to achieve this.
>>>>>>>
>>>>>>> I suspect a custom partitioner is required that partitions the data
>>>>>>> stream based on the messages in the control stream? Is this even 
>>>>>>> possible?
>>>>>>>
>>>>>>> Any suggestions welcomed!
>>>>>>>
>>>>>>> Thanks,
>>>>>>> M
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Reply via email to