Hi Arnaud, Thanks for reporting the issue!
Best, Fabian Am Do., 8. Nov. 2018 um 20:50 Uhr schrieb LINZ, Arnaud < al...@bouyguestelecom.fr>: > 1. FLINK-10832 <https://issues.apache.org/jira/browse/FLINK-10832> > > Created (with heavy difficulties as typing java code in a jira description > was an awful experience J) > > > > > > *De :* LINZ, Arnaud > *Envoyé :* mercredi 7 novembre 2018 11:43 > *À :* 'user' <user@flink.apache.org> > *Objet :* RE: Stopping a streaming app from its own code : behaviour > change from 1.3 to 1.6 > > > > FYI, the code below ends with version 1.6.0, do not end in 1.6.1. I > suspect it’s a bug instead of a new feature. > > > > *De :* LINZ, Arnaud > *Envoyé :* mercredi 7 novembre 2018 11:14 > *À :* 'user' <user@flink.apache.org> > *Objet :* RE: Stopping a streaming app from its own code : behaviour > change from 1.3 to 1.6 > > > > Hello, > > > > This has nothing to do with HA. All my unit tests involving a streaming > app now fail in “infinite execution” > > This simple code never ends : > > @Test > > *public* *void* testFlink162() *throws* Exception { > > // get the execution environment > > *final* StreamExecutionEnvironment env = > StreamExecutionEnvironment.*getExecutionEnvironment*(); > > // get input data > > *final* DataStreamSource<String> text = env.addSource(*new* > *SourceFunction<String>()* { > > @Override > > *public* *void* run(*final* SourceContext<String> ctx) > *throws* Exception { > > *for* (*int* count = 0; count < 5; count++) { > > ctx.collect(String.*valueOf*(count)); > > } > > } > > @Override > > *public* *void* cancel() { > > } > > }); > > text.print().setParallelism(1); > > env.execute("Simple Test"); > > // Never ends ! > > } > > Is this really a new feature or a critical bug? > > In the log, the task executor is stopped > > [2018-11-07 11:11:23,608] INFO Stopped TaskExecutor > akka://flink/user/taskmanager_0. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:330) > > But execute() does not return. > > > > Arnaud > > > > Log is : > > [2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini > cluster > (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114) > > [2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster > (org.apache.flink.runtime.minicluster.MiniCluster:227) > > [2018-11-07 11:11:11,636] INFO Starting Metrics Registry > (org.apache.flink.runtime.minicluster.MiniCluster:238) > > [2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics > will be exposed/reported. > (org.apache.flink.runtime.metrics.MetricRegistryImpl:113) > > [2018-11-07 11:11:11,703] INFO Starting RPC Service(s) > (org.apache.flink.runtime.minicluster.MiniCluster:249) > > [2018-11-07 11:11:12,244] INFO Slf4jLogger started > (akka.event.slf4j.Slf4jLogger:92) > > [2018-11-07 11:11:12,264] INFO Starting high-availability services > (org.apache.flink.runtime.minicluster.MiniCluster:290) > > [2018-11-07 11:11:12,367] INFO Created BLOB server storage directory > C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108 > (org.apache.flink.runtime.blob.BlobServer:141) > > [2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max > concurrent requests: 50 - max backlog: 1000 > (org.apache.flink.runtime.blob.BlobServer:203) > > [2018-11-07 11:11:12,380] INFO Starting ResourceManger > (org.apache.flink.runtime.minicluster.MiniCluster:301) > > [2018-11-07 11:11:12,409] INFO Starting RPC endpoint for > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at > akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 . > (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224) > > [2018-11-07 11:11:12,432] INFO Proposing leadership to contender > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@5b1f29fa > @ akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274) > > [2018-11-07 11:11:12,439] INFO ResourceManager > akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was > granted leadership with fencing token 86394924fb97bad612b67f526f84406f > (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953) > > [2018-11-07 11:11:12,440] INFO Starting the SlotManager. > (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185) > > [2018-11-07 11:11:12,442] INFO Received confirmation of leadership for > leader > akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 , > session=12b67f52-6f84-406f-8639-4924fb97bad6 > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229) > > [2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory > C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f > (org.apache.flink.runtime.blob.PermanentBlobCache:107) > > [2018-11-07 11:11:12,454] INFO Created BLOB cache storage directory > C:\Users\alinz\AppData\Local\Temp\blobStore-df6c61d2-3c51-4335-a96e-6b00c82e4d90 > (org.apache.flink.runtime.blob.TransientBlobCache:107) > > [2018-11-07 11:11:12,454] INFO Starting 1 TaskManger(s) > (org.apache.flink.runtime.minicluster.MiniCluster:316) > > [2018-11-07 11:11:12,460] INFO Starting TaskManager with ResourceID: > e84ce076-ec5e-48d6-90dc-4b18ba7c5757 > (org.apache.flink.runtime.taskexecutor.TaskManagerRunner:352) > > [2018-11-07 11:11:12,531] INFO Temporary file directory > 'C:\Users\alinz\AppData\Local\Temp': total 476 GB, usable 149 GB (31,30% > usable) (org.apache.flink.runtime.taskexecutor.TaskManagerServices:720) > > [2018-11-07 11:11:12,757] INFO Allocated 396 MB for network buffer pool > (number of memory segments: 12686, bytes per segment: 32768). > (org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:114) > > [2018-11-07 11:11:12,765] INFO Could not load Queryable State Client > Proxy. Probable reason: flink-queryable-state-runtime is not in the > classpath. To enable Queryable State, please move the > flink-queryable-state-runtime jar from the opt to the lib folder. > (org.apache.flink.runtime.query.QueryableStateUtils:84) > > [2018-11-07 11:11:12,765] INFO Could not load Queryable State Server. > Probable reason: flink-queryable-state-runtime is not in the classpath. To > enable Queryable State, please move the flink-queryable-state-runtime jar > from the opt to the lib folder. > (org.apache.flink.runtime.query.QueryableStateUtils:141) > > [2018-11-07 11:11:12,766] INFO Starting the network environment and its > components. (org.apache.flink.runtime.io.network.NetworkEnvironment:304) > > [2018-11-07 11:11:12,768] WARN No hostname could be resolved for the IP > address 127.0.0.1, using IP address as host name. Local input split > assignment (such as for HDFS files) may be impacted. > (org.apache.flink.runtime.taskmanager.TaskManagerLocation:102) > > [2018-11-07 11:11:12,769] INFO Limiting managed memory to 0.7 of the > currently free heap space (2493 MB), memory will be allocated lazily. > (org.apache.flink.runtime.taskexecutor.TaskManagerServices:331) > > [2018-11-07 11:11:12,776] INFO I/O manager uses directory > C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 > for spill files. (org.apache.flink.runtime.io.disk.iomanager.IOManager:95) > > [2018-11-07 11:11:12,793] INFO Messages have a max timeout of 10000 ms > (org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration:188) > > [2018-11-07 11:11:12,803] INFO Starting RPC endpoint for > org.apache.flink.runtime.taskexecutor.TaskExecutor at > akka://flink/user/taskmanager_0 . > (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224) > > [2018-11-07 11:11:12,813] INFO Start job leader service. > (org.apache.flink.runtime.taskexecutor.JobLeaderService:118) > > [2018-11-07 11:11:12,814] INFO Connecting to ResourceManager > akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f). > (org.apache.flink.runtime.taskexecutor.TaskExecutor:904) > > [2018-11-07 11:11:12,814] INFO User file cache uses directory > C:\Users\alinz\AppData\Local\Temp\flink-dist-cache-648ab4eb-f39c-4262-a3cc-07adfa6e5b43 > (org.apache.flink.runtime.filecache.FileCache:107) > > [2018-11-07 11:11:12,815] INFO Starting dispatcher rest endpoint. > (org.apache.flink.runtime.minicluster.MiniCluster:327) > > [2018-11-07 11:11:12,845] INFO Resolved ResourceManager address, beginning > registration (org.apache.flink.runtime.taskexecutor.TaskExecutor:201) > > [2018-11-07 11:11:12,846] INFO Registration at ResourceManager attempt 1 > (timeout=100ms) (org.apache.flink.runtime.taskexecutor.TaskExecutor:250) > > [2018-11-07 11:11:12,853] INFO Registering TaskManager with ResourceID > e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (akka://flink/user/taskmanager_0) at > ResourceManager > (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:727) > > [2018-11-07 11:11:12,855] INFO Successful registration at resource manager > akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 > under registration id 8098d3fe3fe83133051c3bb97bf96d37. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:94) > > [2018-11-07 11:11:12,877] INFO Starting rest endpoint. > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:128) > > [2018-11-07 11:11:13,168] WARN Log file environment variable 'log.file' is > not set. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:95) > > [2018-11-07 11:11:13,169] WARN JobManager log files are unavailable in the > web dashboard. Log file location not found in environment variable > 'log.file' or configuration key 'Key: 'web.log.path' , default: null > (deprecated keys: [jobmanager.web.log.path])'. > (org.apache.flink.runtime.webmonitor.WebMonitorUtils:101) > > [2018-11-07 11:11:13,195] INFO Failed to load web based job submission > extension. Probable reason: flink-runtime-web is not in the classpath. > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:115) > > [2018-11-07 11:11:13,514] INFO Rest endpoint listening at localhost:64523 > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:200) > > [2018-11-07 11:11:13,514] INFO Proposing leadership to contender > org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint@1f86099a @ > http://localhost:64523 > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274) > > [2018-11-07 11:11:13,514] INFO Starting job dispatcher(s) for JobManger > (org.apache.flink.runtime.minicluster.MiniCluster:364) > > [2018-11-07 11:11:13,514] INFO http://localhost:64523 was granted > leadership with leaderSessionID=940dc860-bbcb-4e35-8255-59702b220383 > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:757) > > [2018-11-07 11:11:13,515] INFO Received confirmation of leadership for > leader http://localhost:64523 , > session=940dc860-bbcb-4e35-8255-59702b220383 > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229) > > [2018-11-07 11:11:13,523] INFO Starting RPC endpoint for > org.apache.flink.runtime.dispatcher.StandaloneDispatcher at > akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a . > (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224) > > [2018-11-07 11:11:13,537] INFO Proposing leadership to contender > org.apache.flink.runtime.dispatcher.StandaloneDispatcher@27e32fe4 @ > akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274) > > [2018-11-07 11:11:13,538] INFO Flink Mini Cluster started successfully > (org.apache.flink.runtime.minicluster.MiniCluster:410) > > [2018-11-07 11:11:13,538] INFO Dispatcher > akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a was > granted leadership with fencing token d7ae33ab-20bb-4598-bc6d-b7a57ca66860 > (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:818) > > [2018-11-07 11:11:13,549] INFO Recovering all persisted jobs. > (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:658) > > [2018-11-07 11:11:13,550] INFO Received confirmation of leadership for > leader akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a , > session=d7ae33ab-20bb-4598-bc6d-b7a57ca66860 > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229) > > [2018-11-07 11:11:13,566] INFO Submitting job > 0ef8697ca98f6d2b565ed928d17c8a49 (Simple Test). > (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:247) > > [2018-11-07 11:11:13,592] INFO Starting RPC endpoint for > org.apache.flink.runtime.jobmaster.JobMaster at > akka://flink/user/jobmanager_1 . > (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224) > > [2018-11-07 11:11:13,618] INFO Initializing job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49). > (org.apache.flink.runtime.jobmaster.JobMaster:269) > > [2018-11-07 11:11:13,623] INFO Using restart strategy NoRestartStrategy > for Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). > (org.apache.flink.runtime.jobmaster.JobMaster:280) > > [2018-11-07 11:11:13,629] INFO Starting RPC endpoint for > org.apache.flink.runtime.jobmaster.slotpool.SlotPool at > akka://flink/user/96fe7c78-28fe-484f-ae16-dcd1d4bc2c6b . > (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224) > > [2018-11-07 11:11:13,654] INFO Job recovers via failover strategy: full > graph restart (org.apache.flink.runtime.executiongraph.ExecutionGraph:425) > > [2018-11-07 11:11:13,689] INFO Running initialization on master for job > Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). > (org.apache.flink.runtime.jobmaster.JobMaster:195) > > [2018-11-07 11:11:13,689] INFO Successfully ran initialization on master > in 0 ms. (org.apache.flink.runtime.jobmaster.JobMaster:224) > > [2018-11-07 11:11:13,722] INFO No state backend has been configured, using > default (Memory / JobManager) MemoryStateBackend (data in heap memory / > checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', > asynchronous: TRUE, maxStateSize: 5242880) > (org.apache.flink.runtime.jobmaster.JobMaster:230) > > [2018-11-07 11:11:13,740] INFO Proposing leadership to contender > org.apache.flink.runtime.jobmaster.JobManagerRunner@34c7d1b6 @ > akka://flink/user/jobmanager_1 > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274) > > [2018-11-07 11:11:13,740] INFO JobManager runner for job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) was granted leadership with session id > 56e8324b-0015-4464-b6c7-ba0accdcec2a at akka://flink/user/jobmanager_1. > (org.apache.flink.runtime.jobmaster.JobManagerRunner:329) > > [2018-11-07 11:11:13,743] INFO Starting execution of job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) > (org.apache.flink.runtime.jobmaster.JobMaster:1009) > > [2018-11-07 11:11:13,744] INFO Job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) switched from state CREATED to RUNNING. > (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356) > > [2018-11-07 11:11:13,747] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to > SCHEDULED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316) > > [2018-11-07 11:11:13,753] INFO Connecting to ResourceManager > akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f) > (org.apache.flink.runtime.jobmaster.JobMaster:1285) > > [2018-11-07 11:11:13,754] INFO Received confirmation of leadership for > leader akka://flink/user/jobmanager_1 , > session=56e8324b-0015-4464-b6c7-ba0accdcec2a > (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229) > > [2018-11-07 11:11:13,757] INFO Resolved ResourceManager address, beginning > registration (org.apache.flink.runtime.jobmaster.JobMaster:201) > > [2018-11-07 11:11:13,758] INFO Registration at ResourceManager attempt 1 > (timeout=100ms) (org.apache.flink.runtime.jobmaster.JobMaster:250) > > [2018-11-07 11:11:13,763] INFO Cannot serve slot request, no > ResourceManager connected. Adding as pending request > [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] > (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:733) > > [2018-11-07 11:11:13,767] INFO Registering job manager > b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job > 0ef8697ca98f6d2b565ed928d17c8a49. > (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:298) > > [2018-11-07 11:11:13,771] INFO Registered job manager > b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job > 0ef8697ca98f6d2b565ed928d17c8a49. > (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:672) > > [2018-11-07 11:11:13,772] INFO JobManager successfully registered at > ResourceManager, leader id: 86394924fb97bad612b67f526f84406f. > (org.apache.flink.runtime.jobmaster.JobMaster:1307) > > [2018-11-07 11:11:13,773] INFO Requesting new slot > [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] and profile > ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, > nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager. > (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:689) > > [2018-11-07 11:11:13,776] INFO Request slot with profile > ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, > nativeMemoryInMB=0, networkMemoryInMB=0} for job > 0ef8697ca98f6d2b565ed928d17c8a49 with allocation id > AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. > (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:422) > > [2018-11-07 11:11:13,778] INFO Receive slot request > AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a} for job > 0ef8697ca98f6d2b565ed928d17c8a49 from resource manager with leader id > 86394924fb97bad612b67f526f84406f. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:743) > > [2018-11-07 11:11:13,779] INFO Allocated slot for > AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:755) > > [2018-11-07 11:11:13,779] INFO Add job 0ef8697ca98f6d2b565ed928d17c8a49 > for job leader monitoring. > (org.apache.flink.runtime.taskexecutor.JobLeaderService:186) > > [2018-11-07 11:11:13,781] INFO Try to register at job manager > akka://flink/user/jobmanager_1 with leader id > 56e8324b-0015-4464-b6c7-ba0accdcec2a. > (org.apache.flink.runtime.taskexecutor.JobLeaderService:326) > > [2018-11-07 11:11:13,782] INFO Resolved JobManager address, beginning > registration (org.apache.flink.runtime.taskexecutor.JobLeaderService:201) > > [2018-11-07 11:11:13,782] INFO Registration at JobManager attempt 1 > (timeout=100ms) (org.apache.flink.runtime.taskexecutor.JobLeaderService:250) > > [2018-11-07 11:11:13,785] INFO Successful registration at job manager > akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. > (org.apache.flink.runtime.taskexecutor.JobLeaderService:374) > > [2018-11-07 11:11:13,786] INFO Establish JobManager connection for job > 0ef8697ca98f6d2b565ed928d17c8a49. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:1137) > > [2018-11-07 11:11:13,789] INFO Offer reserved slots to the leader of job > 0ef8697ca98f6d2b565ed928d17c8a49. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:1042) > > [2018-11-07 11:11:13,794] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from SCHEDULED > to DEPLOYING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316) > > [2018-11-07 11:11:13,794] INFO Deploying Source: Custom Source -> Sink: > Print to Std. Out (1/1) (attempt #0) to 127.0.0.1 > (org.apache.flink.runtime.executiongraph.ExecutionGraph:576) > > [2018-11-07 11:11:13,819] INFO Received task Source: Custom Source -> > Sink: Print to Std. Out (1/1). > (org.apache.flink.runtime.taskexecutor.TaskExecutor:541) > > [2018-11-07 11:11:13,820] INFO Activate slot > AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. > (org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable:237) > > [2018-11-07 11:11:13,820] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to > DEPLOYING. (org.apache.flink.runtime.taskmanager.Task:915) > > [2018-11-07 11:11:13,820] INFO Creating FileSystem stream leak safety net > for task Source: Custom Source -> Sink: Print to Std. Out (1/1) > (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING] > (org.apache.flink.runtime.taskmanager.Task:579) > > [2018-11-07 11:11:13,828] INFO Loading JAR files for task Source: Custom > Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) > [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:586) > > [2018-11-07 11:11:13,829] INFO Registering task at network: Source: Custom > Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) > [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:612) > > [2018-11-07 11:11:13,836] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING > to RUNNING. (org.apache.flink.runtime.taskmanager.Task:915) > > [2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING > to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316) > > [2018-11-07 11:11:13,840] INFO No state backend has been configured, using > default (Memory / JobManager) MemoryStateBackend (data in heap memory / > checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', > asynchronous: TRUE, maxStateSize: 5242880) > (org.apache.flink.streaming.runtime.tasks.StreamTask:230) > > 0 > > 1 > > 2 > > 3 > > 4 > > [2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.taskmanager.Task:915) > > [2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom > Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). > (org.apache.flink.runtime.taskmanager.Task:818) > > [2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed > for task Source: Custom Source -> Sink: Print to Std. Out (1/1) > (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] > (org.apache.flink.runtime.taskmanager.Task:845) > > [2018-11-07 11:11:13,899] INFO Un-registering task and sending final > execution state FINISHED to JobManager for task Source: Custom Source -> > Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337) > > [2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316) > > [2018-11-07 11:11:13,907] INFO Job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. > (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356) > > [2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job > 0ef8697ca98f6d2b565ed928d17c8a49. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320) > > [2018-11-07 11:11:13,908] INFO Shutting down > (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102) > > [2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster > (org.apache.flink.runtime.minicluster.MiniCluster:427) > > [2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265) > > [2018-11-07 11:11:23,582] INFO Stopping TaskExecutor > akka://flink/user/taskmanager_0. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:291) > > [2018-11-07 11:11:23,583] INFO Shutting down > TaskExecutorLocalStateStoresManager. > (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213) > > [2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory > C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 > (org.apache.flink.runtime.io.disk.iomanager.IOManager:110) > > [2018-11-07 11:11:23,591] INFO Shutting down the network environment and > its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344) > > [2018-11-07 11:11:23,591] INFO Removing cache directory > C:\Users\alinz\AppData\Local\Temp\flink-web-ui > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733) > > [2018-11-07 11:11:23,593] INFO Closing the SlotManager. > (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249) > > [2018-11-07 11:11:23,593] INFO Suspending the SlotManager. > (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212) > > [2018-11-07 11:11:23,596] INFO Close ResourceManager connection > cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new > address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355) > > [2018-11-07 11:11:23,607] INFO Stop job leader service. > (org.apache.flink.runtime.taskexecutor.JobLeaderService:135) > > [2018-11-07 11:11:23,608] INFO Stopped TaskExecutor > akka://flink/user/taskmanager_0. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:330) > > > > > > > > *De :* LINZ, Arnaud > *Envoyé :* mardi 6 novembre 2018 14:23 > *À :* user <user@flink.apache.org> > *Objet :* Stopping a streaming app from its own code : behaviour change > from 1.3 to 1.6 > > > > Hello, > > > > In flink 1.3, I was able to make a clean stop of a HA streaming > application just by ending the source “run()” method (with an ending > condition). > > I try to update my code to flink 1.6.2, but that is no longer working. > > > > Even if there are no sources and no item to process, the cluster continue > its execution forever, with an infinite number of such messages: > > Checkpoint triggering task Source: Custom Source (1/2) of job > 3b286f5344c50f0e466bb8ee79a2bb69 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > > > > Why has this behavior changed? How am I supposed to stop a streaming > execution from its own code now? Is > https://issues.apache.org/jira/browse/FLINK-2111 of any use? > > > > Thanks, > > Arnaud > > > > ------------------------------ > > L'intégrité de ce message n'étant pas assurée sur internet, la société > expéditrice ne peut être tenue responsable de son contenu ni de ses pièces > jointes. Toute utilisation ou diffusion non autorisée est interdite. Si > vous n'êtes pas destinataire de ce message, merci de le détruire et > d'avertir l'expéditeur. > > The integrity of this message cannot be guaranteed on the Internet. The > company that sent this message cannot therefore be held liable for its > content nor attachments. Any unauthorized use or dissemination is > prohibited. If you are not the intended recipient of this message, then > please delete it and notify the sender. >