Hi all,

I want to test submitting a job from my local IDE, and I have deployed a Flink cluster in my VM. Here is my code, taken from the Flink 1.9 documentation with some of my own parameters added:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment
            .createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

        DataSet<String> data = env.readTextFile("/tmp/file");

        data
            .filter(new FilterFunction<String>() {
                public boolean filter(String value) {
                    return value.startsWith("http://");
                }
            })
            .writeAsText("/tmp/file1");

        env.execute();
    }

When I run the program, it raises the following error:

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f32190552e955bb2048c31930edfb0e)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
    at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: TestMain$1
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: TestMain$1
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
    at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
    ... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    ... 9 more

My understanding is that when using a remote environment, I don't need to upload my program jar to the Flink cluster myself. Can anyone help me with this issue?

Thanks,
Simon