[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694375#comment-16694375 ]
Arnaud Linz commented on FLINK-10832: ------------------------------------- Sorry for the long reply, I did not get the post notification. Your question made me think that you can't reproduce this issue, so I've made a minimal project and I could not reproduce it either. So it's probably a jar conflict with one of my project dependencies. Based on this, I will conduct further investigations to identify the conflict and keep you informed. > StreamExecutionEnvironment.execute() does not return when all sources end > ------------------------------------------------------------------------- > > Key: FLINK-10832 > URL: https://issues.apache.org/jira/browse/FLINK-10832 > Project: Flink > Issue Type: Bug > Components: Core > Affects Versions: 1.5.5, 1.6.2 > Reporter: Arnaud Linz > Priority: Critical > > In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), > This code never ends : > *public* *void* testFlink() *throws* Exception { > // get the execution environment > *final* StreamExecutionEnvironment env = > StreamExecutionEnvironment._getExecutionEnvironment_(); > // get input data > *final* DataStreamSource<String> text = env.addSource(*new* > +SourceFunction<String>()+ { > @Override > *public* *void* run(*final* SourceContext<String> ctx) *throws* > Exception { > *for* (*int* count = 0; count < 5; count++) { > ctx.collect(String._valueOf_(count)); > } > } > @Override > *public* *void* cancel() { > } > }); > text.print().setParallelism(1); > env.execute("Simple Test"); > // Never ends ! > } > > It's critical for us as we heavily rely on this "source exhaustion stop" > mechanism to achieve proper stop of streaming applications from their own > code, so it prevents us from using the last flink versions. > > The log extract shows that the local cluster tried to shut down, but could > not do it for no apparent reason: > > {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. > Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to > RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using > default (Memory / JobManager) MemoryStateBackend (data in heap memory / > checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', > asynchronous: TRUE, maxStateSize: 5242880) > (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}} > {{0}} > {{1}} > {{2}} > {{3}} > {{4}} > {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}} > {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom > Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). > (org.apache.flink.runtime.taskmanager.Task:818)}} > {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed > for task Source: Custom Source -> Sink: Print to Std. Out (1/1) > (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] > (org.apache.flink.runtime.taskmanager.Task:845)}} > {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final > execution state FINISHED to JobManager for task Source: Custom Source -> > Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}} > {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,907] INFO Job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. > (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}} > {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job > 0ef8697ca98f6d2b565ed928d17c8a49. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}} > {{[2018-11-07 11:11:13,908] INFO Shutting down > (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}} > {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster > (org.apache.flink.runtime.minicluster.MiniCluster:427)}} > {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}} > {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor > akka://flink/user/taskmanager_0. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}} > {{[2018-11-07 11:11:23,583] INFO Shutting down > TaskExecutorLocalStateStoresManager. > (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}} > {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory > C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 > (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}} > {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and > its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}} > {{[2018-11-07 11:11:23,591] INFO Removing cache directory > C:\Users\alinz\AppData\Local\Temp\flink-web-ui > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}} > {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager. > (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}} > {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. > (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}} > {{[2018-11-07 11:11:23,596] INFO Close ResourceManager connection > cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new > address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)}} > {{[2018-11-07 11:11:23,607] INFO Stop job leader service. > (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)}} > {{[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor > akka://flink/user/taskmanager_0. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)}} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)