Hey Bharath,

I think I know what is taking so long in my shutdown sequence.
My code roughly looks like this: https://gist.github.com/omkardeshpande8/dc4259a8aa7a726a4fe787d9ece8f44a

I think the Thread.sleep(30000) in this code is increasing the time taken by the shutdown sequence. Is that correct?

So what does stopping a Samza container mean in the context of a Beam pipeline? Does pres.waitUntilFinish() return when the Samza container is being stopped?

We added Runtime.getRuntime().halt() because, in a lot of cases, our JVM running a Beam pipeline was hanging without exiting. Do you have suggestions on a better way to handle this?

Also, does rebalancing always involve stopping all the containers? We are running on K8s and pods are often moved around. Every time a pod is moved, a rebalance is triggered, and the rebalance in turn restarts all the other pods.

On 3/22/20, 10:29 PM, "Bharath Kumara Subramanian" <codin.mart...@gmail.com> wrote:

Hi Omkar,

The errors are related to timeouts during the container shutdown that gets triggered by a rebalance. Whenever a processor joins or leaves the quorum, a rebalance is triggered, which requires every existing processor to shut down its container before they agree on the new job model. In your case, it looks like the container shutdown is taking longer than the configured timeout (task.shutdown.ms), hence the exception.

Do you know what is taking so long in your shutdown sequence? Meanwhile, you can start by increasing the shutdown timeout to a higher value.

*Note:* You will need to account for the increase in the *consensus timeout*, i.e. the time the leader of the quorum waits for the other participants to agree on the new job model. If the other processors are still in the shutdown phase, the leader may end up expiring the current barrier and triggering another rebalance.

For example, if the current setup is

*task.shutdown.ms = 10000*
*job.coordinator.zk.consensus.timeout.ms = 30000*

then your new setup will roughly need to be the following (depending on how much room you already have between these two configurations), where *x* denotes the increase in the value:

*task.shutdown.ms = 10000 + x*
*job.coordinator.zk.consensus.timeout.ms = 30000 + x*

Let me know how it goes.

Thanks,
Bharath

On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar <omkar_deshpa...@intuit.com.invalid> wrote:

> We are using Beam with the Samza runner: beam.version 2.19.0, samza.version 1.3.0.
>
> And we are seeing the following exception frequently. Should we be tweaking some configuration? Does this point to a network connectivity issue?
>
> 2020/03/21 21:42:09.896 INFO o.a.s.zk.ZkBarrierForVersionUpgrade - Subscribing data changes on the path: /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state for barrier version: 151.
> 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: JobModelVersionChange failed.
> java.lang.IllegalStateException: ZkClient already closed!
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>     at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>     at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>     at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>     at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>     at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception in debounce timer! Stopping the job coordinator
> java.lang.IllegalStateException: ZkClient already closed!
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>     at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>     at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>     at org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>     at org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>     at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.897 INFO org.apache.samza.zk.ZkJobCoordinator - Job Coordinator shutdown is in progress!
> 2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer - Caught exception/error in run loop.
> org.apache.samza.SamzaException: Run loop is interrupted
>     at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException: null
>     at java.lang.Object.wait(Native Method)
>     at org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
>     ... 7 common frames omitted
> 2020/03/21 21:42:09.898 INFO o.a.samza.container.SamzaContainer - Shutting down SamzaContainer.
> 2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult - Container shutdown timed out after 10000 ms.
> java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
>     at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.901 ERROR c.i.s.sdk.core.SppBaseProcessor - An illegal error occurred, forcibly terminating application
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
>     at org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>     at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>     at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>     at com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:30)
>     at com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:74)
>     at com.intuit.cgde.clickstream.c360.ProcessC360Data.main(ProcessC360Data.java:103)
> Caused by: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
>     at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.902 INFO o.a.samza.container.SamzaContainer - Shutting down consumer multiplexer.
> 2020/03/21 21:42:09.902 INFO o.a.samza.container.SamzaContainer - Shutting down task instance stream tasks.
> 2020/03/21 21:42:09.902 INFO o.a.samza.container.SamzaContainer - Shutting down task thread pool
> 2020/03/21 21:42:09.902 INFO c.i.s.sdk.core.SppBaseProcessor - Application finished execution; terminating Cluster.
> 2020/03/21 21:42:09.903 ERROR o.a.b.r.samza.SamzaPipelineResult - Container shutdown timed out after 10000 ms.
> java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
>     at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.903 INFO c.i.s.sdk.core.SppBaseProcessor - Application Killer sleeping for 30000 ms
> Exception in thread "Thread-8" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
>     at org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>     at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>     at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>     at org.apache.beam.runners.samza.SamzaPipelineResult.cancel(SamzaPipelineResult.java:62)
>     at com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.lambda$run$0(GracefulLifecycleManager.java:23)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.TimeoutException: Container shutdown timed out after 10000 ms.
>     at org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>     at org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>     at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>     at org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> 2020/03/21 21:42:09.907 INFO o.a.samza.container.SamzaContainer - Shutting down timer executor
> 2020/03/21 21:42:09.908 INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>
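As a rough illustration of Bharath's rule of thumb in the thread, the sketch below raises task.shutdown.ms and job.coordinator.zk.consensus.timeout.ms by the same amount x, so the headroom between the two stays unchanged. Only the two config keys come from the thread; the class and method names (ShutdownTimeouts, bump) are hypothetical and are not part of the Samza or Beam APIs. The sketch only builds the key/value pairs; how they get into your job config depends on your setup. It also assumes, following the note about the leader expiring the barrier, that the consensus timeout should stay strictly above the shutdown timeout.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Samza API): bumps the shutdown timeout and the
// ZK consensus timeout by the same amount, preserving the gap between them.
public final class ShutdownTimeouts {
  static final String SHUTDOWN_KEY = "task.shutdown.ms";
  static final String CONSENSUS_KEY = "job.coordinator.zk.consensus.timeout.ms";

  private ShutdownTimeouts() {}

  /**
   * Returns new values for both keys, each increased by extraMs (the "x" in
   * the thread). Rejects a setup where the consensus timeout does not exceed
   * the shutdown timeout, because the leader may then expire the barrier
   * while other processors are still shutting down.
   */
  static Map<String, String> bump(long shutdownMs, long consensusMs, long extraMs) {
    if (extraMs < 0) {
      throw new IllegalArgumentException("extraMs must be non-negative");
    }
    if (consensusMs <= shutdownMs) {
      throw new IllegalArgumentException(
          "consensus timeout should leave room above the shutdown timeout");
    }
    // LinkedHashMap keeps the keys in insertion order for printing.
    Map<String, String> config = new LinkedHashMap<>();
    config.put(SHUTDOWN_KEY, Long.toString(shutdownMs + extraMs));
    config.put(CONSENSUS_KEY, Long.toString(consensusMs + extraMs));
    return config;
  }

  public static void main(String[] args) {
    // Example setup from the thread (10000 / 30000) with an assumed x = 20000 ms.
    Map<String, String> config = bump(10000, 30000, 20000);
    config.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```

With the thread's example values and x = 20000 ms, main() prints task.shutdown.ms = 30000 and job.coordinator.zk.consensus.timeout.ms = 50000, so the 20000 ms gap between the two settings is kept. If the current gap is already tight, a larger increase to the consensus timeout than to the shutdown timeout may be needed, which is the "depending on how much room you already have" caveat in the thread.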