Hi,

That sounds strange.
In the second example, aren’t you setting the “name” and “uid” only for the last “map” transformation, while you would like to set them for `unorderedWait` and `filter` as well? I guess you can verify this in your application logs. Can you check which uids are actually being set and used?

For the first example, I don’t know. Could it be an issue with Scala and some implicit magic/wrapping (since you are using JavaAsyncDataStream)? Maybe something adds a mapping operator to convert types, and the `uid` is assigned to that operator instead of the AsyncWaitOperator?

Also, to be honest, the exception message doesn’t look like the one I would expect to see. It looks more like an issue with state migration.

Gordon, could you take a look? You might also be a bit more familiar with the quirks of setting the `uid`.

Piotrek

> On 31 Oct 2019, at 17:51, Bekir Oguz <bekir.o...@dpgmedia.nl> wrote:
>
> Hi Piotr,
>
> We missed this note in the release notes, but we were still surprised to hit this bug, because our implementation conforms to the workaround explained there. The weird part is this: we use this async stream in 2 different jobs, and operator chaining is disabled in the TEST environment but enabled in PROD.
>
> Our first job looks like this:
>
>   val asyncStream =
>     JavaAsyncDataStream
>       .unorderedWait(stream.javaStream, new Enricher(config), 60, TimeUnit.SECONDS)
>       .startNewChain()
>       .uid("Enricher_ID")
>
> startNewChain() already sets the ChainingStrategy to HEAD (similar to the bugfix for FLINK-13063), and we assign a unique UID, which was suggested as a workaround.
>
> Our second job looks like this:
>
>   AsyncDataStream
>     .unorderedWait(
>       source,
>       new EnricherFunction(config),
>       60,
>       TimeUnit.SECONDS
>     )
>     .filter(new EnricherFilter)
>     .map(_.get)
>     .name("EnricherCall")
>     .uid("EnricherCall_ID")
>
> In the test environment, the first job could not restore from the last checkpoint (we started it with no state to fix this), but the second job succeeded.
> In the prod environment, the failure and the fix were the same for the first job. But then the second job failed to restore its state (different behaviour than in test). Since this job also has user state in a KeyedProcessFunction, starting without state was not an option for us. We just tried restarting it with operator chaining disabled, and then, surprisingly, it worked.
>
> How can we explain this different behaviour of the second job in test and prod? The only visible difference is the operator chaining config.
>
> Thanks in advance,
> Bekir
>
> On Thu, 31 Oct 2019 at 09:44, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
>
> (This question is more appropriate for the user mailing list, not dev. When responding to my e-mail, please remove the dev mailing list from the recipients; I’ve kept it just FYI that the discussion has moved to the user mailing list.)
>
> Could it be that the problem is caused by the change in the chaining strategy of the AsyncWaitOperator in 1.9, as explained in the release notes [1]?
>
> > AsyncIO
> > Due to a bug in the AsyncWaitOperator, in 1.9.0 the default chaining behaviour of the operator is now changed so that it is never chained after another operator. This should not be problematic for migrating from older version snapshots as long as an uid was assigned to the operator. If an uid was not assigned to the operator, please see the instructions here [2] for a possible workaround.
> >
> > Related issues:
> >
> > • FLINK-13063: AsyncWaitOperator shouldn’t be releasing checkpointingLock [3]
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#asyncio
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/upgrading.html#matching-operator-state
> [3] https://issues.apache.org/jira/browse/FLINK-13063
>
>> On 30 Oct 2019, at 16:52, Bekir Oguz <bekir.o...@dpgmedia.nl> wrote:
>>
>> Hi guys,
>>
>> During our upgrade from 1.8.1 to 1.9.1, one of our jobs fails to start with the following exception. We deploy the job with the 'allow-non-restored-state' option, from the latest checkpoint dir of the 1.8.1 version.
>>
>> org.apache.flink.util.StateMigrationException: The new state typeSerializer for operator state must not be incompatible.
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:323)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:214)
>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.initializeState(AsyncWaitOperator.java:272)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> We see in the Web UI that the 'async wait operator' is causing this, even though it was not changed at all during this upgrade.
>>
>> All other jobs migrated without problems; only this one is failing. Has anyone else experienced this during migration?
>>
>> Regards,
>> Bekir Oguz
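A minimal sketch of Piotr's first suggestion: give `unorderedWait` and `filter` their own `name` and `uid` instead of setting them only on the final `map`. It assumes the Flink Scala DataStream API (`org.apache.flink.streaming.api.scala`); `DummyEnricher`, the source elements, and the uids `EnricherAsync_ID` and `EnricherFilter_ID` are hypothetical placeholders, not taken from the jobs in this thread. To check which uids are actually set, `main` reads them back through `javaStream.getTransformation.getUid`, which is internal API and may change between Flink versions. Note that a newly assigned uid does not by itself let an operator pick up state that an earlier run stored under an auto-generated ID; the upgrading guide linked as [2] covers that case.

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical stand-in for EnricherFunction: completes immediately with
// Some(upper-cased input), or None for empty input.
class DummyEnricher extends AsyncFunction[String, Option[String]] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[Option[String]]): Unit =
    resultFuture.complete(Iterable(if (input.nonEmpty) Some(input.toUpperCase) else None))
}

object PerOperatorUids {

  // Builds the async -> filter -> map pipeline, giving every operator its own
  // name and uid (EnricherAsync_ID and EnricherFilter_ID are hypothetical).
  def build(env: StreamExecutionEnvironment)
      : (DataStream[Option[String]], DataStream[Option[String]], DataStream[String]) = {
    val source = env.fromElements("a", "", "b")

    val enriched = AsyncDataStream
      .unorderedWait(source, new DummyEnricher, 60, TimeUnit.SECONDS)
      .name("EnricherAsync")
      .uid("EnricherAsync_ID")

    val filtered = enriched
      .filter(_.isDefined)
      .name("EnricherFilter")
      .uid("EnricherFilter_ID")

    // Keeps the name and uid that the original job put on its final map.
    val unwrapped = filtered
      .map(_.get)
      .name("EnricherCall")
      .uid("EnricherCall_ID")

    (enriched, filtered, unwrapped)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val (enriched, filtered, unwrapped) = build(env)
    // Reads back the uid stored on each operator's transformation (internal API).
    println(enriched.javaStream.getTransformation.getUid)
    println(filtered.javaStream.getTransformation.getUid)
    println(unwrapped.javaStream.getTransformation.getUid)
  }
}
```

Each `name`/`uid` call applies only to the operator returned by the call immediately before it, which is why, in the second job as written, only the `map` carries an explicit uid.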