[ https://issues.apache.org/jira/browse/FLINK-9174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438306#comment-16438306 ]
ASF GitHub Bot commented on FLINK-9174: --------------------------------------- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5847 [FLINK-9174][datastream]Fix the type of state created in ProccessWindowFunction.proccess() is inconsistency ## What is the purpose of the change This PR fixes the type of state created in ```ProccessWindowFunction.proccess()``` is inconsistency. Problem detail, ```java context.windowState().getListState(); // return type is HeapListState or RocksDBListState context.globalState().getListState(); // return type is UserFacingListState ```` This cause the problem in the following code, ```java Iterable<T> iterableState = listState.get(); if (terableState.iterator().hasNext()) { for (T value : iterableState) { value.setRetracting(true); collector.collect(value); } state.clear(); } ``` If the listState is created from context.globalState() then it's fine, but when it created from context.windowState() this will cause NPE. I met this in 1.3.2 but I found it also affect 1.5.0. ## Brief change log - modify ```WindowOperator#PerWindowStateStore``` to ensure the type of state created from `context.windowState().createXXXState()` is consistency with the state created from `context.globalState().createXXXState()` ## Verifying this change - add a unit test in ``` WindowOperatorTest#testStateTypeIsConsistencyCreatedFromWindowStateAndGlobalState()``` to guard this change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink fixStateTypeInconsistencyForWindowOperator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5847.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5847 ---- commit d92122575c150f7d0b6b7ed48824c490d4de0b64 Author: sihuazhou <summerleafs@...> Date: 2018-04-14T10:02:24Z Fix the type of state created from windowState & globalState inconsistency. ---- > The type of state created in ProccessWindowFunction.proccess() is > inconsistency > ------------------------------------------------------------------------------- > > Key: FLINK-9174 > URL: https://issues.apache.org/jira/browse/FLINK-9174 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major > Fix For: 1.5.0 > > > The type of state created from windowState and globalState in > {{ProcessWindowFunction.process()}} is inconsistency. For detail, > {code} > context.windowState().getListState(); // return type is HeapListState or > RocksDBListState > context.globalState().getListState(); // return type is UserFacingListState > {code} > This cause the problem in the following code, > {code} > Iterable<T> iterableState = listState.get(); > if (terableState.iterator().hasNext()) { > for (T value : iterableState) { > value.setRetracting(true); > collector.collect(value); > } > state.clear(); > } > {code} > If the {{listState}} is created from {{context.globalState()}} is fine, but > when it created from {{context.windowState()}} this will cause NPE. I met > this in 1.3.2 but I found it also affect 1.5.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)