I have many LZO files on HDFS in the following path format: /logs/{id}/{date}/xxx[1-100].lzo

/logs/a/ds=2018-01-01/xxx1.lzo
/logs/b/ds=2018-01-01/xxx1.lzo
...
/logs/z/ds=2018-01-02/xxx1.lzo
...
/logs/z/ds=2020-05-01/xxx100.lzo

I'm using the Flink DataSet API to read those files over a range of {date} and apply some transformations. Since Flink does not officially provide an LZO input format, I use HadoopInputFormat to implement this. I currently cannot find a good way to give a single file-glob path that includes all the files I need, so I have to do it in a loop:

List<String> lzoFiles = new ArrayList<>();
for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
    // Glob one day at a time: /logs/*/ds=<date>/*.lzo
    FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path(
        "/logs/*", "ds=" + date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));
    for (FileStatus fileStatus : fileStatuses) {
        lzoFiles.add(fileStatus.getPath().toString());
    }
}
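
As an aside, Hadoop globs do support {a,b} alternation, so the loop above could in principle be collapsed into a single globStatus call. A rough, untested sketch:

// Untested sketch: one glob for the whole range, e.g.
// /logs/*/ds={2018-01-01,2018-01-02,...}/*.lzo
StringJoiner days = new StringJoiner(",", "{", "}"); // java.util.StringJoiner
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
    days.add(d.format(DateTimeFormatter.ISO_LOCAL_DATE));
}
FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path(
    "/logs/*", "ds=" + days + "/*.lzo"));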
// ...
// I have to initialize the source like this:
Job job = Job.getInstance(conf);
// Pass every matched file as a comma-separated list of input paths.
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, String.join(",", lzoFiles));
lzoSets = env.createInput(HadoopInputs.createHadoopInput(
    new LzoTextInputFormat(),
    LongWritable.class,
    Text.class,
    job)).name("lzo source").map(x -> x.f1.toString());

However, when I submit this job to Flink with a relatively long date range of files to read, the job submission takes a very long time to prepare (10 minutes, even 20 minutes or more; I have already increased akka.timeout and web.timeout), which I cannot accept. It seems Flink spends a lot of time optimizing the execution plan. Is there a good approach to make my program prepare quickly?
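
For what it's worth, FileInputFormat also accepts directories as input paths and expands them itself when computing splits, so passing the day-level directories instead of every individual file might shrink the path list a lot; I have not verified whether that helps the preparation time. A sketch, assuming the ds= directories contain nothing but .lzo files:

// Untested sketch: register day-level directories instead of single files;
// FileInputFormat lists the files inside them when computing splits.
// Assumes the ds= directories contain nothing but .lzo files.
List<String> dayDirs = new ArrayList<>();
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
    FileStatus[] dirs = fs.globStatus(new org.apache.hadoop.fs.Path(
        "/logs/*", "ds=" + d.format(DateTimeFormatter.ISO_LOCAL_DATE)));
    for (FileStatus dir : dirs) {
        dayDirs.add(dir.getPath().toString());
    }
}
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
    job, String.join(",", dayDirs));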
Shengnan
