This might be a way forward but since side inputs are not there I will try
and key the control stream by the keys in the first co flat map.

I'll see how it goes.

Thanks guys,
M

On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> Yes, that is exactly what I thought.
> But the first step also needs to be fulfilled  by SideInput. I'm not sure
> how to achieve this in the current release.
>
> Best,
> Tony Wei
>
> Martin Eden <martineden...@gmail.com>於 2017年8月31日 週四,下午11:32寫道:
>
>> Hi Aljoscha, Tony,
>>
>> Aljoscha:
>> Yes it's the first option you mentioned.
>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be
>> applied each time a new value for either A, B or C comes in. So we need to
>> use state to cache the latest values. So using the example data stream in
>> my first msg the emitted stream should be:
>>
>> 1. Data Stream:
>> KEY VALUE TIME
>> .
>> .
>> .
>> C      V6        6
>> B      V6        6
>> A      V5        5
>> A      V4        4
>> C      V3        3
>> A      V3        3
>> B      V3        3
>> B      V2        2
>> A      V1        1
>>
>> 2. Control Stream:
>> Lambda  ArgumentKeys TIME
>> .
>> .
>> .
>> f2            [A, C]                 4
>> f1            [A, B, C]            1
>>
>> 3. Expected emitted stream:
>> TIME    VALUE
>> .
>> .
>> .
>> 6          f1(V5, V6, V3)
>>             f1(V5, V6, V6)
>>             f2(V5, V6)
>> 5          f1(V5, V3, V3)
>>             f2(V5, V3)
>> 4          f1(V4, V3, V3)
>>             f2(V4, V3)
>> 3          f1(V3, V3, V3)
>> 2          -
>> 1          -
>>
>> So essentially as soon as the argument list fills up then we apply the
>> function/lambda at each new arriving message in the data stream for either
>> argument key.
>>
>> Tony:
>> Yes we need to group by and pass to the lambda.
>> Ok, so what you are proposing might work. So your solution assumes that
>> we have to connect with the control stream twice? Once for the tagging and
>> another time re-connect-ing the control stream with the tagged stream for
>> the actual application of the function/lambda?
>>
>> Thanks,
>> Alex
>>
>>
>>
>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi Martin,
>>>
>>> In your original example, what does this syntax mean exactly:
>>>
>>> f1            [A, B, C]            1
>>>
>>> Does it mean that f1 needs one A, one B and one C from the main stream?
>>> If yes, which ones, because there are multiple As and Bs and so on. Or does
>>> it mean that f1 can apply to an A or a B or a C? If it's the first, then I
>>> think it's quite hard to find a partitioning such that both f1, f2, and all
>>> A, B, and C go to the same machine.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Martin,
>>>
>>> So the problem is that you want to group those arguments in Data Stream
>>> and pass them to the lambda function from Control Stream at the same time.
>>> Am I right?
>>>
>>> If right, then you could give each lambda function an id as well. Use
>>> these ids to tag those arguments to which they belong.
>>> After that, keyBy function could be used to group those arguments
>>> belonging to the same lambda function. Joining this stream with Control
>>> Stream by function id could make arguments and function be in the same
>>> instance.
>>>
>>> What do you think? Could this solution solve your problem?
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>
>>>> Thanks for your reply Tony,
>>>>
>>>> Yes we are in the latter case, where the functions/lambdas come in the
>>>> control stream. Think of them as strings containing the logic of the
>>>> function. The values for each of the arguments to the function come from
>>>> the data stream. That is why we need to co-locate the data stream messages
>>>> for the corresponding keys with the control message that has the function
>>>> to be applied.
>>>>
>>>> We have a way of interpreting the logic described in the string and
>>>> executing it on the incoming values from the data stream. This is kicked
>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>> RichCoFlatMapFunction) but is not using Flink predefined operators or
>>>> functions.
>>>>
>>>> So yeah I see your point about mapping the arguments but the problem is
>>>> not really that, the problem is making sure that the values in the control
>>>> stream are in the same instance of the task/ keyed managed state as a the
>>>> actual control stream message. Once they are we can pass them in.
>>>>
>>>> Any other thoughts?
>>>>
>>>> M
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> About problem 2. How were those lambda functions created? Pre-defined
>>>>> functions / operators or automatically generated based on the message from
>>>>> Control Stream?
>>>>>
>>>>> For the former, you could give each function one id and user flapMap
>>>>> to duplicate data with multiple ids. Then, you could use filter function
>>>>> and send them to the corresponding operators.
>>>>>
>>>>> For the general case like the latter, because you had broadcasted the
>>>>> messages to all tasks, it could always build a mapping table from argument
>>>>> keys to lambda functions in each sub-task and use the map to process the
>>>>> data. But I was wondering if it is possible to generate a completely new
>>>>> function in the runtime.
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>
>>>>>> Thanks for your reply Tony.
>>>>>>
>>>>>> So there are actually 2 problems to solve:
>>>>>>
>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>
>>>>>> 2. The data stream messages with the same keys as those specified in
>>>>>> the control message need to go to the same task as well, so that all the
>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>
>>>>>> In my understanding side inputs (which are actually not available in
>>>>>> the current release) would address problem 1.
>>>>>>
>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I
>>>>>> get a runtime exception telling me I still need to do a keyBy before the
>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>
>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>> anyone has some suggestions.
>>>>>>
>>>>>> Thanks,
>>>>>> M
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> Let me understand your question first.
>>>>>>> You have two Stream: Data Stream and Control Stream and you want to
>>>>>>> select data in Data Stream based on the key set got from Control Stream.
>>>>>>>
>>>>>>> If I were not misunderstanding your question, I think SideInput is
>>>>>>> what you want.
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
>>>>>>> 17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-
>>>>>>> StoringSide-InputData
>>>>>>> It lets you to define one stream as a SideInput and can be assigned
>>>>>>> to the other stream, then the data in SideInput stream will be 
>>>>>>> broadcasted.
>>>>>>>
>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>> without SideInput.
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>
>>>>>>>> I have 2 input message streams:
>>>>>>>>
>>>>>>>> 1. Data Stream:
>>>>>>>> KEY VALUE TIME
>>>>>>>> .
>>>>>>>> .
>>>>>>>> .
>>>>>>>> C      V6        6
>>>>>>>> B      V6        6
>>>>>>>> A      V5        5
>>>>>>>> A      V4        4
>>>>>>>> C      V3        3
>>>>>>>> A      V3        3
>>>>>>>> B      V3        3
>>>>>>>> B      V2        2
>>>>>>>> A      V1        1
>>>>>>>>
>>>>>>>> 2. Control Stream:
>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>> .
>>>>>>>> .
>>>>>>>> .
>>>>>>>> f2            [A, C]                 4
>>>>>>>> f1            [A, B, C]            1
>>>>>>>>
>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>
>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>> using .connect. For this I need to key both of them by a certain 
>>>>>>>> criteria.
>>>>>>>> And here lies the problem, how can I make sure the messages with keys 
>>>>>>>> A,B,C
>>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to 
>>>>>>>> key
>>>>>>>> by to achieve this.
>>>>>>>>
>>>>>>>> I suspect a custom partitioner is required that partitions the data
>>>>>>>> stream based on the messages in the control stream? Is this even 
>>>>>>>> possible?
>>>>>>>>
>>>>>>>> Any suggestions welcomed!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> M
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>

Reply via email to