Hi,

It sounds strange. 

In the second example, aren’t you setting the “name” and “uid” only for the last 
“map” transformation, while you would like to set them for `unorderedWait` and 
`filter` as well? I guess you can verify this in your application logs. Can 
you check which uids are actually being set and used?
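
To illustrate what I mean, here is a minimal sketch of setting a uid per 
operator in the Scala API. The stub EnricherFunction, the inline filter and the 
`EnricherFilter_ID` / `EnricherMap_ID` uids are placeholders I made up, not your 
actual code. The point is only that `name()` and `uid()` apply to the 
transformation they directly follow.

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical stand-in for the real EnricherFunction of the job.
class EnricherFunction extends AsyncFunction[String, Option[String]] {
  override def asyncInvoke(input: String,
                           resultFuture: ResultFuture[Option[String]]): Unit =
    resultFuture.complete(Iterable(Some(input.toUpperCase)))
}

object UidPerOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[String] = env.fromElements("a", "b")

    AsyncDataStream
      .unorderedWait(source, new EnricherFunction, 60, TimeUnit.SECONDS)
      .name("EnricherCall")
      .uid("EnricherCall_ID")    // applies to the async wait operator
      .filter(_.isDefined)
      .uid("EnricherFilter_ID")  // placeholder uid, applies to the filter
      .map(_.get)
      .uid("EnricherMap_ID")     // placeholder uid, applies to the map
      .print()

    env.execute("uid-per-operator")
  }
}
```

This only shows where each uid lands. I am not claiming it is enough on its 
own to make the old checkpoint restorable.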

For the first example, I don’t know. Could it be an issue with Scala and some 
implicit magic/wrapping (since you are using JavaAsyncDataStream)? Maybe 
something adds a mapping operator to convert types, so the `uid` ends up 
assigned to that operator and not to the AsyncWaitOperator? 

Also to be honest, the exception message doesn’t look like the one I would be 
expecting to see. It looks more like some issue with the state migration. 
Gordon, could you take a look? You also might be a bit more familiar with some 
quirks of setting the `uid`.

Piotrek

> On 31 Oct 2019, at 17:51, Bekir Oguz <bekir.o...@dpgmedia.nl> wrote:
> 
> Hi Piotr,
> 
> We missed this note in the release notes, but we are still surprised to hit this 
> bug, since our implementation conforms to the workaround described there.
> The weird part is that we use this async stream in 2 different jobs, and 
> operator chaining is disabled in the TEST environment but enabled in PROD.
> 
> Our first job looks like this:
> val asyncStream =
>   JavaAsyncDataStream
>     .unorderedWait(stream.javaStream, new Enricher(config), 60, TimeUnit.SECONDS)
>     .startNewChain()
>     .uid("Enricher_ID")
> startNewChain() already sets the ChainingStrategy to HEAD (similar to the 
> bugfix for FLINK-13063) and we assign a unique UID which was suggested as a 
> workaround. 
> 
> Our second job looks like this:
> AsyncDataStream
>   .unorderedWait(
>     source,
>     new EnricherFunction(config),
>     60,
>     TimeUnit.SECONDS
>   )
>   .filter(new EnricherFilter)
>   .map(_.get)
>   .name("EnricherCall")
>   .uid("EnricherCall_ID")
> 
> In the test environment, the first job could not restore from the last checkpoint (we 
> started with no state to fix this), but the second job succeeded.
> In the prod environment, the failure and the fix were the same for the first job. 
> But then the second job failed to restore its state (different behaviour than 
> in test). Since this job also has user state in a KeyedProcessFunction, 
> starting without state was not an option for us. We tried restarting it 
> with "operator chaining disabled", and surprisingly it worked.
> 
> How can we explain this different behaviour of the second job in test and 
> prod? The only visible difference is the operator chaining config.
> 
> Thanks in advance,
> Bekir
> 
> 
> 
> On Thu, 31 Oct 2019 at 09:44, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> (This question is more appropriate for the user mailing list, not dev - when 
> responding to my e-mail please remove dev mailing list from the recipients, 
> I’ve kept it just FYI that discussion has moved to user mailing list).
> 
> Could it be that the problem is caused by the change in the chaining strategy of 
> the AsyncWaitOperator in 1.9, as explained in the release notes [1]?
> 
> > AsyncIO
> > Due to a bug in the AsyncWaitOperator, in 1.9.0 the default chaining 
> > behaviour of the operator is now changed so that it 
> > is never chained after another operator. This should not be problematic for 
> > migrating from older version snapshots as 
> > long as an uid was assigned to the operator. If an uid was not assigned to 
> > the operator, please see the instructions here [2]
> > for a possible workaround.
> >
> > Related issues:
> >
> >     • FLINK-13063: AsyncWaitOperator shouldn’t be releasing 
> > checkpointingLock [3]
> 
> Piotrek
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#asyncio
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/upgrading.html#matching-operator-state
> [3] https://issues.apache.org/jira/browse/FLINK-13063
> 
>> On 30 Oct 2019, at 16:52, Bekir Oguz <bekir.o...@dpgmedia.nl> wrote:
>> 
>> Hi guys,
>> during our upgrade from 1.8.1 to 1.9.1, one of our jobs fails to start with
>> the following exception. We deploy the job with the 'allow-non-restored-state'
>> option and from the latest checkpoint dir of the 1.8.1 version.
>> 
>> org.apache.flink.util.StateMigrationException: The new state typeSerializer for operator state must not be incompatible.
>>    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:323)
>>    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:214)
>>    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.initializeState(AsyncWaitOperator.java:272)
>>    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
>>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>    at java.lang.Thread.run(Thread.java:748)
>> 
>> We see from the Web UI that the 'async wait operator' is causing this,
>> even though it was not changed at all during this upgrade.
>> 
>> All other jobs are migrated without problems, only this one is failing. Has
>> anyone else experienced this during migration?
>> 
>> Regards,
>> Bekir Oguz
> 
