Hello,


This has nothing to do with HA. All my unit tests involving a streaming app now 
hang in an “infinite execution”: the test never returns.
This simple code never ends:
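    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.junit.Test;

    @Test
    public void testFlink163() throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data: a finite source that emits "0".."4" and then returns
        final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(final SourceContext<String> ctx) throws Exception {
                for (int count = 0; count < 5; count++) {
                    ctx.collect(String.valueOf(count));
                }
            }

            @Override
            public void cancel() {
            }
        });
        text.print().setParallelism(1);
        env.execute("Simple Test");
        // Never returns!
    }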
Is this really a new feature or a critical bug?
The log shows that the task executor is stopped:
[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:330)
But execute() does not return.
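
In the meantime, a guard along the following lines should at least turn the hang into a test failure instead of blocking the whole build. It is only a sketch built on plain java.util.concurrent: `TimeoutGuard` and `finishesWithin` are illustrative names of my own, not Flink or JUnit API, and the lambdas in `main` merely stand in for the `env.execute(...)` call. It makes the hang visible; it does not fix it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutGuard {

    /**
     * Runs the job on a daemon thread and reports whether it completed
     * within timeoutMs. An exception thrown by the job is propagated
     * wrapped in an ExecutionException.
     */
    public static boolean finishesWithin(Callable<?> job, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "guarded-job");
            t.setDaemon(true); // a hung job must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(job);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins for the real call, e.g. () -> env.execute("Simple Test")
        boolean quick = finishesWithin(() -> "done", 1000);
        boolean hung = finishesWithin(() -> {
            Thread.sleep(60_000); // simulates a call that does not return
            return null;
        }, 200);
        System.out.println("quick job finished in time: " + quick);
        System.out.println("hung job finished in time: " + hung);
    }
}
```

In a test, the result would be asserted, for example `assertTrue(TimeoutGuard.finishesWithin(() -> env.execute("Simple Test"), 30_000))`. JUnit 4's `@Test(timeout = ...)` attribute is a simpler alternative when interrupting the test thread is acceptable.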

Arnaud

The full log is:
[2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini cluster 
(org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)
[2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster 
(org.apache.flink.runtime.minicluster.MiniCluster:227)
[2018-11-07 11:11:11,636] INFO Starting Metrics Registry 
(org.apache.flink.runtime.minicluster.MiniCluster:238)
[2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics will 
be exposed/reported. (org.apache.flink.runtime.metrics.MetricRegistryImpl:113)
[2018-11-07 11:11:11,703] INFO Starting RPC Service(s) 
(org.apache.flink.runtime.minicluster.MiniCluster:249)
[2018-11-07 11:11:12,244] INFO Slf4jLogger started 
(akka.event.slf4j.Slf4jLogger:92)
[2018-11-07 11:11:12,264] INFO Starting high-availability services 
(org.apache.flink.runtime.minicluster.MiniCluster:290)
[2018-11-07 11:11:12,367] INFO Created BLOB server storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108
 (org.apache.flink.runtime.blob.BlobServer:141)
[2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max 
concurrent requests: 50 - max backlog: 1000 
(org.apache.flink.runtime.blob.BlobServer:203)
[2018-11-07 11:11:12,380] INFO Starting ResourceManger 
(org.apache.flink.runtime.minicluster.MiniCluster:301)
[2018-11-07 11:11:12,409] INFO Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 . 
(org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
[2018-11-07 11:11:12,432] INFO Proposing leadership to contender 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@5b1f29fa @ 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
[2018-11-07 11:11:12,439] INFO ResourceManager 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was 
granted leadership with fencing token 86394924fb97bad612b67f526f84406f 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953)
[2018-11-07 11:11:12,440] INFO Starting the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185)
[2018-11-07 11:11:12,442] INFO Received confirmation of leadership for leader 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 , 
session=12b67f52-6f84-406f-8639-4924fb97bad6 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
[2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f
 (org.apache.flink.runtime.blob.PermanentBlobCache:107)
[2018-11-07 11:11:12,454] INFO Created BLOB cache storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-df6c61d2-3c51-4335-a96e-6b00c82e4d90
 (org.apache.flink.runtime.blob.TransientBlobCache:107)
[2018-11-07 11:11:12,454] INFO Starting 1 TaskManger(s) 
(org.apache.flink.runtime.minicluster.MiniCluster:316)
[2018-11-07 11:11:12,460] INFO Starting TaskManager with ResourceID: 
e84ce076-ec5e-48d6-90dc-4b18ba7c5757 
(org.apache.flink.runtime.taskexecutor.TaskManagerRunner:352)
[2018-11-07 11:11:12,531] INFO Temporary file directory 
'C:\Users\alinz\AppData\Local\Temp': total 476 GB, usable 149 GB (31,30% 
usable) (org.apache.flink.runtime.taskexecutor.TaskManagerServices:720)
[2018-11-07 11:11:12,757] INFO Allocated 396 MB for network buffer pool (number 
of memory segments: 12686, bytes per segment: 32768). 
(org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:114)
[2018-11-07 11:11:12,765] INFO Could not load Queryable State Client Proxy. 
Probable reason: flink-queryable-state-runtime is not in the classpath. To 
enable Queryable State, please move the flink-queryable-state-runtime jar from 
the opt to the lib folder. 
(org.apache.flink.runtime.query.QueryableStateUtils:84)
[2018-11-07 11:11:12,765] INFO Could not load Queryable State Server. Probable 
reason: flink-queryable-state-runtime is not in the classpath. To enable 
Queryable State, please move the flink-queryable-state-runtime jar from the opt 
to the lib folder. (org.apache.flink.runtime.query.QueryableStateUtils:141)
[2018-11-07 11:11:12,766] INFO Starting the network environment and its 
components. (org.apache.flink.runtime.io.network.NetworkEnvironment:304)
[2018-11-07 11:11:12,768] WARN No hostname could be resolved for the IP address 
127.0.0.1, using IP address as host name. Local input split assignment (such as 
for HDFS files) may be impacted. 
(org.apache.flink.runtime.taskmanager.TaskManagerLocation:102)
[2018-11-07 11:11:12,769] INFO Limiting managed memory to 0.7 of the currently 
free heap space (2493 MB), memory will be allocated lazily. 
(org.apache.flink.runtime.taskexecutor.TaskManagerServices:331)
[2018-11-07 11:11:12,776] INFO I/O manager uses directory 
C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 
for spill files. (org.apache.flink.runtime.io.disk.iomanager.IOManager:95)
[2018-11-07 11:11:12,793] INFO Messages have a max timeout of 10000 ms 
(org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration:188)
[2018-11-07 11:11:12,803] INFO Starting RPC endpoint for 
org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/taskmanager_0 . 
(org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
[2018-11-07 11:11:12,813] INFO Start job leader service. 
(org.apache.flink.runtime.taskexecutor.JobLeaderService:118)
[2018-11-07 11:11:12,814] INFO Connecting to ResourceManager 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f).
 (org.apache.flink.runtime.taskexecutor.TaskExecutor:904)
[2018-11-07 11:11:12,814] INFO User file cache uses directory 
C:\Users\alinz\AppData\Local\Temp\flink-dist-cache-648ab4eb-f39c-4262-a3cc-07adfa6e5b43
 (org.apache.flink.runtime.filecache.FileCache:107)
[2018-11-07 11:11:12,815] INFO Starting dispatcher rest endpoint. 
(org.apache.flink.runtime.minicluster.MiniCluster:327)
[2018-11-07 11:11:12,845] INFO Resolved ResourceManager address, beginning 
registration (org.apache.flink.runtime.taskexecutor.TaskExecutor:201)
[2018-11-07 11:11:12,846] INFO Registration at ResourceManager attempt 1 
(timeout=100ms) (org.apache.flink.runtime.taskexecutor.TaskExecutor:250)
[2018-11-07 11:11:12,853] INFO Registering TaskManager with ResourceID 
e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (akka://flink/user/taskmanager_0) at 
ResourceManager 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:727)
[2018-11-07 11:11:12,855] INFO Successful registration at resource manager 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 under 
registration id 8098d3fe3fe83133051c3bb97bf96d37. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:94)
[2018-11-07 11:11:12,877] INFO Starting rest endpoint. 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:128)
[2018-11-07 11:11:13,168] WARN Log file environment variable 'log.file' is not 
set. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:95)
[2018-11-07 11:11:13,169] WARN JobManager log files are unavailable in the web 
dashboard. Log file location not found in environment variable 'log.file' or 
configuration key 'Key: 'web.log.path' , default: null (deprecated keys: 
[jobmanager.web.log.path])'. 
(org.apache.flink.runtime.webmonitor.WebMonitorUtils:101)
[2018-11-07 11:11:13,195] INFO Failed to load web based job submission 
extension. Probable reason: flink-runtime-web is not in the classpath. 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:115)
[2018-11-07 11:11:13,514] INFO Rest endpoint listening at localhost:64523 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:200)
[2018-11-07 11:11:13,514] INFO Proposing leadership to contender 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint@1f86099a @ 
http://localhost:64523 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
[2018-11-07 11:11:13,514] INFO Starting job dispatcher(s) for JobManger 
(org.apache.flink.runtime.minicluster.MiniCluster:364)
[2018-11-07 11:11:13,514] INFO http://localhost:64523 was granted leadership 
with leaderSessionID=940dc860-bbcb-4e35-8255-59702b220383 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:757)
[2018-11-07 11:11:13,515] INFO Received confirmation of leadership for leader 
http://localhost:64523 , session=940dc860-bbcb-4e35-8255-59702b220383 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
[2018-11-07 11:11:13,523] INFO Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a . 
(org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
[2018-11-07 11:11:13,537] INFO Proposing leadership to contender 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher@27e32fe4 @ 
akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
[2018-11-07 11:11:13,538] INFO Flink Mini Cluster started successfully 
(org.apache.flink.runtime.minicluster.MiniCluster:410)
[2018-11-07 11:11:13,538] INFO Dispatcher 
akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a was granted 
leadership with fencing token d7ae33ab-20bb-4598-bc6d-b7a57ca66860 
(org.apache.flink.runtime.dispatcher.StandaloneDispatcher:818)
[2018-11-07 11:11:13,549] INFO Recovering all persisted jobs. 
(org.apache.flink.runtime.dispatcher.StandaloneDispatcher:658)
[2018-11-07 11:11:13,550] INFO Received confirmation of leadership for leader 
akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a , 
session=d7ae33ab-20bb-4598-bc6d-b7a57ca66860 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
[2018-11-07 11:11:13,566] INFO Submitting job 0ef8697ca98f6d2b565ed928d17c8a49 
(Simple Test). (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:247)
[2018-11-07 11:11:13,592] INFO Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_1 
. (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
[2018-11-07 11:11:13,618] INFO Initializing job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49). 
(org.apache.flink.runtime.jobmaster.JobMaster:269)
[2018-11-07 11:11:13,623] INFO Using restart strategy NoRestartStrategy for 
Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). 
(org.apache.flink.runtime.jobmaster.JobMaster:280)
[2018-11-07 11:11:13,629] INFO Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.slotpool.SlotPool at 
akka://flink/user/96fe7c78-28fe-484f-ae16-dcd1d4bc2c6b . 
(org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
[2018-11-07 11:11:13,654] INFO Job recovers via failover strategy: full graph 
restart (org.apache.flink.runtime.executiongraph.ExecutionGraph:425)
[2018-11-07 11:11:13,689] INFO Running initialization on master for job Simple 
Test (0ef8697ca98f6d2b565ed928d17c8a49). 
(org.apache.flink.runtime.jobmaster.JobMaster:195)
[2018-11-07 11:11:13,689] INFO Successfully ran initialization on master in 0 
ms. (org.apache.flink.runtime.jobmaster.JobMaster:224)
[2018-11-07 11:11:13,722] INFO No state backend has been configured, using 
default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: TRUE, maxStateSize: 5242880) 
(org.apache.flink.runtime.jobmaster.JobMaster:230)
[2018-11-07 11:11:13,740] INFO Proposing leadership to contender 
org.apache.flink.runtime.jobmaster.JobManagerRunner@34c7d1b6 @ 
akka://flink/user/jobmanager_1 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
[2018-11-07 11:11:13,740] INFO JobManager runner for job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49) was granted leadership with session id 
56e8324b-0015-4464-b6c7-ba0accdcec2a at akka://flink/user/jobmanager_1. 
(org.apache.flink.runtime.jobmaster.JobManagerRunner:329)
[2018-11-07 11:11:13,743] INFO Starting execution of job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49) 
(org.apache.flink.runtime.jobmaster.JobMaster:1009)
[2018-11-07 11:11:13,744] INFO Job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49) switched from state CREATED to RUNNING. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)
[2018-11-07 11:11:13,747] INFO Source: Custom Source -> Sink: Print to Std. Out 
(1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to SCHEDULED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
[2018-11-07 11:11:13,753] INFO Connecting to ResourceManager 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f)
 (org.apache.flink.runtime.jobmaster.JobMaster:1285)
[2018-11-07 11:11:13,754] INFO Received confirmation of leadership for leader 
akka://flink/user/jobmanager_1 , session=56e8324b-0015-4464-b6c7-ba0accdcec2a 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
[2018-11-07 11:11:13,757] INFO Resolved ResourceManager address, beginning 
registration (org.apache.flink.runtime.jobmaster.JobMaster:201)
[2018-11-07 11:11:13,758] INFO Registration at ResourceManager attempt 1 
(timeout=100ms) (org.apache.flink.runtime.jobmaster.JobMaster:250)
[2018-11-07 11:11:13,763] INFO Cannot serve slot request, no ResourceManager 
connected. Adding as pending request 
[SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] 
(org.apache.flink.runtime.jobmaster.slotpool.SlotPool:733)
[2018-11-07 11:11:13,767] INFO Registering job manager 
b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:298)
[2018-11-07 11:11:13,771] INFO Registered job manager 
b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:672)
[2018-11-07 11:11:13,772] INFO JobManager successfully registered at 
ResourceManager, leader id: 86394924fb97bad612b67f526f84406f. 
(org.apache.flink.runtime.jobmaster.JobMaster:1307)
[2018-11-07 11:11:13,773] INFO Requesting new slot 
[SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] and profile 
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager. 
(org.apache.flink.runtime.jobmaster.slotpool.SlotPool:689)
[2018-11-07 11:11:13,776] INFO Request slot with profile 
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=0} for job 
0ef8697ca98f6d2b565ed928d17c8a49 with allocation id 
AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:422)
[2018-11-07 11:11:13,778] INFO Receive slot request 
AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a} for job 
0ef8697ca98f6d2b565ed928d17c8a49 from resource manager with leader id 
86394924fb97bad612b67f526f84406f. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:743)
[2018-11-07 11:11:13,779] INFO Allocated slot for 
AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:755)
[2018-11-07 11:11:13,779] INFO Add job 0ef8697ca98f6d2b565ed928d17c8a49 for job 
leader monitoring. (org.apache.flink.runtime.taskexecutor.JobLeaderService:186)
[2018-11-07 11:11:13,781] INFO Try to register at job manager 
akka://flink/user/jobmanager_1 with leader id 
56e8324b-0015-4464-b6c7-ba0accdcec2a. 
(org.apache.flink.runtime.taskexecutor.JobLeaderService:326)
[2018-11-07 11:11:13,782] INFO Resolved JobManager address, beginning 
registration (org.apache.flink.runtime.taskexecutor.JobLeaderService:201)
[2018-11-07 11:11:13,782] INFO Registration at JobManager attempt 1 
(timeout=100ms) (org.apache.flink.runtime.taskexecutor.JobLeaderService:250)
[2018-11-07 11:11:13,785] INFO Successful registration at job manager 
akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.taskexecutor.JobLeaderService:374)
[2018-11-07 11:11:13,786] INFO Establish JobManager connection for job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:1137)
[2018-11-07 11:11:13,789] INFO Offer reserved slots to the leader of job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:1042)
[2018-11-07 11:11:13,794] INFO Source: Custom Source -> Sink: Print to Std. Out 
(1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from SCHEDULED to DEPLOYING. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
[2018-11-07 11:11:13,794] INFO Deploying Source: Custom Source -> Sink: Print 
to Std. Out (1/1) (attempt #0) to 127.0.0.1 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:576)
[2018-11-07 11:11:13,819] INFO Received task Source: Custom Source -> Sink: 
Print to Std. Out (1/1). 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:541)
[2018-11-07 11:11:13,820] INFO Activate slot 
AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. 
(org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable:237)
[2018-11-07 11:11:13,820] INFO Source: Custom Source -> Sink: Print to Std. Out 
(1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to DEPLOYING. 
(org.apache.flink.runtime.taskmanager.Task:915)
[2018-11-07 11:11:13,820] INFO Creating FileSystem stream leak safety net for 
task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
(07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING] 
(org.apache.flink.runtime.taskmanager.Task:579)
[2018-11-07 11:11:13,828] INFO Loading JAR files for task Source: Custom Source 
-> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) 
[DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:586)
[2018-11-07 11:11:13,829] INFO Registering task at network: Source: Custom 
Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) 
[DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:612)
[2018-11-07 11:11:13,836] INFO Source: Custom Source -> Sink: Print to Std. Out 
(1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. 
(org.apache.flink.runtime.taskmanager.Task:915)
[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. Out 
(1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: TRUE, maxStateSize: 5242880) 
(org.apache.flink.streaming.runtime.tasks.StreamTask:230)
0
1
2
3
4
[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out 
(1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.taskmanager.Task:915)
[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source 
-> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
(org.apache.flink.runtime.taskmanager.Task:818)
[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for 
task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
(07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
(org.apache.flink.runtime.taskmanager.Task:845)
[2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution 
state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to 
Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)
[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out 
(1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
[2018-11-07 11:11:13,907] INFO Job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)
[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)
[2018-11-07 11:11:13,908] INFO Shutting down 
(org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)
[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
(org.apache.flink.runtime.minicluster.MiniCluster:427)
[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)
[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:291)
[2018-11-07 11:11:23,583] INFO Shutting down 
TaskExecutorLocalStateStoresManager. 
(org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)
[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 
(org.apache.flink.runtime.io.disk.iomanager.IOManager:110)
[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its 
components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)
[2018-11-07 11:11:23,591] INFO Removing cache directory 
C:\Users\alinz\AppData\Local\Temp\flink-web-ui 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)
[2018-11-07 11:11:23,593] INFO Closing the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)
[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)
[2018-11-07 11:11:23,596] INFO Close ResourceManager connection 
cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address 
null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)
[2018-11-07 11:11:23,607] INFO Stop job leader service. 
(org.apache.flink.runtime.taskexecutor.JobLeaderService:135)
[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:330)



From: LINZ, Arnaud
Sent: Tuesday, 6 November 2018 14:23
To: user <user@flink.apache.org>
Subject: Stopping a streaming app from its own code : behaviour change from 1.3 
to 1.6


Hello,



In Flink 1.3, I was able to cleanly stop an HA streaming application 
just by letting the source “run()” method return (on an ending condition).

I am trying to update my code to Flink 1.6.2, but this no longer works.
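
To make the pattern concrete, here is roughly what I mean by ending the source on a condition. This is a simplified sketch, not my production code: `CountingSource` is a hypothetical stand-in that only mirrors the run()/cancel() shape of a Flink `SourceFunction`, with a plain `Consumer<String>` in place of the `SourceContext`, so that it runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FiniteSourceSketch {

    /** Hypothetical stand-in mirroring the run()/cancel() shape of a source. */
    static class CountingSource {
        private final int limit;
        // volatile: cancel() may be called from a thread other than the one in run()
        private volatile boolean running = true;

        CountingSource(int limit) {
            this.limit = limit;
        }

        /** Emits "0".."limit-1", then returns; returning is the clean stop. */
        void run(Consumer<String> out) {
            for (int count = 0; running && count < limit; count++) {
                out.accept(String.valueOf(count));
            }
        }

        /** Cooperative cancellation: run() leaves its loop at the next check. */
        void cancel() {
            running = false;
        }
    }

    /** Runs a source to completion and returns everything it emitted. */
    public static List<String> collect(int limit) {
        List<String> out = new ArrayList<>();
        new CountingSource(limit).run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With 1.3, the job finished once run() had returned on every source instance; that is the behaviour I can no longer reproduce.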



Even when there are no sources left and no items to process, the cluster keeps 
running forever, logging an endless stream of messages like this one:

Checkpoint triggering task Source: Custom Source (1/2) of job 
3b286f5344c50f0e466bb8ee79a2bb69 is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.



Why has this behavior changed? How am I supposed to stop a streaming execution 
from its own code now? Is https://issues.apache.org/jira/browse/FLINK-2111 of 
any use?



Thanks,

Arnaud


