Here’s a more complete view of the task manager log from the start of this 
occurrence:

2018-01-11 14:50:08.286 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-54-205.us-west-2.compute.internal:2181)] INFO  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Unable to read 
additional data from server sessionid 0x3c00002d8a7603de, likely server has 
closed socket, closing socket connection and attempting reconnect
2018-01-11 14:50:08.388 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates (5/10)-EventThread] INFO  
c.i.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State change: 
SUSPENDED
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - TaskManager 
akka://flink/user/taskmanager disconnects from JobManager 
akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager requested 
disconnect: JobManager is no longer the leader
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Cancelling all 
computations and discarding all cached data.
2018-01-11 14:50:08.450 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally 
Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.452 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Sink: ES (5/10) 
(1a2951add18548188742e85d98da271f) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from 
JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager 
requested disconnect: JobManager is no longer the leader
        at 
org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1073)
        at 
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:314)
        at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:121)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-11 14:50:08.486 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Triggering cancellation of task 
code Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally 
Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Sink: Kafka (5/10) 
(71bebe47ce524c0d535845b1e4d9c595) switched from RUNNING to FAILED.

...

2018-01-11 14:50:08.550 [Sink: Kafka (5/10)] INFO  
org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer 
with timeoutMillis = 9223372036854775807 ms.
2018-01-11 14:50:08.553 [Sink: Kafka (5/10)] ERROR 
org.apache.kafka.clients.producer.KafkaProducer  - Interrupted while joining 
ioThread
java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1260)
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at 
com.intellify.flink.crusher.executor.sink.TracingSourceRecordSink.close(TracingSourceRecordSink.java:67)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

2018-01-11 14:50:08.639 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates (5/10)] INFO  
c.i.flink.shared.config.SharedIntellifyConfigProvider  - Closing archaius 
initializer
2018-01-11 14:50:08.741 [Sink: ES (5/10)] INFO  
c.i.flink.shared.config.SharedIntellifyConfigProvider  - Closing archaius 
initializer

2018-01-11 14:50:08.873 [Sink: Kafka (5/10)] ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask  - Error during disposal of 
stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062)
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at 
com.intellify.flink.crusher.executor.sink.TracingSourceRecordSink.close(TracingSourceRecordSink.java:67)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1260)
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
        ... 10 common frames omitted

2018-01-11 14:50:08.924 [flink-akka.actor.default-dispatcher-16] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Disassociating 
from JobManager

2018-01-11 14:50:08.955 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates (5/10)] ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask  - Error during disposal of 
stream operator.
java.lang.NoClassDefFoundError: 
com/intellify/flink/shaded/curator/org/apache/curator/utils/CloseableUtils
        at 
com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
        at 
com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
        at 
com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
        at 
com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
        at 
com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
        at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
        at 
com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
        at 
com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
        at 
com.intellify.flink.shared.redis.RedisFactory.close(RedisFactory.java:56)
        at 
com.intellify.flink.crusher.executor.process.DuplicateFilterFunction.close(DuplicateFilterFunction.java:90)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 16 common frames omitted

2018-01-11 14:50:08.962 [Curator-Framework-0] INFO  
c.i.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - 
backgroundOperationsLoop exiting

2018-01-11 14:50:09.068 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB cache
2018-01-11 14:50:09.070 [Sink: Kafka (5/10)] INFO  
org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams 
are closed for task Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595) 
[FAILED]
2018-01-11 14:50:09.070 [Sink: StagingLake (5/10)] ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask  - Could not shut down 
timer service
java.lang.InterruptedException: null
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
        at 
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
        at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

2018-01-11 14:50:09.146 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB cache
2018-01-11 14:50:09.200 [flink-akka.actor.default-dispatcher-16] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Trying to register 
at JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager (attempt 1, 
timeout: 500 milliseconds)
2018-01-11 14:50:09.313 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-53-99.us-west-2.compute.internal:2181)] INFO  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket 
connection to server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181
2018-01-11 14:50:09.314 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-53-99.us-west-2.compute.internal:2181)] INFO  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection 
established to ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, 
initiating session
2018-01-11 14:50:09.315 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
0x3c00002d8a7603de for server 
ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, 
closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: 
com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
        at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
        at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
        at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
Caused by: java.lang.ClassNotFoundException: 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.proto.SetWatches
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 3 common frames omitted

2018-01-11 14:50:09.329 [flink-akka.actor.default-dispatcher-15] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Trying to register 
at JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager (attempt 1, 
timeout: 500 milliseconds)
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Successful 
registration at JobManager 
(akka.tcp://flink@10.80.54.126:31024/user/jobmanager), starting network stack 
and library cache.
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Determined BLOB 
server address to be /10.80.54.126:31025. Starting BLOB cache.
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO  
org.apache.flink.runtime.blob.PermanentBlobCache  - Created BLOB cache storage 
directory tmp/blobStore-078da52f-864b-458a-9c3b-d07f0e0a3b30
2018-01-11 14:50:09.342 [flink-akka.actor.default-dispatcher-15] INFO  
org.apache.flink.runtime.blob.TransientBlobCache  - Created BLOB cache storage 
directory tmp/blobStore-8e5f0854-215d-4718-b13e-8f77f0f7af51

2018-01-11 14:50:10.272 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-52-23.us-west-2.compute.internal:2181)] INFO  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket 
connection to server ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181
2018-01-11 14:50:10.273 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-52-23.us-west-2.compute.internal:2181)] INFO  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection 
established to ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181, 
initiating session
2018-01-11 14:50:10.273 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-52-23.us-west-2.compute.internal:2181)] WARN  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
0x3c00002d8a7603de for server 
ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181, unexpected error, 
closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: 
com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
        at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
        at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
        at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
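The `KafkaProducer.close()` failure above ("Interrupted while joining ioThread") fits a task thread that already carried an interrupt from Flink's cancellation: `Thread.join` throws right away, before the I/O thread has exited. For threads that your own operator code owns, here is a minimal plain-JDK sketch that joins despite a pending interrupt and then restores the flag. `JoinUtil` and `joinUninterruptibly` are hypothetical names. Guava ships a similar helper, `Uninterruptibles.joinUninterruptibly`. This does not reproduce how the Kafka client or Flink behave internally.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: join a thread even if the caller has a pending interrupt. */
final class JoinUtil {
    private JoinUtil() {}

    /**
     * Waits up to {@code timeout} for {@code worker} to exit. An InterruptedException
     * is remembered instead of aborting the wait, and the caller's interrupt flag is
     * restored before returning. Returns true if the worker has terminated.
     */
    static boolean joinUninterruptibly(Thread worker, long timeout, TimeUnit unit) {
        boolean interrupted = false;
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (worker.isAlive()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // timed out; the worker is still running
                }
                try {
                    TimeUnit.NANOSECONDS.timedJoin(worker, remaining);
                } catch (InterruptedException e) {
                    interrupted = true; // the flag is now cleared, so the next join can block
                }
            }
            return !worker.isAlive();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // hand the interrupt back to the caller
            }
        }
    }
}
```

The interrupt is not swallowed. Code that runs after the join still sees that the task is being cancelled.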



--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Jan 11, 2018, at 10:11 AM, Jared Stehler 
> <jared.steh...@intellifylearning.com> wrote:
> 
> As another data point, here’s an excerpt from a stack dump of the task 
> manager:
> 
> "heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> 
> filter-duplicates (5/10)-EventThread" #94 daemon prio=5 os_prio=0 
> tid=0x00007f48c04d4800 nid=0x68ef waiting on condition [0x00007f48470eb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000000cd6121c0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at 
> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)
> 
>    Locked ownable synchronizers:
>         - None
> 
> "heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> 
> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)" #93 daemon prio=5 
> os_prio=0 tid=0x00007f48c04e1800 nid=0x68ee waiting on condition 
> [0x00007f48471ec000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at 
> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1051)
> 
>    Locked ownable synchronizers:
>         - None
> 
> "Sink: ES (5/10)-EventThread" #68 daemon prio=5 os_prio=0 
> tid=0x00007f48c80a4800 nid=0x6829 waiting on condition [0x00007f4851e08000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000000cc4f7950> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at 
> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)
> 
>    Locked ownable synchronizers:
>         - None
> 
> "Sink: ES (ip-10-80-53-99.us-west-2.compute.internal:2181)" #67 daemon prio=5 
> os_prio=0 tid=0x00007f48c80aa000 nid=0x6827 runnable [0x00007f4851f09000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x00000000cc4f7c18> (a sun.nio.ch.Util$3)
>         - locked <0x00000000cc4f7c08> (a 
> java.util.Collections$UnmodifiableSet)
>         - locked <0x00000000cc4f7bc0> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at 
> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
>         at 
> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> 
>    Locked ownable synchronizers:
>         - None
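In the dump above, the ZooKeeper client threads take their names from the task thread that created them (`Sink: ES (5/10)-EventThread` and the matching send thread). That makes a name-prefix scan a cheap way to spot threads that outlive an operator's `close()`. Below is a plain-JDK sketch. It assumes the check runs on the task thread, which is excluded from the results. `LeakedThreadCheck` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic: list live threads whose name starts with a prefix. */
final class LeakedThreadCheck {
    private LeakedThreadCheck() {}

    static List<String> liveThreadsWithPrefix(String prefix) {
        List<String> found = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            // Skip the calling thread: the task thread itself shares the prefix.
            if (t != Thread.currentThread() && t.isAlive() && t.getName().startsWith(prefix)) {
                found.add(t.getName() + " (" + t.getState() + ")");
            }
        }
        return found;
    }
}
```

For example, logging `liveThreadsWithPrefix("Sink: ES (")` at the end of the sink's `close()` would show whether the ZooKeeper event and send threads are still around.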
> 
>> On Jan 11, 2018, at 10:07 AM, Jared Stehler 
>> <jared.steh...@intellifylearning.com> wrote:
>> 
>> I’m seeing sporadic issues where Curator (or other) user threads appear to 
>> be left running after a stream shuts down. The user class loader then goes 
>> away and I get spammed with ClassNotFoundExceptions. Could this be caused by 
>> the UserClassLoader being shut down before close() has been invoked on all 
>> operators?
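One way to rule out the leftover-thread half of this hypothesis is to have each user function own every background thread it starts: the function starts the thread in `open()`, then stops it and waits for it in `close()`. Below is a JDK-only sketch. `OwnedPoller` is a hypothetical class, not a Flink or Curator API. A Curator or ZooKeeper client would instead be stopped through its own `close()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical operator-owned background poller. The owning function starts it in
 * open() and closes it from its own close(), so the worker thread is gone before the
 * task releases its user-code class loader.
 */
final class OwnedPoller implements AutoCloseable {
    private final ScheduledExecutorService executor;

    OwnedPoller(String threadName) {
        // Name the worker after its owner so a leak is easy to spot in a thread dump.
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
    }

    void start(Runnable pollOnce, long periodMillis) {
        executor.scheduleWithFixedDelay(pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** True once the worker thread has fully exited. */
    boolean isStopped() {
        return executor.isTerminated();
    }

    @Override
    public void close() {
        executor.shutdownNow(); // cancel the periodic task and interrupt the worker
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                System.err.println("poller thread did not stop within 10s");
            }
        } catch (InterruptedException e) {
            // The closing thread was interrupted (e.g. by task cancellation): restore the
            // flag rather than swallow it; the worker may still be shutting down.
            Thread.currentThread().interrupt();
        }
    }
}
```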
>> 
>> Here’s a stack trace I see from an attempt at closing an Elasticsearch sink:
>> 
>> java.lang.ClassNotFoundException: 
>> com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at 
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at 
>> com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
>>     at 
>> com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
>>     at 
>> com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
>>     at 
>> com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
>>     at 
>> com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
>>     at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
>>     at 
>> com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
>>     at 
>> com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
>>     at 
>> com.intellify.flink.shared.elasticsearch.LiveResolvingEs1ApiCallBridge.cleanup(LiveResolvingEs1ApiCallBridge.java:105)
>>     at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:323)
>>     at 
>> com.intellify.flink.shared.tracer.TracingSink.close(TracingSink.java:50)
>>     at 
>> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:748)
>> 
>> I’m using a Curator connection for Archaius and closing it in the call 
>> bridge’s cleanup method. I shade Curator and ZooKeeper to make sure I’m not 
>> reaching up into the parent class loader.
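The trace above fails inside `ConnectionState.close()`. `CloseableUtils` is first needed only at close time, and by then the user-code class loader apparently can no longer resolve it. One defensive workaround is to force-load such shutdown-only classes early, for example in `open()`, while the loader is certainly usable. This is an assumption, not a documented Flink fix. Once a class is loaded, a class loader normally returns it from its cache (`findLoadedClass`) instead of looking it up again. `ShutdownClassPreloader` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: eagerly load classes that a library only needs during shutdown. */
final class ShutdownClassPreloader {
    private ShutdownClassPreloader() {}

    /** Loads and initializes each class; returns the names that could not be loaded. */
    static List<String> preload(ClassLoader loader, String... classNames) {
        List<String> missing = new ArrayList<>();
        for (String name : classNames) {
            try {
                Class.forName(name, true, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

In this job, the call might look like `preload(getClass().getClassLoader(), "com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils")` from the function's `open()`. This only lets `close()` finish; it does not explain why the loader becomes unusable during disposal.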
>> 
>> I also see the following repeatedly in my task manager log:
>> 
>> 2018-01-11 14:53:13.313 [heartbeat-filter -> input-trace-filter -> 
>> filter-inactive-ds -> filter-duplicates 
>> (ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN  
>> c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
>> 0x3c00002d8a7603de for server 
>> ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, 
>> closing socket connection and attempting reconnect
>> java.lang.NoClassDefFoundError: 
>> com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
>>      at 
>> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
>>      at 
>> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
>>      at 
>> com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
>> 
>> 
>> Does anyone have any insight into what might be happening here? Does this 
>> look like a thread I’m not closing properly, or something else entirely?
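The close-time traces in this thread also show a `NoClassDefFoundError` passing straight through `IOUtils.closeQuietly`, which swallows only `IOException`. As a result, any cleanup that would have run after it in the same `close()` chain may be skipped. The sketch below closes several resources in reverse registration order. It attempts every `close()` even when one throws an `Error`, then rethrows the first failure with the others attached as suppressed. `CloseAll` is a hypothetical helper. It catches `Throwable` on purpose, so that an `Error` does not abort the remaining closes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: close registered resources in LIFO order, attempting every close(). */
final class CloseAll implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    /** Registers a resource; resources are closed in reverse order of registration. */
    void register(AutoCloseable resource) {
        resources.push(resource); // push() adds to the head, so pop() yields LIFO order
    }

    @Override
    public void close() throws Exception {
        Throwable first = null;
        while (!resources.isEmpty()) {
            AutoCloseable resource = resources.pop();
            try {
                resource.close();
            } catch (Throwable t) { // includes Errors such as NoClassDefFoundError
                if (first == null) {
                    first = t;
                } else if (t != first) {
                    first.addSuppressed(t);
                }
            }
        }
        if (first instanceof Exception) {
            throw (Exception) first;
        }
        if (first instanceof Error) {
            throw (Error) first;
        }
        if (first != null) {
            throw new Exception(first); // some other Throwable subclass
        }
    }
}
```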
>> 
> 
