Hi Weijie, Thanks for your reply!
Overall I'd be fine with the builder pattern, but it is a little bit long when carrying explicit 'build()' and declaring the builder. Keeping the StateDeclaration immutable is OK, but it is a little bit inconvenient for overriding the undefined options by job configuration at runtime. I'd suggest providing some methods responsible for rebuilding a new StateDeclaration with new configurable options, just like the ConfigOptions#defaultValue does. Well, this is just a suggestion, I'm not going to insist on it. Best, Zakelly On Tue, Mar 12, 2024 at 2:07 PM weijie guo <guoweijieres...@gmail.com> wrote: > Hi Zakelly, > > > But still, from a user's point of view, state can be characterized along > two relatively independent dimensions, how states redistribute and the data > structure. Thus I still suggest a chained-like configuration API that > configures one aspect on each call. > > > I think the chained-like style is a good suggestion. But I'm not going to > introduce any mutable-like API to StateDeclaration (even though we can > achieve immutability by returning a new object). For this reason, I decided > to use the builder pattern, which also has the benefit of chaining calls > and allows us to support further configurations such as setTTL in the > future. For ease of use, we'll also provide some shortcuts to avoid having > to go through a long build chain each time. Of course, I have updated the > the FLIP about this part. > > > > Best regards, > > Weijie > > > weijie guo <guoweijieres...@gmail.com> 于2024年3月12日周二 14:00写道: > > > Hi Hangxiang, > > > > > So these operators only define all states they may use which could be > > explained by the caller, right ? > > > > Yes, you're right. > > > > > > Best regards, > > > > Weijie > > > > > > weijie guo <guoweijieres...@gmail.com> 于2024年3月12日周二 13:59写道: > > > >> Hi Max, > >> > >> > In this thread it looks like the plan is to remove the old state > >> declaration API. I think we should consider keeping the old APIs to > >> avoid breaking too many jobs. > >> > >> We're not plan to remove any old apis, which means that changes made in > >> V2 won't affect any V1 DataStream jobs. But V2 is limited to the new > state > >> declaration API, and users who want to migrate to DataStream V2 will > need > >> to rewrite their jobs anyway. > >> > >> Best regards, > >> > >> Weijie > >> > >> > >> Hangxiang Yu <master...@gmail.com> 于2024年3月12日周二 10:26写道: > >> > >>> Hi, Weijie. > >>> Thanks for your answer! > >>> > >>> > No, Introducing and declaring new state > >>> > at runtime is something we want to explicitly disallow. > >>> > >>> I just thinked about how some operators define their useState() when > >>> their > >>> real used states may be changed at runtime, e.g. different state types > >>> for > >>> different state sizes. > >>> So these operators only define all states they may use which could be > >>> explained by the caller, right ? > >>> > >>> On Mon, Mar 11, 2024 at 10:57 PM Maximilian Michels <m...@apache.org> > >>> wrote: > >>> > >>> > The FLIP mentions: "The contents described in this FLIP are all new > >>> > APIs and do not involve compatibility issues." > >>> > > >>> > In this thread it looks like the plan is to remove the old state > >>> > declaration API. I think we should consider keeping the old APIs to > >>> > avoid breaking too many jobs. The new APIs will still be beneficial > >>> > for new jobs, e.g. for SQL jobs. > >>> > > >>> > -Max > >>> > > >>> > On Fri, Mar 8, 2024 at 4:39 AM Zakelly Lan <zakelly....@gmail.com> > >>> wrote: > >>> > > > >>> > > Hi Weijie, > >>> > > > >>> > > Thanks for your answer! Well I get your point. Since partitions are > >>> > > first-class citizens, and redistribution means how states migrate > >>> when > >>> > > partitions change, I'd be fine with deemphasizing the concept of > >>> > > keyed/operator state if we highlight the definition of partition in > >>> the > >>> > > document. Keeping `RedistributionMode` under `StateDeclaration` is > >>> also > >>> > > fine with me, as I guess it is only for internal usage. > >>> > > But still, from a user's point of view, state can be characterized > >>> along > >>> > > two relatively independent dimensions, how states redistribute and > >>> the > >>> > data > >>> > > structure. Thus I still suggest a chained-like configuration API > that > >>> > > configures one aspect on each call, such as: > >>> > > ``` > >>> > > # Keyed stream, no redistribution mode specified, the state will go > >>> with > >>> > > partition (no redistribution). ---- Keyed state > >>> > > StateDeclaration a = States.declare(name).listState(type); > >>> > > > >>> > > # Keyed stream, redistribution strategy specified, the state > follows > >>> the > >>> > > specified redistribute strategy. ---- Operator state > >>> > > StateDeclaration b = > >>> > > States.declare(name).listState(type).redistributeBy(strategy); > >>> > > > >>> > > # Non-keyed stream, redistribution strategy *must be* specified. > >>> > > StateDeclaration c = > >>> > > States.declare(name).listState(type).redistributeBy(strategy); > >>> > > > >>> > > # Broadcast stream and state > >>> > > StateDeclaration d = States.declare(name).mapState(typeK, > >>> > > typeV).broadcast(); > >>> > > ``` > >>> > > It can drive users to think about redistribution issues when > needed. > >>> And > >>> > it > >>> > > also provides more flexibility to add more combinations such as > >>> > > broadcasting list state, or chain more configurable aspects such as > >>> > adding > >>> > > `withTtl()` in future. WDYT? > >>> > > > >>> > > > >>> > > Best, > >>> > > Zakelly > >>> > > > >>> > > On Thu, Mar 7, 2024 at 6:04 PM weijie guo < > guoweijieres...@gmail.com > >>> > > >>> > wrote: > >>> > > > >>> > > > Hi Jinzhong, > >>> > > > > >>> > > > Thanks for the reply! > >>> > > > > >>> > > > > Overall, I think that the “Eager State Declaration” is a good > >>> > proposal, > >>> > > > which can enhance Flink's state management capabilities and > provide > >>> > > > possibilities for subsequent state optimizations. > >>> > > > > >>> > > > It's nice to see that people who are familiar with the state > stuff > >>> like > >>> > > > this proposal. :) > >>> > > > > >>> > > > > When the user attempts to access an undeclared state at > >>> runtime, it > >>> > is > >>> > > > more reasonable to throw an exception rather than returning > >>> > Option#empty, > >>> > > > as Gyula mentioned above. > >>> > > > > >>> > > > Yes, I agree that this is better then a confused empty, and I > have > >>> > modified > >>> > > > the corresponding part of this FLIP. > >>> > > > > >>> > > > > In addition, I'm not quite sure whether all of the existing > >>> usage in > >>> > > > which states are registered at runtime dynamically can be > migrated > >>> to > >>> > the > >>> > > > "Eager State Declaration" style with minimal cost? > >>> > > > > >>> > > > I think for most user functions, this is fairly straightforward > to > >>> > migrate. > >>> > > > But states whose declarations depend on runtime information(e.g. > >>> > > > RuntimeContext) are, in principle, not supported in the new API. > >>> > Anyway, > >>> > > > the old and new apis are completely incompatible, so rewriting > >>> jobs is > >>> > > > inevitable. User can think about how to write a good process > >>> function > >>> > that > >>> > > > conforms to the eager declaration style. > >>> > > > > >>> > > > > For state TTL, should StateDeclaration also provide interfaces > >>> for > >>> > users > >>> > > > to declare state ttl? > >>> > > > > >>> > > > Of course, We can and we need to provide this one. But whether or > >>> not > >>> > it's > >>> > > > in this FLIP isn't very important for me, because we're mainly > >>> talking > >>> > > > about the general principles and ways of declaring and accessing > >>> state > >>> > in > >>> > > > this FLIP. I promise we won't leave it out in the end D). > >>> > > > > >>> > > > > >>> > > > > >>> > > > Best regards, > >>> > > > > >>> > > > Weijie > >>> > > > > >>> > > > > >>> > > > Jinzhong Li <lijinzhong2...@gmail.com> 于2024年3月7日周四 17:34写道: > >>> > > > > >>> > > > > Hi Weijie, > >>> > > > > > >>> > > > > Thanks for driving this! > >>> > > > > > >>> > > > > 1. Overall, I think that the “Eager State Declaration” is a > good > >>> > > > proposal, > >>> > > > > which can enhance Flink's state management capabilities and > >>> provide > >>> > > > > possibilities for subsequent state optimizations. > >>> > > > > > >>> > > > > 2. When the user attempts to access an undeclared state at > >>> runtime, > >>> > it is > >>> > > > > more reasonable to throw an exception rather than returning > >>> > Option#empty, > >>> > > > > as Gyula mentioned above. > >>> > > > > In addition, I'm not quite sure whether all of the existing > >>> usage in > >>> > > > which > >>> > > > > states are registered at runtime dynamically can be migrated to > >>> the > >>> > > > "Eager > >>> > > > > State Declaration" style with minimal cost? > >>> > > > > > >>> > > > > 3. For state TTL, should StateDeclaration also provide > >>> interfaces for > >>> > > > users > >>> > > > > to declare state ttl? > >>> > > > > > >>> > > > > Best, > >>> > > > > Jinzhong Li > >>> > > > > > >>> > > > > > >>> > > > > On Thu, Mar 7, 2024 at 5:08 PM weijie guo < > >>> guoweijieres...@gmail.com > >>> > > > >>> > > > > wrote: > >>> > > > > > >>> > > > > > Hi Hangxiang, > >>> > > > > > > >>> > > > > > Thanks for your reply! > >>> > > > > > > >>> > > > > > > We have also discussed in FLIP-359/FLINK-32658 about > >>> limiting the > >>> > > > user > >>> > > > > > operation to avoid creating state when processElement. Could > >>> > current > >>> > > > > > interfaces also help this? > >>> > > > > > > >>> > > > > > I think so. It is illegal to create state at runtime in our > >>> > proposal. > >>> > > > > > > >>> > > > > > > >>> > > > > > > Could you provide more examples about how useStates() > works ? > >>> > Since > >>> > > > > some > >>> > > > > > operations may change their used states at runtime, the value > >>> this > >>> > > > method > >>> > > > > > returns will be modified at runtime, right? > >>> > > > > > > >>> > > > > > > >>> > > > > > No, Introducing and declaring new state > >>> > > > > > at runtime is something we want to explicitly disallow. You > can > >>> > simply > >>> > > > > > assume that useState is only called when the JobGraph is > >>> generated, > >>> > > > > > and any future changes to it are invalid and illegal. > >>> > > > > > > >>> > > > > > > >>> > > > > > > IIUC, RedistributionMode/Strategy should not be used by > >>> users, > >>> > right > >>> > > > ? > >>> > > > > > If so, I'm +1 to move them to inner interfaces which seems a > >>> bit > >>> > > > > confusing > >>> > > > > > to users. > >>> > > > > > > >>> > > > > > > >>> > > > > > As for this question, I think my answer to Zakelly should be > >>> > helpful. > >>> > > > > > > >>> > > > > > > >>> > > > > > > >>> > > > > > Best regards, > >>> > > > > > > >>> > > > > > Weijie > >>> > > > > > > >>> > > > > > > >>> > > > > > weijie guo <guoweijieres...@gmail.com> 于2024年3月7日周四 16:58写道: > >>> > > > > > > >>> > > > > > > Hi Zakelly, > >>> > > > > > > > >>> > > > > > > Thanks for your reply! > >>> > > > > > > > >>> > > > > > > > My advice would be to conceal RedistributionMode/Strategy > >>> from > >>> > the > >>> > > > > > > standard user interface, particularly within the helper > class > >>> > > > 'State'. > >>> > > > > > But > >>> > > > > > > I'm OK to keep it in `StateDeclaration` since its > interfaces > >>> are > >>> > > > > > basically > >>> > > > > > > used by the framework. > >>> > > > > > > > >>> > > > > > > I'm sorry, I didn't mention some of the details/concepts > >>> > introduced > >>> > > > by > >>> > > > > > the Umbrella FLIP and FLIP-409 in this FLIP. This might make > it > >>> > hard to > >>> > > > > > understand the motivation behind > >>> > > > > > > RedistributionMode, I'll add more context in FLIP then. > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > Briefly, in V2, we explicitly define the concept of > >>> partition. > >>> > In the > >>> > > > > > case of KeyedPartitionStream, one key corresponds to one > >>> > partition.For > >>> > > > > > NonKeyedPartitionStream one parallelism/subtask corresponds > to > >>> one > >>> > > > > > partition. All states are considered to be confined within > the > >>> > > > partition. > >>> > > > > > On this basis, an obvious question is whether and how the > state > >>> > should > >>> > > > be > >>> > > > > > redistribution when the partition changes? So we divide the > >>> state > >>> > into > >>> > > > > > three categories: > >>> > > > > > > > >>> > > > > > > - Don't need to redistribute states when the partition > >>> > changes. > >>> > > > > > > - Has to decide how to distribute states when the > >>> partition > >>> > > > changes. > >>> > > > > > > - Always has the same state across different partitions. > >>> > > > > > > > >>> > > > > > > After introducing the concept of partition, the > >>> redistribution > >>> > > > > > pattern/mode of state is the more essential difference > between > >>> > states. > >>> > > > > For > >>> > > > > > this reason, we don't want to emphasize keyed/operator state > in > >>> > the V2 > >>> > > > > API > >>> > > > > > > any > >>> > > > > > > more. Keep in mind, partition are first-class citizens. > And, > >>> > even in > >>> > > > > V1, > >>> > > > > > we have to let the user know that split/union are two > different > >>> > > > > strategies > >>> > > > > > for list state. > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > As for whether or not to expose RedistributionMode to > users, > >>> I > >>> > have > >>> > > > an > >>> > > > > > open mind. But as I said just now, we still can't avoid this > >>> > problem in > >>> > > > > the > >>> > > > > > splitRedistributionListState and > unionRedistributionListState. > >>> IMO, > >>> > > > it's > >>> > > > > > better to explain it in the API level instead of avoiding it. > >>> WDTY? > >>> > > > > > > > >>> > > > > > > Best regards, > >>> > > > > > > > >>> > > > > > > Weijie > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > weijie guo <guoweijieres...@gmail.com> 于2024年3月7日周四 > 16:39写道: > >>> > > > > > > > >>> > > > > > >> Hi Gyula, > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> Thanks for your reply! > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> Let me answer these questions: > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > What is the semantics of the usesStates method? When is > it > >>> > called? > >>> > > > > Can > >>> > > > > > >> the used state change dynamically at runtime? Can the > logic > >>> > depend > >>> > > > on > >>> > > > > > something computed in open(..) for example? > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> useStates is used to predefine all the states that the > >>> process > >>> > > > > function > >>> > > > > > needs to access. In other words, we want to avoid declaring > the > >>> > state > >>> > > > > > dynamically at runtime and this allows the SQL planner and JM > >>> to > >>> > > > optimize > >>> > > > > > the job better. As a result, this logic must be fully > >>> available at > >>> > > > > compile > >>> > > > > > time (when the JobGraph is generated), so it can't rely on > >>> > computations > >>> > > > > > that are executed after deploy to TM. > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > > >>> > > > > > >> Currently state access is pretty dynamic in Flink and I > >>> would > >>> > assume > >>> > > > > > many jobs create states on the fly based on some required > >>> logic. > >>> > Are we > >>> > > > > > planning to address these use-cases? > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> It depends on what type of context we need. If the type > and > >>> > number > >>> > > > of > >>> > > > > > states depend on runtime context, that's something we want to > >>> > avoid. If > >>> > > > > it > >>> > > > > > only depended on information available at compile time, I > >>> think we > >>> > > > could > >>> > > > > > support > >>> > > > > > >> it. > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > > >>> > > > > > >> Are we planning to support deleting/dropping states that > >>> are not > >>> > > > > > required anymore? > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> We really don't want the user to be able to dynamically > >>> > > > declare/delete > >>> > > > > > a state at runtime, but if you just want to clear/clean the > >>> value > >>> > of > >>> > > > > state, > >>> > > > > > the new API works the same as the old API. > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > I think if a state is not declared or otherwise cannot > be > >>> > > > accessed, > >>> > > > > an > >>> > > > > > >> exceptions must be thrown. We cannot confuse empty value > >>> with > >>> > > > > something > >>> > > > > > >> inaccessible. > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> After thinking about it a bit more, I think you have a > >>> point! > >>> > > > > > >> It's important to make a clear distinction between an > empty > >>> > state > >>> > > > and > >>> > > > > > illegal access, especially since flink currently discourage > >>> > setting a > >>> > > > > > non-null default value for the state. > >>> > > > > > >> I will modify the proposal as you suggested then :) > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > The RedistributionMode enum sounds a bit strange to me, > >>> as it > >>> > > > > doesn't > >>> > > > > > >> actually specify a mode of redistribution. It feels more > >>> like a > >>> > > > flag. > >>> > > > > > Can > >>> > > > > > >> we simply have an Optional<RedistributionStrategy> > instead? > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> We actually define three types RedistributionMode instead > >>> of two > >>> > > > > because > >>> > > > > > >> we don't want to think of IDENTICAL as a redistribution > >>> > strategy, > >>> > > > it's > >>> > > > > > just > >>> > > > > > >> an invariant: the State of that type is always the same > >>> across > >>> > > > > > partitions. > >>> > > > > > >> If it only has None and REDISTRIBUTABLE, I think your > >>> proposal > >>> > is > >>> > > > > > >> feasible then. But we don't want to confuse these three > >>> > > > > semantics/modes. > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > BroadcastStates are currently very limited by only > >>> Map-like > >>> > > > states, > >>> > > > > > and > >>> > > > > > >> the new interface also enforces that. Can we remove this > >>> > limitation? > >>> > > > > If > >>> > > > > > >> not, should broadcastState declaration extend mapstate > >>> > declaration? > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> Personally, I don't want to make this restriction. This is > >>> also > >>> > why > >>> > > > > the > >>> > > > > > method in StateManager to get BroadcastState has the > parameter > >>> of > >>> > > > > > BroadcastStateDeclaration instead of MapStateDeclaration. In > >>> the > >>> > > > future, > >>> > > > > if > >>> > > > > > the state backend supports other types of broadcast state, we > >>> can > >>> > add a > >>> > > > > > corresponding method to the States utility class to get the > >>> > > > > > BroadcastSateDeclaration. > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> Best regards, > >>> > > > > > >> > >>> > > > > > >> Weijie > >>> > > > > > >> > >>> > > > > > >> > >>> > > > > > >> Hangxiang Yu <master...@gmail.com> 于2024年3月7日周四 11:55写道: > >>> > > > > > >> > >>> > > > > > >>> Hi, Weijie. > >>> > > > > > >>> Thanks for your proposal. > >>> > > > > > >>> I'd like to start the discussion with some questions: > >>> > > > > > >>> 1. We have also discussed in FLIP-359/FLINK-32658 about > >>> > limiting > >>> > > > the > >>> > > > > > user > >>> > > > > > >>> operation to avoid creating state when processElement. > >>> Could > >>> > > > current > >>> > > > > > >>> interfaces also help this? > >>> > > > > > >>> > >>> > > > > > >>> 2. Could you provide more examples about how useStates() > >>> works > >>> > ? > >>> > > > > Since > >>> > > > > > >>> some > >>> > > > > > >>> operations may change their used states at runtime, the > >>> value > >>> > this > >>> > > > > > method > >>> > > > > > >>> returns will be modified at runtime, right ? > >>> > > > > > >>> If so, I'm thinking if we could get some deterministic > >>> State > >>> > > > > > Declaration > >>> > > > > > >>> Set before running which could help a lot for some state > >>> > operations > >>> > > > > > e.g. > >>> > > > > > >>> pre-check schema compatibility, queryable schema. > >>> > > > > > >>> > >>> > > > > > >>> 3. IIUC, RedistributionMode/Strategy should not be used > by > >>> > users, > >>> > > > > > right ? > >>> > > > > > >>> If so, I'm +1 to move them to inner interfaces which > seems > >>> a > >>> > bit > >>> > > > > > >>> confusing > >>> > > > > > >>> to users. > >>> > > > > > >>> > >>> > > > > > >>> On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan < > >>> > zakelly....@gmail.com > >>> > > > > > >>> > > > > > >>> wrote: > >>> > > > > > >>> > >>> > > > > > >>> > Hi Weijie, > >>> > > > > > >>> > > >>> > > > > > >>> > Thanks for proposing this! > >>> > > > > > >>> > > >>> > > > > > >>> > Unifying and optimizing state definitions is a very > good > >>> > thing. I > >>> > > > > > like > >>> > > > > > >>> the > >>> > > > > > >>> > idea of 'definition goes before using', so overall +1 > for > >>> > this > >>> > > > > > >>> proposal. > >>> > > > > > >>> > > >>> > > > > > >>> > However, I think the current definition is somewhat > >>> unclear. > >>> > > > From a > >>> > > > > > >>> user's > >>> > > > > > >>> > point of view, I believe that state can be > characterized > >>> > along > >>> > > > two > >>> > > > > > >>> > relatively independent axes: the scenario (keyed, > >>> non-keyed, > >>> > or > >>> > > > > > >>> broadcast) > >>> > > > > > >>> > and the data structure (single value, list, map). I > >>> recommend > >>> > > > that > >>> > > > > we > >>> > > > > > >>> fully > >>> > > > > > >>> > decouple these aspects, rather than linking the nature > >>> of the > >>> > > > > > >>> definition to > >>> > > > > > >>> > specific assumptions, such as equating broadcast states > >>> with > >>> > > > maps, > >>> > > > > or > >>> > > > > > >>> > considering list states could be non-keyed. > >>> > > > > > >>> > Furthermore, the concept of 'Redistribution' may > impose a > >>> > > > cognitive > >>> > > > > > >>> burden > >>> > > > > > >>> > on general users. My advice would be to conceal > >>> > > > > > >>> RedistributionMode/Strategy > >>> > > > > > >>> > from the standard user interface, particularly within > the > >>> > helper > >>> > > > > > class > >>> > > > > > >>> > 'State'. But I'm OK to keep it in `StateDeclaration` > >>> since > >>> > its > >>> > > > > > >>> interfaces > >>> > > > > > >>> > are basically used by the framework. My preferred > syntax > >>> > would > >>> > > > be: > >>> > > > > > >>> > ``` > >>> > > > > > >>> > StateDeclaration a = > >>> > State.declare(name).keyed().listState(type); > >>> > > > > > >>> > StateDeclaration b = > >>> > > > > State.declare(name).broadcast().mapState(typeK, > >>> > > > > > >>> > typeV); > >>> > > > > > >>> > StateDeclaration c = > >>> > > > > > State.declare(name).keyed().aggregatingState(type, > >>> > > > > > >>> > function); > >>> > > > > > >>> > ``` > >>> > > > > > >>> > WDYT? > >>> > > > > > >>> > > >>> > > > > > >>> > > >>> > > > > > >>> > Best, > >>> > > > > > >>> > Zakelly > >>> > > > > > >>> > > >>> > > > > > >>> > On Wed, Mar 6, 2024 at 11:04 PM Gyula Fóra < > >>> > gyula.f...@gmail.com > >>> > > > > > >>> > > > > > >>> wrote: > >>> > > > > > >>> > > >>> > > > > > >>> > > Hi Weijie! > >>> > > > > > >>> > > > >>> > > > > > >>> > > Thank you for the proposal. > >>> > > > > > >>> > > > >>> > > > > > >>> > > I have some initial questions to start the > discussion: > >>> > > > > > >>> > > > >>> > > > > > >>> > > 1. What is the semantics of the usesStates method? > >>> When is > >>> > it > >>> > > > > > >>> called? Can > >>> > > > > > >>> > > the used state change dynamically at runtime? Can the > >>> logic > >>> > > > > depend > >>> > > > > > on > >>> > > > > > >>> > > something computed in open(..) for example? > >>> > > > > > >>> > > > >>> > > > > > >>> > > Currently state access is pretty dynamic in Flink > and I > >>> > would > >>> > > > > > assume > >>> > > > > > >>> many > >>> > > > > > >>> > > jobs create states on the fly based on some required > >>> > logic. Are > >>> > > > > we > >>> > > > > > >>> > planning > >>> > > > > > >>> > > to address these use-cases? > >>> > > > > > >>> > > > >>> > > > > > >>> > > Are we planning to support deleting/dropping states > >>> that > >>> > are > >>> > > > not > >>> > > > > > >>> required > >>> > > > > > >>> > > anymore? > >>> > > > > > >>> > > > >>> > > > > > >>> > > 2. Get state now returns an optional, but you mention > >>> that: > >>> > > > > > >>> > > " If you want to get a state that is not declared or > >>> has no > >>> > > > > access, > >>> > > > > > >>> > > Option#empty is returned." > >>> > > > > > >>> > > > >>> > > > > > >>> > > I think if a state is not declared or otherwise > cannot > >>> be > >>> > > > > accessed, > >>> > > > > > >>> an > >>> > > > > > >>> > > exceptions must be thrown. We cannot confuse empty > >>> value > >>> > with > >>> > > > > > >>> something > >>> > > > > > >>> > > inaccessible. > >>> > > > > > >>> > > > >>> > > > > > >>> > > 3. The RedistributionMode enum sounds a bit strange > to > >>> me, > >>> > as > >>> > > > it > >>> > > > > > >>> doesn't > >>> > > > > > >>> > > actually specify a mode of redistribution. It feels > >>> more > >>> > like a > >>> > > > > > >>> flag. Can > >>> > > > > > >>> > > we simply have an Optional<RedistributionStrategy> > >>> instead? > >>> > > > > > >>> > > > >>> > > > > > >>> > > 4. BroadcastStates are currently very limited by only > >>> > Map-like > >>> > > > > > >>> states, > >>> > > > > > >>> > and > >>> > > > > > >>> > > the new interface also enforces that. > >>> > > > > > >>> > > Can we remove this limitation? If not, should > >>> > broadcastState > >>> > > > > > >>> declaration > >>> > > > > > >>> > > extend mapstate declaration? > >>> > > > > > >>> > > > >>> > > > > > >>> > > Cheers, > >>> > > > > > >>> > > Gyula > >>> > > > > > >>> > > > >>> > > > > > >>> > > Cheers > >>> > > > > > >>> > > Gyuka > >>> > > > > > >>> > > > >>> > > > > > >>> > > On Wed, Mar 6, 2024 at 11:18 AM weijie guo < > >>> > > > > > >>> guoweijieres...@gmail.com> > >>> > > > > > >>> > > wrote: > >>> > > > > > >>> > > > >>> > > > > > >>> > > > Hi devs, > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > I'd like to start a discussion about FLIP-433: > State > >>> > Access > >>> > > > on > >>> > > > > > >>> > > > DataStream API V2 > >>> > > > > > >>> > > > [1]. This is the third sub-FLIP of DataStream API > V2. > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > After FLIP-410 [2], we can already write a simple > >>> > stateless > >>> > > > job > >>> > > > > > >>> using > >>> > > > > > >>> > the > >>> > > > > > >>> > > > DataStream V2 API. But as we all know, stateful > >>> > computing is > >>> > > > > > >>> Flink's > >>> > > > > > >>> > > trump > >>> > > > > > >>> > > > card. In this FLIP, we will discuss how to declare > >>> and > >>> > access > >>> > > > > > >>> state on > >>> > > > > > >>> > > > DataStream API V2 and we manage to avoid some of > the > >>> > > > > shortcomings > >>> > > > > > >>> of V1 > >>> > > > > > >>> > > in > >>> > > > > > >>> > > > this regard. > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > You can find more details in this FLIP. Its > >>> relationship > >>> > with > >>> > > > > > other > >>> > > > > > >>> > > > sub-FLIPs can be found in the umbrella FLIP > >>> > > > > > >>> > > > [3]. Looking forward to hearing from you, thanks! > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > Best regards, > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > Weijie > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > [1] > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > >>> > > > > > >>> > > >>> > > > > > >>> > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2 > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > [2] > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > >>> > > > > > >>> > > >>> > > > > > >>> > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2 > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > [3] > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > >>> > > > > > >>> > > >>> > > > > > >>> > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2 > >>> > > > > > >>> > > > > >>> > > > > > >>> > > > >>> > > > > > >>> > > >>> > > > > > >>> > >>> > > > > > >>> > >>> > > > > > >>> -- > >>> > > > > > >>> Best, > >>> > > > > > >>> Hangxiang. > >>> > > > > > >>> > >>> > > > > > >> > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > >>> > >>> > >>> -- > >>> Best, > >>> Hangxiang. > >>> > >> >