Hi,

Yes, I think it makes sense. :)

Gyula

On Fri, Aug 12, 2016, 17:02 Ufuk Celebi <u...@apache.org> wrote:

> I will update the design doc with more details for the Checkpointed
> variants and remove Option 2 (I think that's an orthogonal thing).
>
> The way I see it now, we should have a base CheckpointedBase interface
> and have the current Checkpointed interface be a subclass for
> non-repartitionable state. Then we have two other List-based variants:
>
> 1) Union List => on restore all state is unioned (what is currently in
> the design doc)
>
> 2) List => on restore state is automatically redistributed (if
> parallelism stays the same, state should go to the same subtasks, but
> there are no guarantees when parallelism changes).
>
> ====
>
> Regarding the other thing you and Aljoscha discussed: I feel like that
> should be handled as part of the side input effort. Does that make
> sense?
>
> On Fri, Aug 12, 2016 at 3:11 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > Hi Aljoscha,
> >
> > Yes, this is pretty much how I think about it as well.
> >
> > Basically the state in this case would be computed from the side inputs
> > with the same state update logic on all operators. I think it is
> > important that operators compute their own state or at least observe
> > all state changes, otherwise a lot of things can get weird.
> >
> > Let's say, for instance, I am building a dynamic filter where new filter
> > conditions are added/removed on the fly. For the sake of my argument
> > let's also assume that initializing a new filter condition is a heavy
> > operation. The global state in this case is the union of all filter
> > conditions.
> >
> > If at any point in time the operators could only observe the current
> > state, we might end up with very inefficient code, while if we observe
> > all state changes individually (add 1 new filter) we can just
> > instantiate the new filter without worrying about the other ones.
> >
> > I am not completely sure if it's clear what I am trying to say :D
> >
> > Gyula
> >
> > On Fri, Aug 12, 2016, 14:28 Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> >> Hi Gyula,
> >> I was thinking about this as well, in the context of side inputs, which
> >> would be a generalization of your use case, if I'm not mistaken. In my
> >> head I was calling it global state. Essentially, this state would be the
> >> same on all operators and when checkpointing you would only have to
> >> checkpoint the state of operator 0. Upon restore you would distribute
> >> this state to all operators again.
> >>
> >> Is this what you had in mind?
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Fri, 12 Aug 2016 at 13:07 Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>
> >> > Hi,
> >> > Let me try to explain what I mean by broadcast states.
> >> >
> >> > I think it is a very common pattern that people broadcast control
> >> > messages to operators that also receive normal input events.
> >> >
> >> > Some examples: broadcast a model for prediction, or broadcast some
> >> > information that should be the same at all subtasks but is evolving
> >> > over time. At the same time these operators usually also do normal
> >> > event processing based on the broadcast input stream.
> >> >
> >> > There is currently no proper solution for this provided by the API.
> >> > We can of course use connected operators or wrapper types and
> >> > broadcast one of the inputs, but there are several limitations. We
> >> > can't use keyed states, for instance, because that requires both
> >> > inputs to be keyed (so we can't broadcast).
> >> >
> >> > Cheers,
> >> > Gyula
> >> >
> >> > On Fri, Aug 12, 2016, 12:28 Ufuk Celebi <u...@apache.org> wrote:
> >> >
> >> > > Comments inline.
> >> > >
> >> > > On Thu, Aug 11, 2016 at 8:06 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >> > > > Option 1:
> >> > > > I think the main problem here is that sending all the state
> >> > > > everywhere will not scale at all. I think this will even fail for
> >> > > > some internal Flink operators (window timers, I think, are kept
> >> > > > like this; maybe I'm wrong here). The general problem here, which
> >> > > > we don't have with the key-value states, is that the system can't
> >> > > > do the repartitioning automatically. I think we should try to make
> >> > > > abstractions that would allow the system to do this.
> >> > >
> >> > > The state size can definitely become a problem. For Kafka sources,
> >> > > for example, I don't think it would be problematic, but for the
> >> > > timers it might be, yes. It definitely depends on the use case.
> >> > >
> >> > > In theory, we could also redistribute the list elements
> >> > > automatically, for example in a round-robin fashion. The question
> >> > > is whether this will be enough in general.
> >> > >
> >> > > >
> >> > > > Option 2:
> >> > > > To be honest I don't completely get this approach. What do the
> >> > > > indices mean in the get/set methods? What happens if the same
> >> > > > index is used from multiple operators?
> >> > > > This may also suffer in scalability like Option 1 (but as I said
> >> > > > I don't get this completely :()
> >> > >
> >> > > Yes, I don't like it either. It's actually similar to Option 1 (from
> >> > > a runtime perspective). I think the main question with Option 2 is
> >> > > whether we expose the API as an interface or a state class. If we go
> >> > > for this kind of interface we could parameterize the restore
> >> > > behaviour via the descriptor (e.g. flag to merge/union etc.). That
> >> > > should be more extensible than providing interfaces.
> >> > >
> >> > > > I think another approach could be (might be similar to what
> >> > > > Option 2 is trying to achieve) to provide a Set<T> (or Map) like
> >> > > > abstraction to keep the non-partitioned states. Users could
> >> > > > add/remove things from it at their own will, but the system would
> >> > > > be free to redistribute the Sets between the operators. In
> >> > > > practice this would mean, for instance, that the Kafka sources
> >> > > > would store (partition, offset) tuples in the set, and every time
> >> > > > in the open method they would check what is assigned to them (the
> >> > > > system is free to decide). This of course would only work well if
> >> > > > we can assume that distributing the states by equal numbers is
> >> > > > desirable.
> >> > >
> >> > > I think the same point applies to redistributing the list
> >> > > automatically (what I meant with whether it is "general enough"). I
> >> > > think what you describe here could be the list w/o unioning it.
> >> > >
> >> > > >
> >> > > > Broadcast states:
> >> > > > This might be a good time to think about broadcast states:
> >> > > > non-partitioned states that are the same at all subtasks. I think
> >> > > > this comes up in a lot of use cases (I know at least one myself
> >> > > > haha) and it is pretty straightforward from a runtime perspective;
> >> > > > the bigger question is the API.
> >> > >
> >> > > Can you explain this a little more?
> >> > >
> >> > > ========
> >> > >
> >> > > Another open question (not addressed in the FLIP yet) is how we treat
> >> > > operators that have both keyed and non-keyed state. The current API
> >> > > kind of moves this question to the user.
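
The two List-based restore behaviours discussed in this thread (Union List
and automatically redistributed List) can be illustrated with a minimal,
self-contained Java sketch. This is not Flink API: the class
ListStateRedistribution and the methods unionRestore and redistributeRestore
are hypothetical names chosen for illustration, and round-robin is only one
possible redistribution strategy (the one floated as an example in the
thread).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
final class ListStateRedistribution {

    /** Union List: on restore, every new subtask sees all elements of all old subtasks. */
    static <T> List<List<T>> unionRestore(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<T> all = new ArrayList<>();
        for (List<T> state : oldSubtaskStates) {
            all.addAll(state);
        }
        List<List<T>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(all)); // each subtask gets its own full copy
        }
        return restored;
    }

    /**
     * List: on restore, elements are split between subtasks. With unchanged
     * parallelism each subtask gets its old state back; otherwise elements are
     * dealt out round-robin (an assumed strategy, with no placement guarantees).
     */
    static <T> List<List<T>> redistributeRestore(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<T>> restored = new ArrayList<>();
        if (oldSubtaskStates.size() == newParallelism) {
            for (List<T> state : oldSubtaskStates) {
                restored.add(new ArrayList<>(state));
            }
            return restored;
        }
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> state : oldSubtaskStates) {
            for (T element : state) {
                restored.get(next).add(element);
                next = (next + 1) % newParallelism;
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Three old subtasks, e.g. holding Kafka partition ids, rescaled to two.
        List<List<Integer>> old = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));
        System.out.println(unionRestore(old, 2));        // [[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]
        System.out.println(redistributeRestore(old, 2)); // [[1, 3, 5], [2, 4]]
    }
}
```

The sketch makes the scalability concern from Option 1 visible: with the
union variant the restored state per subtask grows with the total state size,
while the redistributed variant keeps the total constant but leaves it to the
system to decide which subtask ends up with which element.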