Hi,
I have been facing issues while trying to read from an HDFS sequence file.

This is my code snippet:

DataSource<Tuple2<Text, Text>> input = env
    .createInput(
        HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir),
        TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {}));


When I run this in YARN cluster mode, I get the following error:
The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
flipkart.EnrichementFlink.main(EnrichementFlink.java:31)


I also tried the variant below, without the explicit TypeInformation, and ran into the same issue:

DataSource<Tuple2<Text, Text>> input = env
    .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir));

When I add the following library to the lib folder:

flink-hadoop-compatibility_2.11-1.7.0.jar

the error changes to this:

java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
    at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
    at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
    at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
    at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
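A NoClassDefFoundError like this can mean that jars from different Flink versions are mixed on the classpath; as far as I know, TypeSerializerSnapshot first appeared in Flink 1.7, so a 1.7.0 compatibility jar on an older cluster might fail this way (an assumption, not confirmed here). A minimal, JDK-only sketch for checking this, assuming it runs with the same classpath as the job (for example, called from the job's main method), reports whether each class named in the errors can be loaded and which jar it comes from. The class name ClasspathProbe and its locate method are hypothetical, not part of Flink.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic helper: where (if anywhere) is a class loaded from? */
public class ClasspathProbe {

    /**
     * Returns the jar or directory the class was loaded from,
     * "unknown location" if the loader exposes none (e.g. JDK bootstrap classes),
     * or a string starting with "not found" if the class cannot be loaded.
     */
    static String locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers;
            // uses the loader of this probe class, i.e. the job jar's loader if bundled there
            Class<?> cls = Class.forName(className, false,
                    ClasspathProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            return (location == null) ? "unknown location" : location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError raised while resolving dependencies
            return "not found (" + e + ")";
        }
    }

    public static void main(String[] args) {
        String[] classes = {
            // believed to exist only in Flink 1.7 and later
            "org.apache.flink.api.common.typeutils.TypeSerializerSnapshot",
            // shipped in flink-hadoop-compatibility
            "org.apache.flink.api.java.typeutils.WritableTypeInfo",
            // Hadoop type used for the sequence file keys and values
            "org.apache.hadoop.io.Text"
        };
        for (String name : classes) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If WritableTypeInfo resolves to the 1.7.0 compatibility jar while TypeSerializerSnapshot is reported as not found, that would support the version-mismatch theory.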


Can someone help me resolve this issue?

Thanks,
Akshay
