Hi Arnaud,

Thanks for reporting the issue!

Best, Fabian

On Thu, 8 Nov 2018 at 20:50, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> 1.    FLINK-10832 <https://issues.apache.org/jira/browse/FLINK-10832>
>
> Created (with considerable difficulty, as typing Java code in a Jira
> description was an awful experience :-) )
>
>
>
>
>
> *From:* LINZ, Arnaud
> *Sent:* Wednesday, 7 November 2018 11:43
> *To:* 'user' <user@flink.apache.org>
> *Subject:* RE: Stopping a streaming app from its own code : behaviour
> change from 1.3 to 1.6
>
>
>
> FYI, the code below terminates with version 1.6.0 but does not terminate
> with 1.6.1. I suspect it’s a bug rather than a new feature.
>
>
>
> *From:* LINZ, Arnaud
> *Sent:* Wednesday, 7 November 2018 11:14
> *To:* 'user' <user@flink.apache.org>
> *Subject:* RE: Stopping a streaming app from its own code : behaviour
> change from 1.3 to 1.6
>
>
>
> Hello,
>
>
>
> This has nothing to do with HA. All my unit tests involving a streaming
> app now fail by running forever (“infinite execution”).
>
> This simple code never ends:
>
>     @Test
>     public void testFlink162() throws Exception {
>         // get the execution environment
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // get input data
>         final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>             @Override
>             public void run(final SourceContext<String> ctx) throws Exception {
>                 for (int count = 0; count < 5; count++) {
>                     ctx.collect(String.valueOf(count));
>                 }
>             }
>
>             @Override
>             public void cancel() {
>             }
>         });
>
>         text.print().setParallelism(1);
>         env.execute("Simple Test");
>         // Never ends!
>     }
>
> Is this really a new feature or a critical bug?
>
> In the log, the task executor is stopped:
>
> [2018-11-07 11:11:23,608] INFO Stopped TaskExecutor
> akka://flink/user/taskmanager_0.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)
>
> But execute() does not return.
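
Until the cause is identified, one way to keep a test like this from blocking a build indefinitely is to bound the blocking call with a timeout. The sketch below uses only the JDK. `BoundedCall` and `callWithTimeout` are hypothetical helper names, not Flink API, and the helper only stops the test from waiting: it does not shut down a mini cluster that is still running in the background.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical test helper (not part of Flink): bounds a call that may never return. */
public final class BoundedCall {

    private BoundedCall() {}

    /**
     * Runs the blocking call on a daemon thread and waits at most timeoutMillis.
     * Throws TimeoutException if the call has not returned by then.
     */
    public static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMillis)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread t = new Thread(runnable, "bounded-call");
            t.setDaemon(true); // a hung call must not keep the JVM alive
            return t;
        });
        try {
            Future<T> future = executor.submit(blockingCall);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupts the worker; the call may ignore it
                throw e;
            } catch (ExecutionException e) {
                // Unwrap so the test sees the original failure.
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the test above this could be called as `BoundedCall.callWithTimeout(() -> env.execute("Simple Test"), 30_000)`, where the 30-second limit is an arbitrary choice. The test then fails with a `TimeoutException` instead of hanging.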
>
>
>
> Arnaud
>
>
>
> The full log is:
>
> [2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini
> cluster
> (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)
>
> [2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster
> (org.apache.flink.runtime.minicluster.MiniCluster:227)
>
> [2018-11-07 11:11:11,636] INFO Starting Metrics Registry
> (org.apache.flink.runtime.minicluster.MiniCluster:238)
>
> [2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics
> will be exposed/reported.
> (org.apache.flink.runtime.metrics.MetricRegistryImpl:113)
>
> [2018-11-07 11:11:11,703] INFO Starting RPC Service(s)
> (org.apache.flink.runtime.minicluster.MiniCluster:249)
>
> [2018-11-07 11:11:12,244] INFO Slf4jLogger started
> (akka.event.slf4j.Slf4jLogger:92)
>
> [2018-11-07 11:11:12,264] INFO Starting high-availability services
> (org.apache.flink.runtime.minicluster.MiniCluster:290)
>
> [2018-11-07 11:11:12,367] INFO Created BLOB server storage directory
> C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108
> (org.apache.flink.runtime.blob.BlobServer:141)
>
> [2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max
> concurrent requests: 50 - max backlog: 1000
> (org.apache.flink.runtime.blob.BlobServer:203)
>
> [2018-11-07 11:11:12,380] INFO Starting ResourceManger
> (org.apache.flink.runtime.minicluster.MiniCluster:301)
>
> [2018-11-07 11:11:12,409] INFO Starting RPC endpoint for
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 .
> (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
>
> [2018-11-07 11:11:12,432] INFO Proposing leadership to contender
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@5b1f29fa
> @ akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
>
> [2018-11-07 11:11:12,439] INFO ResourceManager
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was
> granted leadership with fencing token 86394924fb97bad612b67f526f84406f
> (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953)
>
> [2018-11-07 11:11:12,440] INFO Starting the SlotManager.
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185)
>
> [2018-11-07 11:11:12,442] INFO Received confirmation of leadership for
> leader
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 ,
> session=12b67f52-6f84-406f-8639-4924fb97bad6
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
>
> [2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory
> C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f
> (org.apache.flink.runtime.blob.PermanentBlobCache:107)
>
> [2018-11-07 11:11:12,454] INFO Created BLOB cache storage directory
> C:\Users\alinz\AppData\Local\Temp\blobStore-df6c61d2-3c51-4335-a96e-6b00c82e4d90
> (org.apache.flink.runtime.blob.TransientBlobCache:107)
>
> [2018-11-07 11:11:12,454] INFO Starting 1 TaskManger(s)
> (org.apache.flink.runtime.minicluster.MiniCluster:316)
>
> [2018-11-07 11:11:12,460] INFO Starting TaskManager with ResourceID:
> e84ce076-ec5e-48d6-90dc-4b18ba7c5757
> (org.apache.flink.runtime.taskexecutor.TaskManagerRunner:352)
>
> [2018-11-07 11:11:12,531] INFO Temporary file directory
> 'C:\Users\alinz\AppData\Local\Temp': total 476 GB, usable 149 GB (31,30%
> usable) (org.apache.flink.runtime.taskexecutor.TaskManagerServices:720)
>
> [2018-11-07 11:11:12,757] INFO Allocated 396 MB for network buffer pool
> (number of memory segments: 12686, bytes per segment: 32768).
> (org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:114)
>
> [2018-11-07 11:11:12,765] INFO Could not load Queryable State Client
> Proxy. Probable reason: flink-queryable-state-runtime is not in the
> classpath. To enable Queryable State, please move the
> flink-queryable-state-runtime jar from the opt to the lib folder.
> (org.apache.flink.runtime.query.QueryableStateUtils:84)
>
> [2018-11-07 11:11:12,765] INFO Could not load Queryable State Server.
> Probable reason: flink-queryable-state-runtime is not in the classpath. To
> enable Queryable State, please move the flink-queryable-state-runtime jar
> from the opt to the lib folder.
> (org.apache.flink.runtime.query.QueryableStateUtils:141)
>
> [2018-11-07 11:11:12,766] INFO Starting the network environment and its
> components. (org.apache.flink.runtime.io.network.NetworkEnvironment:304)
>
> [2018-11-07 11:11:12,768] WARN No hostname could be resolved for the IP
> address 127.0.0.1, using IP address as host name. Local input split
> assignment (such as for HDFS files) may be impacted.
> (org.apache.flink.runtime.taskmanager.TaskManagerLocation:102)
>
> [2018-11-07 11:11:12,769] INFO Limiting managed memory to 0.7 of the
> currently free heap space (2493 MB), memory will be allocated lazily.
> (org.apache.flink.runtime.taskexecutor.TaskManagerServices:331)
>
> [2018-11-07 11:11:12,776] INFO I/O manager uses directory
> C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814
> for spill files. (org.apache.flink.runtime.io.disk.iomanager.IOManager:95)
>
> [2018-11-07 11:11:12,793] INFO Messages have a max timeout of 10000 ms
> (org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration:188)
>
> [2018-11-07 11:11:12,803] INFO Starting RPC endpoint for
> org.apache.flink.runtime.taskexecutor.TaskExecutor at
> akka://flink/user/taskmanager_0 .
> (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
>
> [2018-11-07 11:11:12,813] INFO Start job leader service.
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:118)
>
> [2018-11-07 11:11:12,814] INFO Connecting to ResourceManager
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f).
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:904)
>
> [2018-11-07 11:11:12,814] INFO User file cache uses directory
> C:\Users\alinz\AppData\Local\Temp\flink-dist-cache-648ab4eb-f39c-4262-a3cc-07adfa6e5b43
> (org.apache.flink.runtime.filecache.FileCache:107)
>
> [2018-11-07 11:11:12,815] INFO Starting dispatcher rest endpoint.
> (org.apache.flink.runtime.minicluster.MiniCluster:327)
>
> [2018-11-07 11:11:12,845] INFO Resolved ResourceManager address, beginning
> registration (org.apache.flink.runtime.taskexecutor.TaskExecutor:201)
>
> [2018-11-07 11:11:12,846] INFO Registration at ResourceManager attempt 1
> (timeout=100ms) (org.apache.flink.runtime.taskexecutor.TaskExecutor:250)
>
> [2018-11-07 11:11:12,853] INFO Registering TaskManager with ResourceID
> e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (akka://flink/user/taskmanager_0) at
> ResourceManager
> (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:727)
>
> [2018-11-07 11:11:12,855] INFO Successful registration at resource manager
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864
> under registration id 8098d3fe3fe83133051c3bb97bf96d37.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:94)
>
> [2018-11-07 11:11:12,877] INFO Starting rest endpoint.
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:128)
>
> [2018-11-07 11:11:13,168] WARN Log file environment variable 'log.file' is
> not set. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:95)
>
> [2018-11-07 11:11:13,169] WARN JobManager log files are unavailable in the
> web dashboard. Log file location not found in environment variable
> 'log.file' or configuration key 'Key: 'web.log.path' , default: null
> (deprecated keys: [jobmanager.web.log.path])'.
> (org.apache.flink.runtime.webmonitor.WebMonitorUtils:101)
>
> [2018-11-07 11:11:13,195] INFO Failed to load web based job submission
> extension. Probable reason: flink-runtime-web is not in the classpath.
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:115)
>
> [2018-11-07 11:11:13,514] INFO Rest endpoint listening at localhost:64523
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:200)
>
> [2018-11-07 11:11:13,514] INFO Proposing leadership to contender
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint@1f86099a @
> http://localhost:64523
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
>
> [2018-11-07 11:11:13,514] INFO Starting job dispatcher(s) for JobManger
> (org.apache.flink.runtime.minicluster.MiniCluster:364)
>
> [2018-11-07 11:11:13,514] INFO http://localhost:64523 was granted
> leadership with leaderSessionID=940dc860-bbcb-4e35-8255-59702b220383
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:757)
>
> [2018-11-07 11:11:13,515] INFO Received confirmation of leadership for
> leader http://localhost:64523 ,
> session=940dc860-bbcb-4e35-8255-59702b220383
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
>
> [2018-11-07 11:11:13,523] INFO Starting RPC endpoint for
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a .
> (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
>
> [2018-11-07 11:11:13,537] INFO Proposing leadership to contender
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher@27e32fe4 @
> akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
>
> [2018-11-07 11:11:13,538] INFO Flink Mini Cluster started successfully
> (org.apache.flink.runtime.minicluster.MiniCluster:410)
>
> [2018-11-07 11:11:13,538] INFO Dispatcher
> akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a was
> granted leadership with fencing token d7ae33ab-20bb-4598-bc6d-b7a57ca66860
> (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:818)
>
> [2018-11-07 11:11:13,549] INFO Recovering all persisted jobs.
> (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:658)
>
> [2018-11-07 11:11:13,550] INFO Received confirmation of leadership for
> leader akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a ,
> session=d7ae33ab-20bb-4598-bc6d-b7a57ca66860
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
>
> [2018-11-07 11:11:13,566] INFO Submitting job
> 0ef8697ca98f6d2b565ed928d17c8a49 (Simple Test).
> (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:247)
>
> [2018-11-07 11:11:13,592] INFO Starting RPC endpoint for
> org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/jobmanager_1 .
> (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
>
> [2018-11-07 11:11:13,618] INFO Initializing job Simple Test
> (0ef8697ca98f6d2b565ed928d17c8a49).
> (org.apache.flink.runtime.jobmaster.JobMaster:269)
>
> [2018-11-07 11:11:13,623] INFO Using restart strategy NoRestartStrategy
> for Simple Test (0ef8697ca98f6d2b565ed928d17c8a49).
> (org.apache.flink.runtime.jobmaster.JobMaster:280)
>
> [2018-11-07 11:11:13,629] INFO Starting RPC endpoint for
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool at
> akka://flink/user/96fe7c78-28fe-484f-ae16-dcd1d4bc2c6b .
> (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
>
> [2018-11-07 11:11:13,654] INFO Job recovers via failover strategy: full
> graph restart (org.apache.flink.runtime.executiongraph.ExecutionGraph:425)
>
> [2018-11-07 11:11:13,689] INFO Running initialization on master for job
> Simple Test (0ef8697ca98f6d2b565ed928d17c8a49).
> (org.apache.flink.runtime.jobmaster.JobMaster:195)
>
> [2018-11-07 11:11:13,689] INFO Successfully ran initialization on master
> in 0 ms. (org.apache.flink.runtime.jobmaster.JobMaster:224)
>
> [2018-11-07 11:11:13,722] INFO No state backend has been configured, using
> default (Memory / JobManager) MemoryStateBackend (data in heap memory /
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null',
> asynchronous: TRUE, maxStateSize: 5242880)
> (org.apache.flink.runtime.jobmaster.JobMaster:230)
>
> [2018-11-07 11:11:13,740] INFO Proposing leadership to contender
> org.apache.flink.runtime.jobmaster.JobManagerRunner@34c7d1b6 @
> akka://flink/user/jobmanager_1
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
>
> [2018-11-07 11:11:13,740] INFO JobManager runner for job Simple Test
> (0ef8697ca98f6d2b565ed928d17c8a49) was granted leadership with session id
> 56e8324b-0015-4464-b6c7-ba0accdcec2a at akka://flink/user/jobmanager_1.
> (org.apache.flink.runtime.jobmaster.JobManagerRunner:329)
>
> [2018-11-07 11:11:13,743] INFO Starting execution of job Simple Test
> (0ef8697ca98f6d2b565ed928d17c8a49)
> (org.apache.flink.runtime.jobmaster.JobMaster:1009)
>
> [2018-11-07 11:11:13,744] INFO Job Simple Test
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state CREATED to RUNNING.
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)
>
> [2018-11-07 11:11:13,747] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to
> SCHEDULED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
>
> [2018-11-07 11:11:13,753] INFO Connecting to ResourceManager
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f)
> (org.apache.flink.runtime.jobmaster.JobMaster:1285)
>
> [2018-11-07 11:11:13,754] INFO Received confirmation of leadership for
> leader akka://flink/user/jobmanager_1 ,
> session=56e8324b-0015-4464-b6c7-ba0accdcec2a
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
>
> [2018-11-07 11:11:13,757] INFO Resolved ResourceManager address, beginning
> registration (org.apache.flink.runtime.jobmaster.JobMaster:201)
>
> [2018-11-07 11:11:13,758] INFO Registration at ResourceManager attempt 1
> (timeout=100ms) (org.apache.flink.runtime.jobmaster.JobMaster:250)
>
> [2018-11-07 11:11:13,763] INFO Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}]
> (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:733)
>
> [2018-11-07 11:11:13,767] INFO Registering job manager
> b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job
> 0ef8697ca98f6d2b565ed928d17c8a49.
> (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:298)
>
> [2018-11-07 11:11:13,771] INFO Registered job manager
> b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job
> 0ef8697ca98f6d2b565ed928d17c8a49.
> (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:672)
>
> [2018-11-07 11:11:13,772] INFO JobManager successfully registered at
> ResourceManager, leader id: 86394924fb97bad612b67f526f84406f.
> (org.apache.flink.runtime.jobmaster.JobMaster:1307)
>
> [2018-11-07 11:11:13,773] INFO Requesting new slot
> [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
> (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:689)
>
> [2018-11-07 11:11:13,776] INFO Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 0ef8697ca98f6d2b565ed928d17c8a49 with allocation id
> AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}.
> (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:422)
>
> [2018-11-07 11:11:13,778] INFO Receive slot request
> AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a} for job
> 0ef8697ca98f6d2b565ed928d17c8a49 from resource manager with leader id
> 86394924fb97bad612b67f526f84406f.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:743)
>
> [2018-11-07 11:11:13,779] INFO Allocated slot for
> AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:755)
>
> [2018-11-07 11:11:13,779] INFO Add job 0ef8697ca98f6d2b565ed928d17c8a49
> for job leader monitoring.
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:186)
>
> [2018-11-07 11:11:13,781] INFO Try to register at job manager
> akka://flink/user/jobmanager_1 with leader id
> 56e8324b-0015-4464-b6c7-ba0accdcec2a.
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:326)
>
> [2018-11-07 11:11:13,782] INFO Resolved JobManager address, beginning
> registration (org.apache.flink.runtime.taskexecutor.JobLeaderService:201)
>
> [2018-11-07 11:11:13,782] INFO Registration at JobManager attempt 1
> (timeout=100ms) (org.apache.flink.runtime.taskexecutor.JobLeaderService:250)
>
> [2018-11-07 11:11:13,785] INFO Successful registration at job manager
> akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49.
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:374)
>
> [2018-11-07 11:11:13,786] INFO Establish JobManager connection for job
> 0ef8697ca98f6d2b565ed928d17c8a49.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1137)
>
> [2018-11-07 11:11:13,789] INFO Offer reserved slots to the leader of job
> 0ef8697ca98f6d2b565ed928d17c8a49.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1042)
>
> [2018-11-07 11:11:13,794] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from SCHEDULED
> to DEPLOYING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
>
> [2018-11-07 11:11:13,794] INFO Deploying Source: Custom Source -> Sink:
> Print to Std. Out (1/1) (attempt #0) to 127.0.0.1
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:576)
>
> [2018-11-07 11:11:13,819] INFO Received task Source: Custom Source ->
> Sink: Print to Std. Out (1/1).
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:541)
>
> [2018-11-07 11:11:13,820] INFO Activate slot
> AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}.
> (org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable:237)
>
> [2018-11-07 11:11:13,820] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to
> DEPLOYING. (org.apache.flink.runtime.taskmanager.Task:915)
>
> [2018-11-07 11:11:13,820] INFO Creating FileSystem stream leak safety net
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1)
> (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING]
> (org.apache.flink.runtime.taskmanager.Task:579)
>
> [2018-11-07 11:11:13,828] INFO Loading JAR files for task Source: Custom
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2)
> [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:586)
>
> [2018-11-07 11:11:13,829] INFO Registering task at network: Source: Custom
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2)
> [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:612)
>
> [2018-11-07 11:11:13,836] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING
> to RUNNING. (org.apache.flink.runtime.taskmanager.Task:915)
>
> [2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING
> to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
>
> [2018-11-07 11:11:13,840] INFO No state backend has been configured, using
> default (Memory / JobManager) MemoryStateBackend (data in heap memory /
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null',
> asynchronous: TRUE, maxStateSize: 5242880)
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)
>
> 0
>
> 1
>
> 2
>
> 3
>
> 4
>
> [2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)
>
> [2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2).
> (org.apache.flink.runtime.taskmanager.Task:818)
>
> [2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1)
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED]
> (org.apache.flink.runtime.taskmanager.Task:845)
>
> [2018-11-07 11:11:13,899] INFO Un-registering task and sending final
> execution state FINISHED to JobManager for task Source: Custom Source ->
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)
>
> [2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
>
> [2018-11-07 11:11:13,907] INFO Job Simple Test
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED.
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)
>
> [2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job
> 0ef8697ca98f6d2b565ed928d17c8a49.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)
>
> [2018-11-07 11:11:13,908] INFO Shutting down
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)
>
> [2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster
> (org.apache.flink.runtime.minicluster.MiniCluster:427)
>
> [2018-11-07 11:11:23,580] INFO Shutting down rest endpoint.
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)
>
> [2018-11-07 11:11:23,582] INFO Stopping TaskExecutor
> akka://flink/user/taskmanager_0.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)
>
> [2018-11-07 11:11:23,583] INFO Shutting down
> TaskExecutorLocalStateStoresManager.
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)
>
> [2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory
> C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814
> (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)
>
> [2018-11-07 11:11:23,591] INFO Shutting down the network environment and
> its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)
>
> [2018-11-07 11:11:23,591] INFO Removing cache directory
> C:\Users\alinz\AppData\Local\Temp\flink-web-ui
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)
>
> [2018-11-07 11:11:23,593] INFO Closing the SlotManager.
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)
>
> [2018-11-07 11:11:23,593] INFO Suspending the SlotManager.
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)
>
> [2018-11-07 11:11:23,596] INFO Close ResourceManager connection
> cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new
> address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)
>
> [2018-11-07 11:11:23,607] INFO Stop job leader service.
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)
>
> [2018-11-07 11:11:23,608] INFO Stopped TaskExecutor
> akka://flink/user/taskmanager_0.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)
>
>
>
>
>
>
>
> *From:* LINZ, Arnaud
> *Sent:* Tuesday, 6 November 2018 14:23
> *To:* user <user@flink.apache.org>
> *Subject:* Stopping a streaming app from its own code : behaviour change
> from 1.3 to 1.6
>
>
>
> Hello,
>
>
>
> In Flink 1.3, I was able to cleanly stop an HA streaming application
> just by letting the source’s “run()” method return (on an ending
> condition).
>
> I am trying to update my code to Flink 1.6.2, but this no longer works.
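
For readers following the thread, the pattern described here (a source whose `run()` returns once an ending condition is met, plus a `cancel()` that can stop it early) looks roughly like the sketch below. It is written in plain Java so that it runs without Flink. `BoundedSourceSketch` and its `Consumer<String>` parameter are stand-ins chosen for illustration; in a real job the same loop would sit inside a `SourceFunction<String>` and call `ctx.collect(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Plain-Java sketch of a bounded source. The class name and the Consumer
 * parameter are illustrative stand-ins, not Flink API.
 */
public final class BoundedSourceSketch {

    private final int limit;                 // ending condition: emit this many records
    private volatile boolean running = true; // cleared by cancel(), possibly from another thread

    public BoundedSourceSketch(int limit) {
        this.limit = limit;
    }

    /** Emits "0", "1", ... until the limit is reached or cancel() is called, then returns. */
    public void run(Consumer<String> out) {
        for (int count = 0; running && count < limit; count++) {
            out.accept(String.valueOf(count));
        }
    }

    /** Asks run() to return at its next loop check. */
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        new BoundedSourceSketch(5).run(seen::add);
        System.out.println(seen); // prints [0, 1, 2, 3, 4]
    }
}
```

The `volatile` flag matters because cancellation usually arrives on a different thread from the one executing `run()`. The sketch only shows the source side; whether the job as a whole then finishes is exactly what this thread is about.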
>
>
>
> Even if there are no sources and no items to process, the cluster continues
> running forever, logging an endless stream of messages like this one:
>
> Checkpoint triggering task Source: Custom Source (1/2) of job
> 3b286f5344c50f0e466bb8ee79a2bb69 is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
>
>
>
> Why has this behavior changed? How am I supposed to stop a streaming
> execution from its own code now? Is
> https://issues.apache.org/jira/browse/FLINK-2111 of any use?
>
>
>
> Thanks,
>
> Arnaud
>
>
>
> ------------------------------
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>
