[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378304#comment-16378304
 ] 

ASF GitHub Bot commented on FLINK-8750:
---------------------------------------

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/5588

    [FLINK-8750][runtime] Improve detection of no remaining data after 
EndOfPartitionEvent

    Because of a race condition between:
      1. releasing the inputChannelsWithData lock in this method and reaching this place
      2. an empty data notification that re-enqueues a channel
    we can end up with the moreAvailable flag set to true while we expect no more data.
        
    This commit detects this situation, makes the correct assertion, and turns off the moreAvailable flag.
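
A minimal, single-threaded sketch of the check described above, assuming a simplified gate with a single input channel. `GateSketch`, `Result`, and the string end-of-partition marker are hypothetical names used only for illustration; this is not Flink's actual `SingleInputGate` code. The outcome of the race is modeled by enqueuing the same channel twice, so that a stale (empty) entry is still queued when the end-of-partition marker is read.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical, simplified model of an input gate with a single channel. */
final class GateSketch {
    static final String END_OF_PARTITION = "EndOfPartition";

    /** One polled element plus the gate's claim that more data is queued. */
    static final class Result {
        final String element;
        final boolean moreAvailable;

        Result(String element, boolean moreAvailable) {
            this.element = element;
            this.moreAvailable = moreAvailable;
        }
    }

    // Channels that were announced as having data; an entry may be stale (empty).
    private final Queue<Queue<String>> channelsWithData = new ArrayDeque<>();
    private boolean finished;

    /** Models a data notification; it may arrive even if the channel is already drained. */
    void notifyDataAvailable(Queue<String> channel) {
        channelsWithData.add(channel);
    }

    boolean isFinished() {
        return finished;
    }

    Optional<Result> pollNext() {
        Queue<String> channel;
        String element;
        // Skip channels that were enqueued by a notification but hold no data.
        do {
            channel = channelsWithData.poll();
            if (channel == null) {
                return Optional.empty();
            }
            element = channel.poll();
        } while (element == null);

        if (!channel.isEmpty()) {
            channelsWithData.add(channel); // re-enqueue: the channel still has data
        }
        boolean moreAvailable = !channelsWithData.isEmpty();

        if (END_OF_PARTITION.equals(element)) {
            // A stale entry can make moreAvailable true although nothing is left.
            // Verify by polling instead of asserting on the flag alone.
            if (moreAvailable && pollNext().isPresent()) {
                throw new IllegalStateException("Data found after end of partition");
            }
            moreAvailable = false;
            finished = true;
        }
        return Optional.of(new Result(element, moreAvailable));
    }
}
```

In this model, a channel that is announced twice still yields the end-of-partition marker with `moreAvailable` set to false, because the stale entry is drained before the flag is reported.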
    
    ## Verifying this change
    
    This bug can be reproduced by running `org.apache.flink.table.runtime.stream.table.JoinITCase` in a loop a couple of thousand times.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f8750

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5588.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5588
    
----
commit 388d16118763dddff7d4c3593572169ad3e65c0d
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:37:37Z

    [hotfix][tests] Deduplicate code in SingleInputGateTest

commit e22a44b24ab1e9f02c236440f899a1f4dfdfc873
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T11:11:14Z

    [hotfix][runtime] Remove duplicated check

commit 85d98dee9bfc59fee660db934855014d6b73182e
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:20:21Z

    [FLINK-8760][runtime] Correctly propagate moreAvailable flag through 
SingleInputGate
    
    Previously, if SingleInputGate was re-enqueuing an input channel, isMoreAvailable
    might incorrectly return false. This might have caused some deadlocks.

commit 20e808053d365e66b3ebd21a10e7acda3a9ebdbd
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:27:54Z

    [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase

commit 57f83c7747f192dfa1c98902676baedc3ccd1694
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:28:20Z

    [FLINK-8694][runtime] Fix notifyDataAvailable race condition
    
    Previously there was a race condition that might have resulted in ignoring some notifyDataAvailable calls.
    This fixes the problem by moving buffersAvailable handling to the subpartitions and adds a stress test
    for flushAlways (without this fix, the test deadlocks).

commit 065be67f6d862ef35f2caa5a39773816385475b1
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-26T15:13:06Z

    [hotfix][runtime] Optimize EventSerializer.isEvent method
    
    For example, previously if the method was used to check for EndOfPartitionEvent
    and the Buffer contained a huge custom event, the event had to be deserialized before
    performing the actual check. Now we quickly enter the correct if/else branch
    and do the full, costly deserialization only if we have to.
    
    isEvent() was not used for anything other than checking against EndOfPartitionEvent.
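
A minimal sketch of the idea behind this optimization, assuming a hypothetical wire format in which every serialized event starts with a 4-byte type tag. `EventTypeCheckSketch`, the tag constants, and the helper methods are illustrative names only, not Flink's actual `EventSerializer` API. The fast path peeks at the tag with an absolute read, so the payload of a large custom event is never touched.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of checking an event type without full deserialization. */
final class EventTypeCheckSketch {
    // Assumed type tags; the real serializer's constants may differ.
    static final int END_OF_PARTITION_TAG = 0;
    static final int CUSTOM_EVENT_TAG = 1;

    /** Serializes an event as a 4-byte type tag followed by an optional payload. */
    static ByteBuffer serialize(int tag, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + payload.length);
        buffer.putInt(tag).put(payload);
        buffer.flip();
        return buffer;
    }

    /** Slow path: copies the whole payload out, standing in for full deserialization. */
    static byte[] deserializePayload(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // leave the caller's position untouched
        view.getInt();                        // skip the type tag
        byte[] payload = new byte[view.remaining()];
        view.get(payload);
        return payload;
    }

    /** Fast path: reads only the tag with an absolute get; the position is unchanged. */
    static boolean isEndOfPartition(ByteBuffer buffer) {
        return buffer.remaining() >= Integer.BYTES
                && buffer.getInt(buffer.position()) == END_OF_PARTITION_TAG;
    }
}
```

With this layout, `isEndOfPartition` costs the same for a 4-byte end-of-partition event as for a custom event carrying a large payload; `deserializePayload` is only needed once the tag indicates a custom event.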

commit 626e10fc8e8b9ae148b82460117c090147961a4f
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-27T09:39:00Z

    [FLINK-8750][runtime] Improve detection of no remaining data after 
EndOfPartitionEvent
    
    Because of a race condition between:
      1. releasing the inputChannelsWithData lock in this method and reaching this place
      2. an empty data notification that re-enqueues a channel
    we can end up with the moreAvailable flag set to true while we expect no more data.
    
    This commit detects this situation, makes the correct assertion, and turns off the moreAvailable flag.

----


> InputGate may contain data after an EndOfPartitionEvent
> -------------------------------------------------------
>
>                 Key: FLINK-8750
>                 URL: https://issues.apache.org/jira/browse/FLINK-8750
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: Nico Kruber
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
>       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>       at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>       at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>       at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>       at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
