Here’s a more complete view of the task manager log from the start of this occurrence:
2018-01-11 14:50:08.286 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-54-205.us-west-2.compute.internal:2181)] INFO c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Unable to read additional data from server sessionid 0x3c00002d8a7603de, likely server has closed socket, closing socket connection and attempting reconnect
2018-01-11 14:50:08.388 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)-EventThread] INFO c.i.f.s.c.o.a.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO o.a.flink.mesos.runtime.clusterframework.MesosTaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager requested disconnect: JobManager is no longer the leader
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO o.a.flink.mesos.runtime.clusterframework.MesosTaskManager - Cancelling all computations and discarding all cached data.
2018-01-11 14:50:08.450 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.452 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.runtime.taskmanager.Task - Sink: ES (5/10) (1a2951add18548188742e85d98da271f) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager requested disconnect: JobManager is no longer the leader
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1073)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:314)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:121)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-11 14:50:08.486 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.runtime.taskmanager.Task - Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595) switched from RUNNING to FAILED.
...
2018-01-11 14:50:08.550 [Sink: Kafka (5/10)] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-01-11 14:50:08.553 [Sink: Kafka (5/10)] ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1260)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
    at com.intellify.flink.crusher.executor.sink.TracingSourceRecordSink.close(TracingSourceRecordSink.java:67)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
2018-01-11 14:50:08.639 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)] INFO c.i.flink.shared.config.SharedIntellifyConfigProvider - Closing archaius initializer
2018-01-11 14:50:08.741 [Sink: ES (5/10)] INFO c.i.flink.shared.config.SharedIntellifyConfigProvider - Closing archaius initializer
2018-01-11 14:50:08.873 [Sink: Kafka (5/10)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
    at com.intellify.flink.crusher.executor.sink.TracingSourceRecordSink.close(TracingSourceRecordSink.java:67)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1260)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
    ... 10 common frames omitted
2018-01-11 14:50:08.924 [flink-akka.actor.default-dispatcher-16] INFO o.a.flink.mesos.runtime.clusterframework.MesosTaskManager - Disassociating from JobManager
2018-01-11 14:50:08.955 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.lang.NoClassDefFoundError: com/intellify/flink/shaded/curator/org/apache/curator/utils/CloseableUtils
    at com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
    at com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
    at com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
    at com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
    at com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
    at com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
    at com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
    at com.intellify.flink.shared.redis.RedisFactory.close(RedisFactory.java:56)
    at com.intellify.flink.crusher.executor.process.DuplicateFilterFunction.close(DuplicateFilterFunction.java:90)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 16 common frames omitted
2018-01-11 14:50:08.962 [Curator-Framework-0] INFO c.i.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2018-01-11 14:50:09.068 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
2018-01-11 14:50:09.070 [Sink: Kafka (5/10)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595) [FAILED]
2018-01-11 14:50:09.070 [Sink: StagingLake (5/10)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
2018-01-11 14:50:09.146 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
2018-01-11 14:50:09.200 [flink-akka.actor.default-dispatcher-16] INFO o.a.flink.mesos.runtime.clusterframework.MesosTaskManager - Trying to register at JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-01-11 14:50:09.313 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] INFO c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181
2018-01-11 14:50:09.314 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] INFO c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, initiating session
2018-01-11 14:50:09.315 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session 0x3c00002d8a7603de for server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
    at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
    at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
    at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
Caused by: java.lang.ClassNotFoundException: com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.proto.SetWatches
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 3 common frames omitted
2018-01-11 14:50:09.329 [flink-akka.actor.default-dispatcher-15] INFO o.a.flink.mesos.runtime.clusterframework.MesosTaskManager - Trying to register at JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO o.a.flink.mesos.runtime.clusterframework.MesosTaskManager - Successful registration at JobManager (akka.tcp://flink@10.80.54.126:31024/user/jobmanager), starting network stack and library cache.
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO o.a.flink.mesos.runtime.clusterframework.MesosTaskManager - Determined BLOB server address to be /10.80.54.126:31025. Starting BLOB cache.
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage directory tmp/blobStore-078da52f-864b-458a-9c3b-d07f0e0a3b30
2018-01-11 14:50:09.342 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage directory tmp/blobStore-8e5f0854-215d-4718-b13e-8f77f0f7af51
2018-01-11 14:50:10.272 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)] INFO c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181
2018-01-11 14:50:10.273 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)] INFO c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181, initiating session
2018-01-11 14:50:10.273 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)] WARN c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session 0x3c00002d8a7603de for server ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
    at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
    at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
    at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703

> On Jan 11, 2018, at 10:11 AM, Jared Stehler <jared.steh...@intellifylearning.com> wrote:
>
> As another data point, here’s an excerpt from a stack dump for the task manager:
>
> "heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)-EventThread" #94 daemon prio=5 os_prio=0 tid=0x00007f48c04d4800 nid=0x68ef waiting on condition [0x00007f48470eb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000000cd6121c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)
>
>    Locked ownable synchronizers:
>         - None
>
> "heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)" #93 daemon prio=5 os_prio=0 tid=0x00007f48c04e1800 nid=0x68ee waiting on condition [0x00007f48471ec000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1051)
>
>    Locked ownable synchronizers:
>         - None
>
> "Sink: ES (5/10)-EventThread" #68 daemon prio=5 os_prio=0 tid=0x00007f48c80a4800 nid=0x6829 waiting on condition [0x00007f4851e08000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000000cc4f7950> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)
>
>    Locked ownable synchronizers:
>         - None
>
> "Sink: ES (ip-10-80-53-99.us-west-2.compute.internal:2181)" #67 daemon prio=5 os_prio=0 tid=0x00007f48c80aa000 nid=0x6827 runnable [0x00007f4851f09000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x00000000cc4f7c18> (a sun.nio.ch.Util$3)
>         - locked <0x00000000cc4f7c08> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x00000000cc4f7bc0> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
>         at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
>
>    Locked ownable synchronizers:
>         - None
>
>> On Jan 11, 2018, at 10:07 AM, Jared Stehler <jared.steh...@intellifylearning.com> wrote:
>>
>> I’m seeing sporadic issues where Curator (or other) user threads appear to be left running after a stream shuts down; the user class loader then goes away and I get spammed with ClassNotFoundExceptions. Could this be caused by the user class loader being shut down before close() has been invoked on all operators?
>>
>> Here’s a stack trace I see from an attempt at closing an Elasticsearch sink:
>>
>> java.lang.ClassNotFoundException: com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
>>     at com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
>>     at com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
>>     at com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
>>     at com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
>>     at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
>>     at com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
>>     at com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
>>     at com.intellify.flink.shared.elasticsearch.LiveResolvingEs1ApiCallBridge.cleanup(LiveResolvingEs1ApiCallBridge.java:105)
>>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:323)
>>     at com.intellify.flink.shared.tracer.TracingSink.close(TracingSink.java:50)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> I’m using a Curator connection for Archaius and closing it in the call bridge’s cleanup method. I shade Curator and ZooKeeper to make sure I’m not reaching up into the parent class loader.
>>
>> I also see the following repeatedly in my task manager log:
>>
>> 2018-01-11 14:53:13.313 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session 0x3c00002d8a7603de for server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, closing socket connection and attempting reconnect
>> java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
>>     at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
>>     at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
>>     at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
>>
>> Does anyone have any insight into what might be happening here? Does it look like I’m not closing a thread properly, or is it something else entirely?
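One way to test the suspicion that Curator/ZooKeeper threads outlive the operator is to list, as the very last step of close(), any live threads that still carry the user-code class loader. The sketch below is stdlib-only Java and rests on two assumptions: that a new thread inherits its creator's context class loader (standard JDK behavior), and that the task thread's context class loader is the user-code class loader at the moment the ZooKeeper client is created. `LeakedThreadCheck`, `threadsUsing` and `reportLeaks` are hypothetical names, not Flink or Curator API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Diagnostic sketch (hypothetical helper): finds live threads whose context
 * class loader is a given loader. Because a new thread inherits its creator's
 * context class loader, threads started from user code usually point at the
 * user-code class loader.
 */
final class LeakedThreadCheck {

    private LeakedThreadCheck() {}

    /** Returns live threads, other than the caller, whose context class loader is {@code loader}. */
    static List<Thread> threadsUsing(ClassLoader loader) {
        List<Thread> result = new ArrayList<>();
        Thread self = Thread.currentThread();
        // getAllStackTraces() returns a snapshot map keyed by the JVM's live threads.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t != self && t.isAlive() && t.getContextClassLoader() == loader) {
                result.add(t);
            }
        }
        return result;
    }

    /** Prints each matching thread to stderr; meant to be called as the last step of close(). */
    static void reportLeaks(ClassLoader loader) {
        for (Thread t : threadsUsing(loader)) {
            System.err.println("Still alive after close(): \"" + t.getName()
                    + "\" state=" + t.getState() + " daemon=" + t.isDaemon());
        }
    }
}
```

Calling `LeakedThreadCheck.reportLeaks(Thread.currentThread().getContextClassLoader())` at the end of a function's close() would, under those assumptions, print threads like the `...-EventThread` and `...(host:2181)` ones in the dump above if they survived the close. Tasks of the same job on one TaskManager may share a single user-code class loader, so the list can also contain threads owned by other operators; the dump shows the ZooKeeper threads are named after the task that created them, which gives a way to filter.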
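The log above also shows shutdown paths being cut short by interrupts: KafkaProducer.close() logs "Interrupted while joining ioThread", and the timer service shutdown fails with an InterruptedException, presumably because the task thread is interrupted while the task is being cancelled. If your own close() code waits for a background thread to stop, one possible mitigation is a bounded wait that keeps waiting when interrupted and restores the interrupt flag afterwards, similar in spirit to Guava's `Uninterruptibles.joinUninterruptibly`. This is a stdlib-only sketch; `UninterruptibleJoin` and its method are hypothetical names, and the timeout value is an assumption you would tune.

```java
import java.util.concurrent.TimeUnit;

/** Sketch of a bounded, interrupt-tolerant join (hypothetical helper). */
final class UninterruptibleJoin {

    private UninterruptibleJoin() {}

    /**
     * Waits up to {@code timeoutMillis} for {@code worker} to terminate, continuing
     * to wait if the calling thread is interrupted. The caller's interrupt status is
     * restored before returning. Returns true if the worker has terminated.
     */
    static boolean joinUninterruptibly(Thread worker, long timeoutMillis) {
        boolean interrupted = false;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (worker.isAlive()) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    break; // timed out: give up rather than stall cancellation indefinitely
                }
                try {
                    worker.join(remainingMillis);
                } catch (InterruptedException e) {
                    // join() clears the interrupt flag when it throws; remember it and keep waiting.
                    interrupted = true;
                }
            }
            return !worker.isAlive();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the caller's interrupt status
            }
        }
    }
}
```

The bound is deliberate: a wait that ignores interrupts with no deadline could block task cancellation, so the helper returns false instead of hanging. It only helps for threads your own close() code waits on; it does not change how library methods such as KafkaProducer.close() react to interrupts.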