I will update the design doc with more details for the Checkpointed
variants and remove Option 2 (I think that's an orthogonal thing).

The way I see it now, we should have a base CheckpointedBase interface and
make the current Checkpointed interface a subtype for state that is not
repartitionable. Then we have two other List-based variants:

1) Union List => on restore all state is unioned (what is currently in
the design doc)

2) List => on restore state is automatically redistributed (if the
parallelism stays the same, state should go to the same subtasks, but
there are no guarantees when the parallelism changes).
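
To make the two List-based variants a bit more concrete, here is a rough
Java sketch. Apart from CheckpointedBase and Checkpointed, all names
(UnionListCheckpointed, ListCheckpointed, restoreUnion,
restoreRedistributed) are hypothetical, the method signatures are
simplified, and round-robin is only one possible redistribution policy,
not something the design doc prescribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch only: interface and method names are assumptions, not confirmed API. */
final class CheckpointedVariantsSketch {

    /** Common base type for all checkpointed variants. */
    interface CheckpointedBase {}

    /** Current style: one opaque state object per subtask, not repartitionable. */
    interface Checkpointed<T> extends CheckpointedBase {
        T snapshotState(long checkpointId, long timestamp) throws Exception;
        void restoreState(T state) throws Exception;
    }

    /** Variant 1: on restore every subtask sees the union of all list elements. */
    interface UnionListCheckpointed<T> extends CheckpointedBase {
        List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
        void restoreState(List<T> unionOfAllState) throws Exception;
    }

    /** Variant 2: on restore the system hands each subtask some part of the elements. */
    interface ListCheckpointed<T> extends CheckpointedBase {
        List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
        void restoreState(List<T> assignedState) throws Exception;
    }

    /** Variant 1 semantics: each new subtask receives a copy of the full union. */
    static <T> List<List<T>> restoreUnion(List<List<T>> oldStates, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> state : oldStates) {
            union.addAll(state);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(union));
        }
        return result;
    }

    /**
     * Variant 2 semantics: same parallelism keeps the old assignment; otherwise
     * elements are spread round-robin (an assumed policy, no ordering guarantees).
     */
    static <T> List<List<T>> restoreRedistributed(List<List<T>> oldStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        if (oldStates.size() == newParallelism) {
            for (List<T> state : oldStates) {
                result.add(new ArrayList<>(state));
            }
            return result;
        }
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> state : oldStates) {
            for (T element : state) {
                result.get(next).add(element);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks holding, e.g., Kafka partition ids.
        List<List<String>> old = Arrays.asList(
                Arrays.asList("p0", "p1"), Arrays.asList("p2"));
        System.out.println("union, parallelism 3:         " + restoreUnion(old, 3));
        System.out.println("redistributed, parallelism 2: " + restoreRedistributed(old, 2));
        System.out.println("redistributed, parallelism 3: " + restoreRedistributed(old, 3));
    }
}
```

With the union variant the operator itself has to pick what it needs from
the full list on restore; with the plain list variant the system decides,
which is only good enough if an even split of elements is acceptable.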

====

Regarding the other thing you and Aljoscha discussed: I feel like that
should be handled as part of the side input effort. Does that make
sense?



On Fri, Aug 12, 2016 at 3:11 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> Hi Aljoscha,
>
> Yes this is pretty much how I think about it as well.
>
> Basically the state in this case would be computed from the side inputs
> with the same state update logic on all operators. I think it is important
> that operators compute their own state or at least observe all state
> changes otherwise a lot of things can get weird.
>
> Let's say, for instance, I am building a dynamic filter where new filter
> conditions are added/removed on the fly. For the sake of my argument let's
> also assume that initializing a new filter condition is a heavy operation.
> The global state in this case is the union of all filter conditions.
>
> If at any point in time the operators could only observe the current state,
> we might end up with very inefficient code, while if we observe all state
> changes individually (add 1 new filter) we can just instantiate the new
> filter without worrying about the other ones.
>
> I am not completely sure if it's clear what I am trying to say :D
>
> Gyula
>
> On Fri, Aug 12, 2016, 14:28 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Gyula,
>> I was thinking about this as well, in the context of side-inputs, which,
>> if I'm not mistaken, would be a generalization of your use case. In my head
>> I was calling it global state. Essentially, this state would be the same on
>> all operators and when checkpointing you would only have to checkpoint the
>> state of operator 0. Upon restore you would distribute this state to all
>> operators again.
>>
>> Is this what you had in mind?
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 12 Aug 2016 at 13:07 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> > Hi,
>> > Let me try to explain what I mean by broadcast states.
>> >
>> > I think it is a very common pattern that people broadcast control messages
>> > to operators that also receive normal input events.
>> >
>> > Some examples: broadcast a model for prediction, or broadcast some information
>> > that should be the same at all subtasks but is evolving over time. At the
>> > same time these operators usually also do normal event processing based on
>> > the broadcast input stream.
>> >
>> > There is currently no proper solution for this provided by the API. We can
>> > of course use connected operators or wrapper types and broadcast one of the
>> > inputs, but there are several limitations. We can't use keyed states, for
>> > instance, because that requires both inputs to be keyed (so we can't
>> > broadcast).
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Fri, Aug 12, 2016, 12:28 Ufuk Celebi <u...@apache.org> wrote:
>> >
>> > > Comments inline.
>> > >
>> > > On Thu, Aug 11, 2016 at 8:06 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > > > Option 1:
>> > > > I think the main problem here is that sending all the state everywhere will not
>> > > > scale at all. I think this will even fail for some internal Flink operators
>> > > > (window timers, I think, are kept like this; maybe I'm wrong here). The
>> > > > general problem here, which we don't have with the key-value states, is that
>> > > > the system can't do the repartitioning automatically. I think we should try
>> > > > to make abstractions that would allow the system to do this.
>> > >
>> > > The state size can definitely become a problem. For Kafka sources, for
>> > > example, I don't think it would be problematic, but for the timers it might
>> > > be, yes. It definitely depends on the use case.
>> > >
>> > > In theory, we could also redistribute the list elements automatically,
>> > > for example in a round-robin fashion. The question is whether this
>> > > will be enough in general.
>> > >
>> > > >
>> > > > Option 2:
>> > > > To be honest I don't completely get this approach. What do the indices mean
>> > > > in the get/set methods? What happens if the same index is used from
>> > > > multiple operators?
>> > > > This may also suffer in scalability like Option 1 (but as I said, I don't
>> > > > get this completely :()
>> > >
>> > > Yes, I don't like it either. It's actually similar to Option 1 (from a
>> > > runtime perspective). I think the main question with Option 2 is
>> > > whether we expose the API as an interface or a state class. If we go
>> > > for this kind of interface we could parameterize the restore behaviour
>> > > via the descriptor (e.g. flag to merge/union etc.). That should be
>> > > more extensible than providing interfaces.
>> > >
>> > > > I think another approach (which might be similar to what Option 2 is trying
>> > > > to achieve) could be to provide a Set<T> (or Map) like abstraction to keep the
>> > > > non-partitioned states. Users could add/remove things from it at will,
>> > > > but the system would be free to redistribute the Sets between the
>> > > > operators. In practice this would mean, for instance, that the Kafka sources
>> > > > would store (partition, offset) tuples in the set, and every time in the
>> > > > open method they would check what is assigned to them (the system is free
>> > > > to decide). This of course would only work well if we can assume that
>> > > > distributing the states by equal numbers is desirable.
>> > >
>> > > I think the same point applies to redistributing the list
>> > > automatically (what I meant with whether it is "general enough"). I
>> > > think what you describe here could be the list w/o unioning it.
>> > >
>> > > >
>> > > > Broadcast states:
>> > > > This might be a good time to think about broadcast states: non-partitioned
>> > > > states that are the same at all subtasks. I think this comes up in a lot of
>> > > > use cases (I know at least one myself, haha) and it is pretty straightforward
>> > > > from a runtime perspective; the bigger question is the API.
>> > >
>> > > Can you explain this a little more?
>> > >
>> > > ========
>> > >
>> > > Another open question (not addressed in the FLIP yet) is how we treat
>> > > operators that have both keyed and non-keyed state. The current API
>> > > kind of moves this question to the user.
>> > >
>> >
>>
