I'm using flink 1.5.0. The test gives the same error even with flink-1.6.0. Also, I introduced a Thread.sleep(30000); before the assert statement. That didn't help either.
Regards, James On Thu, Oct 11, 2018 at 11:11 AM James Isaac <dataenginee...@gmail.com> wrote: > Hi, > > I'm trying to run an integration test of my flink application. My test > code looks like this: > > public class HttpsCsvIngestorTest extends AbstractTestBase { > > private final static Logger LOG = > LoggerFactory.getLogger(HttpsCsvIngestorTest.class); > > @Test > public void testHttpsCsvIngestion() throws Exception { > > Thread flinkJob = new Thread(new Runnable() { > @Override > public void run() { > String[] args = new String[] { "--configFile", > "src/test/resources/config.properties", "--secretKey", > "12345" }; > JobExecutionResult execResult = > CsvProcessorFlinkDriver.runFlinkJob(args); > } > }); > > flinkJob.start(); > LOG.info("Starting flink job"); > > Thread.sleep(10000); > String[] args2 = new String[] { "localhost", filename }; > FileUploadClient.main(args2); > > assertTrue(new File(System.getProperty("user.dir") + > File.separator + "src/main/resources/Result.csv") > .exists()); > System.out.println("Test completed. Going to shutdown flink job"); > } > > } > > > Here I'm starting my flink application from a child thread, and uploading > a file for processing from the main thread. The test runs fine, and I get > the expected result file. > However, I get the following error at the end, when the application is > being shut down: > > 2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> > Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error > during disposal of stream operator. > java.lang.NoSuchMethodError: > org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:745) > 2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> > Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error > during disposal of stream operator. > java.lang.NoSuchMethodError: > org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:745) > 2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> > Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error > during disposal of stream operator. > java.lang.NoSuchMethodError: > org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:745) > 2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map -> > Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error > during disposal of stream operator. > java.lang.NoSuchMethodError: > org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:745) > 2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map -> > Process -> Sink: Unnamed (1/1) Task Task.java:843 - FATAL - exception in > resource cleanup of task Source: JettyServerFileSource -> Map -> Process -> > Sink: Unnamed (1/1) (12d3e0627e62ad44c57c45b720682e56). > java.lang.IllegalStateException: Memory manager has been shut down. > at > org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:470) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:824) > at java.lang.Thread.run(Thread.java:745) > org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve > the JobExecutionResult from the JobManager. > at > org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300) > at > org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387) > at > org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:566) > at > org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:540) > at > org.apache.flink.runtime.minicluster.FlinkMiniCluster.executeJobBlocking(FlinkMiniCluster.scala:714) > at > org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79) > at > mycode.CsvProcessorFlinkDriver.flinkJettyExecution(CsvProcessorFlinkDriver.java:132) > at > mycode.CsvProcessorFlinkDriver.runFlinkJob(CsvProcessorFlinkDriver.java:56) > at com.demo.code.HttpsCsvIngestorTest$1.run(HttpsCsvIngestorTest.java:30) > at java.lang.Thread.run(Thread.java:745) > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/$a#-1711434410]] after [21474835000 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait". > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.run(LightArrayRevolverScheduler.scala:338) > at > akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:142) > at > akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:141) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > akka.actor.LightArrayRevolverScheduler.close(LightArrayRevolverScheduler.scala:140) > at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:892) > at > akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:826) > at > akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826) > at > akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826) > at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:842) > at > akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021) > at > akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > Here CsvProcessorFlinkDriver.java:132 is the executionResult = env.execute > line. > > Is there something I'm doing wrong? I also notice that if I start the > flink application in the main thread of the test class(instead of from a > child thread), execution does not progress to the lines starting from > LOG.info("Starting flink job"); > > Regards, > James > >