Hi Ravi,

Tried with both the new and legacy modes; it works locally, but on the
cluster I am getting the exception below. I am passing Jackson's ObjectNode
class, which should be serializable. What do you think?
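
To rule that out independently of Flink, I would round-trip the operator
objects through plain Java serialization, which is essentially what Flink
does when it ships them to the cluster. A minimal sketch (not part of the
pipeline; the class names in the comment are the ones from the job below):

import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public final class SerializabilityCheck {

    // Serializes the candidate the same way Flink would; the first
    // non-serializable field is named in the NotSerializableException.
    // e.g. check(new CdcKeySelector());
    //      check(new JsonNodeDeserializationSchema());
    //      check(new JsonSerializationSchema());
    public static void check(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            System.out.println(candidate.getClass().getName() + " serializes cleanly");
        } catch (NotSerializableException e) {
            System.out.println("Not serializable: " + e.getMessage());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}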

On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
ravibhushanratna...@gmail.com> wrote:

> Hi Vinay,
>
> Please make sure that all of your custom code is serializable (the sketch
> below shows the usual pattern). You can run this using the new mode.
>
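> To make that concrete, the usual pattern looks like the sketch below (my
> illustration, not code from your pipeline; I am reusing the
> JsonSerializationSchema name from your job). Members that should not
> travel with the closure are marked transient and created lazily on the
> worker:
>
> import com.fasterxml.jackson.databind.ObjectMapper;
> import com.fasterxml.jackson.databind.node.ObjectNode;
> import org.apache.flink.api.common.serialization.SerializationSchema;
>
> public class JsonSerializationSchema implements SerializationSchema<ObjectNode> {
>
>     // Transient so the mapper is not shipped with the closure; it is
>     // created on first use on the task manager instead.
>     private transient ObjectMapper mapper;
>
>     @Override
>     public byte[] serialize(ObjectNode element) {
>         if (mapper == null) {
>             mapper = new ObjectMapper();
>         }
>         try {
>             return mapper.writeValueAsBytes(element);
>         } catch (Exception e) {
>             throw new RuntimeException("Failed to serialize record", e);
>         }
>     }
> }
>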
> Thanks,
> Ravi
>
> On Sat 20 Jul, 2019, 08:13 Vinay Patil, <vinay18.pa...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to run a pipeline on Flink 1.8.1 and am getting the following
>> exception:
>>
>> java.lang.StackOverflowError
>>     at java.lang.Exception.<init>(Exception.java:66)
>>     at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
>>     at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
>>     at java.lang.Class.getDeclaredMethod(Class.java:2130)
>>     at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>
>> I have even tried running in legacy mode. The pipeline code is:
>>
>> private void execute(String[] args) {
>>     ParameterTool pt = ParameterTool.fromArgs(args);
>>
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     //env.setMaxParallelism(30);
>>     env.setParallelism(20);
>>
>>     // Checkpoint every 5 s to a filesystem state backend with async snapshots.
>>     env.enableCheckpointing(5000);
>>     StateBackend backend = new FsStateBackend(pt.getRequired("checkpoint_path"), true);
>>     env.setStateBackend(backend);
>>
>>     // Source: DynamoDB Streams records deserialized into Jackson ObjectNodes.
>>     FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
>>             new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
>>                     new JsonNodeDeserializationSchema(), dynamodbStreamsConsumerConfig);
>>
>>     SingleOutputStreamOperator<ObjectNode> sourceStream = env
>>             .addSource(flinkDynamoDBStreamsConsumer)
>>             .name("Dynamo DB Streams");
>>
>>     // Key by the CDC key and sink the records to Kafka as JSON.
>>     sourceStream
>>             .keyBy(new CdcKeySelector())
>>             .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092",
>>                     "ums-dynamo-streams", new JsonSerializationSchema()))
>>             .name("Kafka Sink");
>>
>>     try {
>>         env.execute();
>>     } catch (Exception e) {
>>         System.out.println("Caught exception for pipeline: " + e.getMessage());
>>         e.printStackTrace();
>>     }
>> }
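>>
>> For reference, CdcKeySelector is roughly the sketch below (simplified;
>> "eventID" stands in for the actual key field). A stateless top-level
>> KeySelector has an empty closure, so I would not expect the
>> ClosureCleaner to have anything to recurse into:
>>
>> import com.fasterxml.jackson.databind.node.ObjectNode;
>> import org.apache.flink.api.java.functions.KeySelector;
>>
>> public class CdcKeySelector implements KeySelector<ObjectNode, String> {
>>     @Override
>>     public String getKey(ObjectNode value) {
>>         // Key the CDC records by an identifier field of the change event.
>>         return value.get("eventID").asText();
>>     }
>> }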
>>
>> Regards,
>> Vinay Patil
>>
>
