Hi Omkar,

The errors are caused by timeouts during the container shutdown that is
triggered as part of a rebalance. Whenever a processor joins or leaves the
quorum, a rebalance is triggered, which requires all the existing processors
to shut down their containers before agreeing on the new job model.
In your case, it looks like the container shutdown is taking longer than the
configured timeout (task.shutdown.ms), and hence an exception is thrown.

Do you know what is taking so long in your shutdown sequence?

Meanwhile, you can start by increasing the shutdown timeout.
*Note:* You will also need to increase the *consensus timeout* accordingly.
This is the time the leader of the quorum waits for the other participants to
agree on the new job model. If the other processors are still in the
shutdown phase, the leader may end up expiring the current barrier and
triggering another rebalance.

For example, if the current setup is
*task.shutdown.ms = 10000*
*job.coordinator.zk.consensus.timeout.ms = 30000*
then your new setup will roughly need to be the following (depending on how
much room you already have between these two configurations), where "*x*"
denotes the increase in the value:
*task.shutdown.ms = 10000 + x*
*job.coordinator.zk.consensus.timeout.ms = 30000 + x*
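
As a rough sketch of that arithmetic: the helper name bump_timeouts and the
value x = 20000 below are only for illustration. They are not something Samza
provides, and the right x depends on how long your shutdown actually takes.

```python
def bump_timeouts(shutdown_ms, consensus_ms, x):
    """Raise both timeouts by x ms so the gap between them stays the same."""
    if x < 0:
        raise ValueError("x must be non-negative")
    return {
        "task.shutdown.ms": shutdown_ms + x,
        "job.coordinator.zk.consensus.timeout.ms": consensus_ms + x,
    }


# The setup above, with a hypothetical increase of x = 20000 ms.
new_config = bump_timeouts(10000, 30000, 20000)
for key, value in new_config.items():
    print(f"{key}={value}")
```

With x = 20000 this gives task.shutdown.ms=30000 and
job.coordinator.zk.consensus.timeout.ms=50000, which keeps the same 20000 ms
of room between the two values that the original setup had.
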
Let me know how it goes.

Thanks,
Bharath

On Sat, Mar 21, 2020 at 3:17 PM Deshpande, Omkar
<omkar_deshpa...@intuit.com.invalid> wrote:

> We are using beam with samza runner - beam.version 2.19.0, samza.version
> 1.3.0
>
> And we are seeing the following exception frequently. Should we be tweaking
> some configuration? Does this point to any network connectivity issue?
>
> 2020/03/21 21:42:09.896 INFO  o.a.s.zk.ZkBarrierForVersionUpgrade -
> Subscribing data changes on the path:
> /app-clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62/clickstreamc360pl127-v1327c712f-fd45-4a67-ba80-7baae72e1f62-coordinationData/jobModelGeneration/jobModelUpgradeBarrier/versionBarriers/barrier_151/barrier_state
> for barrier version: 151.
> 2020/03/21 21:42:09.896 ERROR o.a.s.zk.ScheduleAfterDebounceTime -
> Execution of action: JobModelVersionChange failed.
> java.lang.IllegalStateException: ZkClient already closed!
>         at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>         at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>         at
> org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>         at
> org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>         at
> org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.897 ERROR org.apache.samza.zk.ZkJobCoordinator -
> Received exception in debounce timer! Stopping the job coordinator
> java.lang.IllegalStateException: ZkClient already closed!
>         at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
>         at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:1158)
>         at
> org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:194)
>         at
> org.apache.samza.zk.ZkUtils.subscribeDataChanges(ZkUtils.java:337)
>         at
> org.apache.samza.zk.ZkBarrierForVersionUpgrade.join(ZkBarrierForVersionUpgrade.java:144)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:536)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.897 INFO  org.apache.samza.zk.ZkJobCoordinator - Job
> Coordinator shutdown is in progress!
> 2020/03/21 21:42:09.898 ERROR o.a.samza.container.SamzaContainer - Caught
> exception/error in run loop.
> org.apache.samza.SamzaException: Run loop is interrupted
>         at
> org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:262)
>         at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:160)
>         at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException: null
>         at java.lang.Object.wait(Native Method)
>         at
> org.apache.samza.task.AsyncRunLoop.blockIfBusyOrNoNewWork(AsyncRunLoop.java:259)
>         ... 7 common frames omitted
> 2020/03/21 21:42:09.898 INFO  o.a.samza.container.SamzaContainer -
> Shutting down SamzaContainer.
> 2020/03/21 21:42:09.899 ERROR o.a.b.r.samza.SamzaPipelineResult -
> Container shutdown timed out after 10000 ms.
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.901 ERROR c.i.s.sdk.core.SppBaseProcessor - An illegal
> error occurred, forcibly terminating application
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>         at
> com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:30)
>         at
> com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:74)
>         at
> com.intuit.cgde.clickstream.c360.ProcessC360Data.main(ProcessC360Data.java:103)
> Caused by: java.util.concurrent.TimeoutException: Container shutdown timed
> out after 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>        at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
> Shutting down consumer multiplexer.
> 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
> Shutting down task instance stream tasks.
> 2020/03/21 21:42:09.902 INFO  o.a.samza.container.SamzaContainer -
> Shutting down task thread pool
> 2020/03/21 21:42:09.902 INFO  c.i.s.sdk.core.SppBaseProcessor -
> Application finished execution; terminating Cluster.
> 2020/03/21 21:42:09.903 ERROR o.a.b.r.samza.SamzaPipelineResult -
> Container shutdown timed out after 10000 ms.
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020/03/21 21:42:09.903 INFO  c.i.s.sdk.core.SppBaseProcessor -
> Application Killer sleeping for 30000 ms
> Exception in thread "Thread-8"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.util.concurrent.TimeoutException: Container shutdown timed out after
> 10000 ms.
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:113)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:77)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:92)
>         at
> org.apache.beam.runners.samza.SamzaPipelineResult.cancel(SamzaPipelineResult.java:62)
>         at
> com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.lambda$run$0(GracefulLifecycleManager.java:23)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.TimeoutException: Container shutdown timed
> out after 10000 ms.
>         at
> org.apache.samza.processor.StreamProcessor.stopSamzaContainer(StreamProcessor.java:371)
>         at
> org.apache.samza.processor.StreamProcessor.access$300(StreamProcessor.java:104)
>         at
> org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:386)
>         at
> org.apache.samza.zk.ZkJobCoordinator$ZkJobModelVersionChangeHandler.lambda$doHandleDataChange$0(ZkJobCoordinator.java:533)
>         at
> org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:169)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         ... 1 more
> 2020/03/21 21:42:09.907 INFO  o.a.samza.container.SamzaContainer -
> Shutting down timer executor
> 2020/03/21 21:42:09.908 INFO  o.a.k.clients.producer.KafkaProducer -
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>
