Hi All,

I am trying to read a file from a locally installed HDFS, but I am not
able to. I tried both of the approaches below, and each time the program
ends with "Process finished with exit code 239." Any help would be appreciated.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class Processor {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Wrap Hadoop's TextInputFormat so Flink can read (offset, line) pairs from HDFS
        Job job = Job.getInstance();
        HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
                new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/newfolder/testdata1"));

        DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
        DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());

        // print() triggers execution by itself; a subsequent env.execute() would fail with
        // "No new data sinks have been defined since the last execution."
        words.print();
    }

    public static final class Tokenizer
            extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
        @Override
        public void flatMap(Tuple2<LongWritable, Text> value,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            String line = value.f1.toString();
            String[] tokens = line.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
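For comparison, the same path can also be read with Flink's built-in text
source instead of the Hadoop InputFormat wrapper. A minimal sketch, assuming
the same local HDFS at localhost:9000 and Hadoop filesystem support on the
classpath:

    // Minimal sketch: read the HDFS path with Flink's own text source,
    // skipping the Hadoop InputFormat wrapper entirely.
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> lines = env.readTextFile("hdfs://localhost:9000/newfolder/testdata1");
    lines.print();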

And here is the second way I tried, using HadoopInputs.readHadoopFile:
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // HadoopInputs.readHadoopFile builds the wrapped input format and sets the path in one call
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(HadoopInputs.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class,
                "hdfs://localhost:9000/newfolder/testdata1"));

        DataSet<String> stringInput = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String map(Tuple2<LongWritable, Text> record) throws Exception {
                return record.f1.toString();
            }
        });

        // As above, print() executes the job; a following env.execute() would fail.
        stringInput.print();
    }
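To rule out connectivity problems independent of Flink, the path can be
checked with the plain Hadoop FileSystem API. A hedged sketch (the HdfsCheck
class name is hypothetical; the URI and path are the ones from above):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsCheck {
        public static void main(String[] args) throws Exception {
            // Connect straight to the local namenode and test the input path,
            // bypassing Flink entirely.
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
            System.out.println("exists: " + fs.exists(new Path("/newfolder/testdata1")));
        }
    }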
