I will update the design doc with more details for the Checkpointed variants and remove Option 2 (I think that's an orthogonal thing).
The way I see it now, we should have a base CheckpointedBase interface and have the current Checkpointed interface be a sub-interface for non-repartitionable state. Then we have two other List-based variants:

1) Union List => on restore, all state is unioned (what is currently in the design doc)
2) List => on restore, state is automatically redistributed (if the parallelism stays the same, state should go to the same subtasks, but there are no guarantees when the parallelism changes)

====

Regarding the other thing you and Aljoscha discussed: I feel like that should be handled as part of the side input effort. Does that make sense?

On Fri, Aug 12, 2016 at 3:11 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Aljoscha,
>
> Yes, this is pretty much how I think about it as well.
>
> Basically, the state in this case would be computed from the side inputs with the same state update logic on all operators. I think it is important that operators compute their own state, or at least observe all state changes; otherwise a lot of things can get weird.
>
> Let's say, for instance, I am building a dynamic filter where new filter conditions are added/removed on the fly. For the sake of my argument, let's also assume that initializing a new filter condition is a heavy operation. The global state in this case is the union of all filter conditions.
>
> If at any point in time the operators could only observe the current state, we might end up with very inefficient code, while if we observe all state changes individually (add 1 new filter), we can just instantiate the new filter without worrying about the other ones.
>
> I am not completely sure if it's clear what I am trying to say :D
>
> Gyula
>
> On Fri, Aug 12, 2016, 14:28 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Gyula,
>> I was thinking about this as well, in the context of side inputs, which would be a generalization of your use case, if I'm not mistaken. In my head I was calling it global state.
>> Essentially, this state would be the same on all operators, and when checkpointing you would only have to checkpoint the state of operator 0. Upon restore you would distribute this state to all operators again.
>>
>> Is this what you had in mind?
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 12 Aug 2016 at 13:07 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> > Hi,
>> > Let me try to explain what I mean by broadcast states.
>> >
>> > I think it is a very common pattern that people broadcast control messages to operators that also receive normal input events.
>> >
>> > Some examples: broadcast a model for prediction, or broadcast some information that should be the same at all subtasks but evolves over time. At the same time, these operators usually also do normal event processing based on the broadcast input stream.
>> >
>> > There is currently no proper solution for this provided by the API. We can of course use connected operators or wrapper types and broadcast one of the inputs, but there are several limitations. For instance, we can't use keyed states, because that requires both inputs to be keyed (so we can't broadcast).
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Fri, Aug 12, 2016, 12:28 Ufuk Celebi <u...@apache.org> wrote:
>> >
>> > > Comments inline.
>> > >
>> > > On Thu, Aug 11, 2016 at 8:06 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > > > Option 1:
>> > > > I think the main problem here is that sending all the state everywhere will not scale at all. I think this will even fail for some internal Flink operators (window timers, I think, are kept like this; maybe I'm wrong here). The general problem here, which we don't have with the key-value states, is that the system can't do the repartitioning automatically. I think we should try to make abstractions that would allow the system to do this.
>> > >
>> > > The state size can definitely become a problem. For Kafka sources, for example, I don't think it would be problematic, but for the timers it might be, yes. It definitely depends on the use case.
>> > >
>> > > In theory, we could also redistribute the list elements automatically, for example in a round-robin fashion. The question is whether this will be enough in general.
>> > >
>> > > >
>> > > > Option 2:
>> > > > To be honest, I don't completely get this approach. What do the indices mean in the get/set methods? What happens if the same index is used from multiple operators?
>> > > > This may also suffer in scalability like Option 1 (but as I said, I don't get this completely :()
>> > >
>> > > Yes, I don't like it either. It's actually similar to Option 1 (from a runtime perspective). I think the main question with Option 2 is whether we expose the API as an interface or a state class. If we go for this kind of interface, we could parameterize the restore behaviour via the descriptor (e.g. a flag to merge/union etc.). That should be more extensible than providing interfaces.
>> > >
>> > > > I think another approach could be (it might be similar to what Option 2 is trying to achieve) to provide a Set<T> (or Map) like abstraction to keep the non-partitioned states. Users could add/remove things from it at their own will, but the system would be free to redistribute the Sets between the operators. In practice, this would mean, for instance, that the Kafka sources would store (partition, offset) tuples in the set, and every time in the open method they would check what is assigned to them (the system is free to decide). This of course would only work well if we can assume that distributing the states by equal numbers is desirable.
>> > >
>> > > I think the same point applies to redistributing the list automatically (that's what I meant by whether it is "general enough"). I think what you describe here could be the list without unioning it.
>> > >
>> > > >
>> > > > Broadcast states:
>> > > > This might be a good time to think about broadcast states: non-partitioned states that are the same at all subtasks. I think this comes up in a lot of use cases (I know at least one myself, haha), and it is pretty straightforward from a runtime perspective; the bigger question is the API.
>> > >
>> > > Can you explain this a little more?
>> > >
>> > > ========
>> > >
>> > > Another open question (not addressed in the FLIP yet) is how we treat operators that have both keyed and non-keyed state. The current API kind of moves this question to the user.
>> > >
>> >
>>
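The restore behaviours discussed in this thread (the Union List variant, the automatically redistributed List variant with round-robin assignment, and the global state that is checkpointed only from operator 0) can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not Flink's API: each subtask's checkpointed state is modeled as a List<T>, and the class and method names (ListStateRedistribution, union, roundRobin, broadcastFromSubtaskZero) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink API): how per-subtask list state could be
 * reassigned on restore. oldState.get(i) is the list checkpointed by subtask i.
 */
public class ListStateRedistribution {

    /** Union List: every new subtask receives the union of all old lists. */
    static <T> List<List<T>> union(List<List<T>> oldState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldState) {
            all.addAll(subtaskState);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all)); // each subtask gets its own copy
        }
        return result;
    }

    /** List: elements are spread round-robin over the new subtasks. */
    static <T> List<List<T>> roundRobin(List<List<T>> oldState, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        if (oldState.size() == newParallelism) {
            // Unchanged parallelism: keep each subtask's state where it was.
            for (List<T> subtaskState : oldState) {
                result.add(new ArrayList<>(subtaskState));
            }
            return result;
        }
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        // Changed parallelism: no placement guarantee, only an even spread.
        int next = 0;
        for (List<T> subtaskState : oldState) {
            for (T element : subtaskState) {
                result.get(next).add(element);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    /** Global/broadcast state: only subtask 0's copy is restored to everyone. */
    static <T> List<List<T>> broadcastFromSubtaskZero(List<List<T>> oldState, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(oldState.get(0)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative (partition:offset) entries from two subtasks, rescaled to three.
        List<List<String>> old = List.of(List.of("p0:10", "p1:20"), List.of("p2:30", "p3:40"));
        System.out.println(union(old, 3).get(0)); // prints [p0:10, p1:20, p2:30, p3:40]
        System.out.println(roundRobin(old, 3));   // prints [[p0:10, p3:40], [p1:20], [p2:30]]
        System.out.println(broadcastFromSubtaskZero(old, 3).get(2)); // prints [p0:10, p1:20]
    }
}
```

The union variant makes every subtask pay for the full state size, which is the scalability concern raised about Option 1, while round-robin keeps the per-subtask size balanced but only works if, as noted above, distributing the entries by equal numbers is acceptable (e.g. a source that re-checks its assigned (partition, offset) entries in open()).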