Hi, I am trying to run a pipeline on Flink 1.8.1 ,getting the following exception:
*java.lang.StackOverflowError at java.lang.Exception.<init>(Exception.java:66) at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56) at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51) at java.lang.Class.getDeclaredMethod(Class.java:2130) at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)* I have even tried running in legacy mode, the pipeline code is : private void execute(String[] args) { ParameterTool pt = ParameterTool.fromArgs(args); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //env.setMaxParallelism(30); env.setParallelism(20); env.enableCheckpointing(5000); StateBackend backend = new FsStateBackend(pt.getRequired("checkpoint_path"), true); env.setStateBackend(backend); FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer = new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME, new JsonNodeDeserializationSchema(), dynamodbStreamsConsumerConfig); SingleOutputStreamOperator<ObjectNode> sourceStream = env .addSource(flinkDynamoDBStreamsConsumer) .name("Dynamo DB Streams"); sourceStream .keyBy(new CdcKeySelector()) .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092", "ums-dynamo-streams", new JsonSerializationSchema())) .name("Kafka Sink"); try { env.execute(); } catch (Exception e) { System.out.println("Caught exception for pipeline" + e.getMessage()); e.printStackTrace(); } } Regards, Vinay Patil