Hi Ravi,

I tried both new and legacy mode. The pipeline works locally, but on the cluster I still get this exception. I am passing Jackson's ObjectNode class, which should be serializable. What do you think?
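One way to test the "should be serializable" assumption outside the cluster is to push each user-defined object through plain Java serialization. The sketch below is only an illustration: `SerializableCheck`, `isJavaSerializable`, `GoodSelector`, and `BadSelector` are hypothetical names, and the two selector classes are stand-ins for the real objects in the job (for example an instance of `CdcKeySelector`).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper, not part of Flink or Jackson.
class SerializableCheck {

    /** Returns true if Java serialization can write the whole object graph. */
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // A NotSerializableException names the first offending class.
            System.out.println("Not serializable: " + e);
            return false;
        }
    }

    // Stand-in for a function object whose fields are all serializable.
    static class GoodSelector implements Serializable {
        private static final long serialVersionUID = 1L;
        String keyField = "id";
    }

    // Stand-in for a function object that holds a non-serializable field.
    static class BadSelector implements Serializable {
        private static final long serialVersionUID = 1L;
        Thread worker = new Thread(); // java.lang.Thread is not Serializable
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(new GoodSelector()));
        System.out.println(isJavaSerializable(new BadSelector()));
    }
}
```

A passing check does not rule out the StackOverflowError quoted below, because that trace ends inside `ClosureCleaner` rather than in serialization itself.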
On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <ravibhushanratna...@gmail.com> wrote:

> Hi Vinay,
>
> Please make sure that all your custom code is serializable. You can run
> this using new mode.
>
> Thanks,
> Ravi
>
> On Sat 20 Jul, 2019, 08:13 Vinay Patil, <vinay18.pa...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to run a pipeline on Flink 1.8.1, getting the following
>> exception:
>>
>> java.lang.StackOverflowError
>>     at java.lang.Exception.<init>(Exception.java:66)
>>     at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
>>     at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
>>     at java.lang.Class.getDeclaredMethod(Class.java:2130)
>>     at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>
>> I have even tried running in legacy mode, the pipeline code is:
>>
>> private void execute(String[] args) {
>>     ParameterTool pt = ParameterTool.fromArgs(args);
>>
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     //env.setMaxParallelism(30);
>>     env.setParallelism(20);
>>
>>     env.enableCheckpointing(5000);
>>     StateBackend backend =
>>         new FsStateBackend(pt.getRequired("checkpoint_path"), true);
>>     env.setStateBackend(backend);
>>
>>     FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
>>         new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
>>             new JsonNodeDeserializationSchema(),
>>             dynamodbStreamsConsumerConfig);
>>
>>     SingleOutputStreamOperator<ObjectNode> sourceStream = env
>>         .addSource(flinkDynamoDBStreamsConsumer)
>>         .name("Dynamo DB Streams");
>>
>>     sourceStream
>>         .keyBy(new CdcKeySelector())
>>         .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092",
>>             "ums-dynamo-streams", new JsonSerializationSchema()))
>>         .name("Kafka Sink");
>>
>>     try {
>>         env.execute();
>>     } catch (Exception e) {
>>         System.out.println("Caught exception for pipeline" + e.getMessage());
>>         e.printStackTrace();
>>     }
>> }
>>
>> Regards,
>> Vinay Patil
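The long run of identical `ClosureCleaner.clean` frames in the trace is consistent with a recursive walk over object fields that never reaches a base case, which is what happens when the object graph contains a cycle. That is a guess from the trace, not a confirmed diagnosis. The sketch below is not Flink's code: `CycleSafeWalk`, `Node`, and `countReachable` are hypothetical names used only to show how such a walk overflows the stack without a visited set and terminates with one.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical illustration; it does not reproduce Flink's ClosureCleaner.
class CycleSafeWalk {

    // Minimal object that can form a cycle: a.next = b, b.next = a.
    static class Node {
        Node next;
    }

    /** Counts distinct objects reachable from root through declared fields. */
    static int countReachable(Object root) {
        // Identity-based set, so equals()/hashCode() of visited objects are never called.
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        walk(root, visited);
        return visited.size();
    }

    private static void walk(Object obj, Set<Object> visited) {
        // Without the visited check, a cycle would recurse until StackOverflowError.
        if (obj == null || !visited.add(obj)) {
            return;
        }
        // Only fields declared on the object's own class are followed in this sketch.
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
                continue;
            }
            try {
                field.setAccessible(true);
                walk(field.get(obj), visited);
            } catch (ReflectiveOperationException | RuntimeException e) {
                // Fields that cannot be opened reflectively are skipped.
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();
        a.next = b;
        b.next = a; // cycle
        System.out.println(countReachable(a));
    }
}
```

If a cycle like this exists in one of the objects handed to the job (source, key selector, schemas, or sink), removing it or marking the offending field `transient` would be the first thing to try.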