I am reading files from a directory with this statement:

/val text = env.readFile(new MyInputFormat(), "/path/to/input/dir/")/

/MyInputFormat/ extends /DelimitedInputFormat/, which in turn extends the
generic /DelimitedInputFormat<Record>/.

In the output Record, I need to add a field that stores the name (or full
path) of the file from which it has been read.

To do that, I wrote the following code in /MyInputFormat/:

/
private final StringValue fname = new StringValue();
...
@Override
public Record readRecord(Record reusable, byte[] bytes, int offset, int numBytes) { // throws IOException
    fname.setValue(this.getFilePath().toString());
    reusable.setField(0, this.fname);
    return reusable;
}
/

The code compiles and runs without problems, but in the output I always get:
//path/to/input/dir//
for every record!
What I wanted to have is:
//path/to/input/dir/file1.txt/
//path/to/input/dir/file3.txt/
/.../
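
To make the goal concrete, here is a minimal stand-alone sketch of the behaviour I am after. It does not use Flink at all: /SketchFormat/ and its methods /open/, /tagWithInputPath/ and /tagWithCurrentFile/ are hypothetical stand-ins, and the idea that the format gets told which file it is about to read is only my assumption, not something I have confirmed in the Flink API.

```java
// Hypothetical stand-in for an input format; this is NOT a Flink class.
public class SketchFormat {
    // The configured input path; in my case this is the directory.
    private final String inputPath;
    // Path of the file currently being read (assumed to be known to the format).
    private String currentFile;

    public SketchFormat(String inputPath) {
        this.inputPath = inputPath;
    }

    // Assumed hook: called once before the records of a file are read.
    public void open(String filePath) {
        this.currentFile = filePath;
    }

    // What my code effectively does now: every record gets the directory.
    public String tagWithInputPath(String line) {
        return inputPath + "\t" + line;
    }

    // What I would like: every record gets the file it came from.
    public String tagWithCurrentFile(String line) {
        return currentFile + "\t" + line;
    }

    public static void main(String[] args) {
        SketchFormat format = new SketchFormat("/path/to/input/dir/");
        String[] files = {"/path/to/input/dir/file1.txt", "/path/to/input/dir/file3.txt"};
        for (String file : files) {
            format.open(file);
            System.out.println(format.tagWithInputPath("some line"));
            System.out.println(format.tagWithCurrentFile("some line"));
        }
    }
}
```

The first method mirrors what I see today (the directory for every record); the second is the per-file tagging I would like to get from /MyInputFormat/.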

Can anyone help me?



--
View this message in context: 
http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Getting-the-name-of-a-file-in-a-directory-tp801.html
Sent from the Apache Flink (Incubator) User Mailing List archive. mailing list 
archive at Nabble.com.