Hi, I have been facing issues while trying to read from an HDFS sequence file.
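
For completeness, these are the imports and the setup that the snippets below assume (HadoopInputs comes from the flink-hadoop-compatibility module; the HDFS path here is only a placeholder for my actual directory):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.Text;

// Batch execution environment for the job.
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Placeholder for the actual HDFS directory holding the sequence files.
final String ravenDataDir = "hdfs:///path/to/sequence/files";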
This is my code snippet:

DataSource<Tuple2<Text, Text>> input = env
        .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir));

Upon executing this in YARN cluster mode, I get the following error:

The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
    org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
    flipkart.EnrichementFlink.main(EnrichementFlink.java:31)

When I add the TypeInformation myself as follows, I run into the same issue:

DataSource<Tuple2<Text, Text>> input = env
        .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir),
                TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {}));

When I add this library to the lib folder, flink-hadoop-compatibility_2.11-1.7.0.jar, the error changes to:

java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
    at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
    at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
    at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)

Can someone help me resolve this issue?

Thanks,
Akshay