Re: dynamically partitioned stream

2017-09-07 Thread Tony Wei
Hi Martin, For the first question: as far as I know, Flink guarantees that records coming from the same consumer subtask keep their order. If A, B and C came from different subtasks, the result might be what you are concerned about. After all, you can't make all subtasks process at the same speed
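A small plain-Python simulation of the ordering point above, assuming two upstream subtasks that each emit their records in order and run at unpredictable relative speeds (modelled here with a seeded random choice). This is not Flink code; `interleave` and `per_source_order` are hypothetical helpers. It shows that the merged arrival order varies between runs while each subtask's own records always keep their relative order.

```python
import random
from typing import Dict, List, Tuple

Record = Tuple[str, str]  # (source subtask name, value)


def interleave(sources: Dict[str, List[str]], seed: int) -> List[Record]:
    """Merge per-subtask streams into one arrival order.

    Each step picks a random subtask that still has records, modelling upstream
    subtasks that run at different speeds. Records from one subtask are always
    taken front to back, so per-subtask order is preserved.
    """
    rng = random.Random(seed)
    queues = {name: list(values) for name, values in sources.items()}  # copies
    merged: List[Record] = []
    while any(queues.values()):
        name = rng.choice(sorted(n for n, q in queues.items() if q))
        merged.append((name, queues[name].pop(0)))
    return merged


def per_source_order(merged: List[Record], source: str) -> List[str]:
    """Project the merged arrival order back onto a single subtask."""
    return [value for name, value in merged if name == source]


if __name__ == "__main__":
    sources = {"subtask-0": ["A1", "A2", "A3"], "subtask-1": ["B1", "B2", "B3"]}
    for seed in range(3):
        merged = interleave(sources, seed)
        # The cross-subtask interleaving depends on the seed ...
        print([value for _, value in merged])
        # ... but each subtask's own records arrive in their original order.
        assert per_source_order(merged, "subtask-0") == ["A1", "A2", "A3"]
        assert per_source_order(merged, "subtask-1") == ["B1", "B2", "B3"]
```

In other words, only the order within one subtask is something an operator downstream can rely on; any ordering across subtasks is a matter of timing.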

Re: dynamically partitioned stream

2017-09-07 Thread Martin Eden
Hi Tony, Ah, I see. Yes, you are right. What I was saying in my last message is that I relaxed that requirement after realising that it works the way you just described (and Aljoscha described previously) and that global state is not really feasible. Here is a re-worked example. Please let me know if it m

Re: dynamically partitioned stream

2017-09-07 Thread Tony Wei
Hi Martin, What I was talking about is how to store the arguments' state. Take the example you used to explain your use case to Aljoscha: 4 f1(V4, V3, V3) f2(V4, V3) 3 f1(V3, V3, V3) 2 - 1 - You showed that when lambda f2 came, it would emit f2(V4, V3) imm

Re: dynamically partitioned stream

2017-09-06 Thread Martin Eden
Hi Tony, Yes, exactly. I am assuming the lambda emits a value only after it has been published to the control topic (at time t1) and at least one value has arrived in the data topic for each of its arguments. This will happen at some time t2 > t1. So yes, there is uncertainty with regard to when t2 will happen. Id

Re: dynamically partitioned stream

2017-09-06 Thread Tony Wei
Hi Martin, Performance is an issue, but in your case it might indeed not be a problem if X << N. The other problem, however, is where data should go at the beginning, before any lambda has been received. This problem concerns correctness rather than performance. If you want t

Re: dynamically partitioned stream

2017-09-06 Thread Martin Eden
Hi Aljoscha, Tony, We actually do not need all the keys to be on all nodes where lambdas are. We just need the keys that represent the data for the lambda arguments to be routed to the same node as the lambda, whichever one it might be. Essentially in the solution we emit the data multiple times

Re: dynamically partitioned stream

2017-09-01 Thread Tony Wei
Hi Martin, Aljoscha, I think Aljoscha is right. My original thought was to keep state only after a lambda function arrives. Using Aljoscha's scenario as an example: initially, all data will be discarded because there are no lambdas. When lambdas f1 [D, E] and f2 [A, C] come, A, C begin to be routed
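The scenario in Tony's message can be sketched roughly as follows, assuming control and data events reach a single routing step as one ordered sequence. This is plain Python, not a Flink operator; the event tuples and the `route` function are hypothetical. A record whose key no registered lambda needs is discarded, which is why everything is dropped before the first lambda arrives.

```python
from typing import Dict, List, Set, Tuple, Union

# Hypothetical event shapes for this sketch (not a Flink API):
#   ("lambda", name, [argument keys]): a control message registering a lambda
#   ("data", key, value):              a data record
Event = Tuple[str, str, Union[str, List[str]]]


def route(events: List[Event]) -> Tuple[List[Tuple[str, str, str]], List[Tuple[str, str]]]:
    """Process control and data events in arrival order.

    Returns (routed, discarded). `routed` holds (lambda name, key, value) for each
    lambda that needs the key when the record arrives; `discarded` holds records
    that no registered lambda needs at that moment.
    """
    needed_by: Dict[str, Set[str]] = {}  # data key -> names of lambdas using it
    routed: List[Tuple[str, str, str]] = []
    discarded: List[Tuple[str, str]] = []
    for kind, first, payload in events:
        if kind == "lambda":
            for key in payload:
                needed_by.setdefault(key, set()).add(first)
        elif needed_by.get(first):
            for name in sorted(needed_by[first]):
                routed.append((name, first, payload))
        else:
            discarded.append((first, payload))
    return routed, discarded


if __name__ == "__main__":
    events: List[Event] = [
        ("data", "A", "V1"),          # arrives before any lambda: discarded
        ("lambda", "f1", ["D", "E"]),
        ("lambda", "f2", ["A", "C"]),
        ("data", "A", "V2"),          # needed by f2: routed
        ("data", "B", "V3"),          # no lambda uses B: discarded
        ("data", "C", "V4"),          # needed by f2: routed
    ]
    print(route(events))
```

With these events only `A V2` and `C V4` reach f2: `A V1` arrives too early, and B is not an argument of any lambda. This is the correctness concern raised in the thread: what a lambda sees depends on when it was registered relative to the data.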

Re: dynamically partitioned stream

2017-09-01 Thread Aljoscha Krettek
Hi Martin, I think with those requirements this is very hard (or maybe impossible) to do efficiently in a distributed setting. It might be that I'm misunderstanding things but let's look at an example. Assume that initially, we don't have any lambdas, so data can be sent to any machine because

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
This might be a way forward, but since side inputs are not available yet, I will try to key the control stream by the keys in the first co-flatMap. I'll see how it goes. Thanks guys, M On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei wrote: > Hi Martin, > > Yes, that is exactly what I thought. > But the firs

Re: dynamically partitioned stream

2017-08-31 Thread Tony Wei
Hi Martin, Yes, that is exactly what I thought. But the first step would also need to be done with SideInput, and I'm not sure how to achieve this in the current release. Best, Tony Wei On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden wrote: > Hi Aljoscha, Tony, > > Aljoscha: > Yes it's the first option you mentione

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
Hi Aljoscha, Tony, Aljoscha: Yes, it's the first option you mentioned. Yes, the stream has multiple values in flight for A, B and C. f1 needs to be applied each time a new value for A, B or C comes in, so we need to use state to cache the latest values. So using the example data stream in my fi
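One way to hold the cached arguments, sketched in plain Python under the assumptions stated in the thread: a lambda is re-applied every time a new value for one of its arguments arrives, and it emits nothing until at least one value per argument has been seen since it was registered. `LambdaState` is a hypothetical name, and the lambda is modelled as a Python callable rather than a string of logic. In Flink this cache would presumably live in keyed state (for example a `MapState` per lambda), but that is an assumption, not something the thread settles.

```python
from typing import Callable, Dict, List, Optional, Sequence


class LambdaState:
    """Hypothetical per-lambda state: the latest value of each argument key."""

    def __init__(self, name: str, arg_keys: Sequence[str], fn: Callable[..., str]) -> None:
        self.name = name
        self.arg_keys: List[str] = list(arg_keys)
        self.fn = fn
        self.latest: Dict[str, str] = {}  # argument key -> most recent value

    def on_data(self, key: str, value: str) -> Optional[str]:
        """Cache the new value and re-apply the lambda once all arguments are known."""
        if key not in self.arg_keys:
            return None  # not an argument of this lambda
        self.latest[key] = value
        if not all(k in self.latest for k in self.arg_keys):
            return None  # still waiting for at least one value per argument
        return self.fn(*(self.latest[k] for k in self.arg_keys))


if __name__ == "__main__":
    f1 = LambdaState("f1", ["A", "B", "C"], lambda a, b, c: f"f1({a}, {b}, {c})")
    for key, value in [("A", "V1"), ("B", "V2"), ("C", "V3"), ("A", "V4"), ("D", "V5")]:
        print(key, value, "->", f1.on_data(key, value))
```

Feeding A, B, C and then a second A shows both behaviours: nothing is emitted until C arrives, after which every new argument value triggers a fresh result using the latest cached values of the other arguments.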

Re: dynamically partitioned stream

2017-08-31 Thread Aljoscha Krettek
Hi Martin, In your original example, what does this syntax mean exactly: f1[A, B, C]1? Does it mean that f1 needs one A, one B and one C from the main stream? If so, which ones, given that there are multiple As and Bs and so on? Or does it mean that f1 can apply to an A or

Re: dynamically partitioned stream

2017-08-31 Thread Tony Wei
Hi Martin, So the problem is that you want to group those arguments in the Data Stream and pass them to the lambda function from the Control Stream at the same time. Am I right? If so, you could give each lambda function an id as well and use these ids to tag each argument with the lambda it belongs to. Af

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
Thanks for your reply, Tony. Yes, we are in the latter case, where the functions/lambdas come in on the control stream. Think of them as strings containing the logic of the function. The values for each of the arguments to the function come from the data stream. That is why we need to co-locate the dat

Re: dynamically partitioned stream

2017-08-31 Thread Tony Wei
Hi Martin, Regarding problem 2: how are those lambda functions created? Are they pre-defined functions/operators, or are they generated automatically from the messages in the Control Stream? For the former, you could give each function an id and use flatMap to duplicate the data with multiple ids. Then, you could use
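Tony's id-tagging idea (duplicate each data record once per lambda that needs it, then partition by the lambda id) can be sketched in plain Python as below. `fan_out` stands in for a Flink `flatMap` and `partition` for `keyBy` on the lambda id; the key-to-lambda-ids mapping and the `crc32`-based partitioning are assumptions for illustration, not how Flink actually assigns keys to subtasks (Flink uses its own key-group hashing). The point is only that every record tagged with the same lambda id lands on the same parallel task.

```python
import zlib
from typing import Dict, List, Tuple

Tagged = Tuple[str, str, str]  # (lambda id, data key, value)


def fan_out(record: Tuple[str, str], lambdas_by_key: Dict[str, List[str]]) -> List[Tagged]:
    """flatMap-style step: emit one copy of the record per lambda that uses its key."""
    key, value = record
    return [(lambda_id, key, value) for lambda_id in lambdas_by_key.get(key, [])]


def partition(lambda_id: str, parallelism: int) -> int:
    """keyBy-style step: choose a task from a stable hash of the lambda id.

    zlib.crc32 is used because Python's built-in str hash is randomized per process.
    """
    return zlib.crc32(lambda_id.encode("utf-8")) % parallelism


def run(records: List[Tuple[str, str]], lambdas: Dict[str, List[str]],
        parallelism: int) -> List[List[Tagged]]:
    """Send every tagged copy of every record to one of `parallelism` tasks."""
    lambdas_by_key: Dict[str, List[str]] = {}
    for lambda_id, arg_keys in lambdas.items():
        for key in arg_keys:
            lambdas_by_key.setdefault(key, []).append(lambda_id)
    tasks: List[List[Tagged]] = [[] for _ in range(parallelism)]
    for record in records:
        for tagged in fan_out(record, lambdas_by_key):
            tasks[partition(tagged[0], parallelism)].append(tagged)
    return tasks


if __name__ == "__main__":
    lambdas = {"f1": ["A", "B", "C"], "f2": ["A", "B"]}
    records = [("A", "V1"), ("B", "V2"), ("C", "V3"), ("D", "V4")]
    for index, task in enumerate(run(records, lambdas, parallelism=4)):
        print(index, task)
```

The cost of this design shows up in the fan-out: a key used as an argument by k lambdas is emitted k times, and a key used by no lambda (D here) is not emitted at all.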

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
Thanks for your reply, Tony. So there are actually 2 problems to solve: 1. All control stream messages need to be broadcast to all tasks. 2. The data stream messages with the same keys as those specified in the control message need to go to the same task as well, so that all the values required for

Re: dynamically partitioned stream

2017-08-31 Thread Tony Wei
Hi Martin, Let me make sure I understand your question first. You have two streams, a Data Stream and a Control Stream, and you want to select data in the Data Stream based on the key set obtained from the Control Stream. If I understand your question correctly, I think SideInput is what you want. https://cwiki.apache.org

dynamically partitioned stream

2017-08-31 Thread Martin Eden
Hi all, I am trying to implement the following using Flink: I have 2 input message streams: 1. Data Stream: KEY VALUE TIME . . . C V66 B V66 A V55 A V44 C V33 A V33 B V33 B V22 A V1