I have many LZO files on HDFS with paths in this format: /logs/{id}/{date}/xxx[1-100].lzo
/logs/a/ds=2018-01-01/xxx1.lzo
/logs/b/ds=2018-01-01/xxx1.lzo
...
/logs/z/ds=2018-01-02/xxx1.lzo
...
/logs/z/ds=2020-05-01/xxx100.lzo
I'm using the Flink DataSet API to read these files for a range of {date} and apply some transformations. Since Flink does not officially provide an LZO input format, I use HadoopInputFormat to implement this. I cannot find a good way to express all the files I need as a single file glob, so I collect the paths in a loop:
// Collect every matching LZO file, one globStatus call per day
for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
    FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path(
            "/logs/*", "ds=" + date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));
    for (FileStatus fileStatus : fileStatuses) {
        lzoFiles.add(fileStatus.getPath().toString());
    }
}
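As an aside, Hadoop's globStatus does understand {a,b} alternation, so the per-day loop could in principle collapse into a single pattern. A minimal sketch, reusing fs, startDate and endDate from above (the pattern-building is my own illustration, and I have not benchmarked it):

// Build one glob covering the whole date range via Hadoop's {a,b} alternation
List<String> days = new ArrayList<>();
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
    days.add(d.format(DateTimeFormatter.ISO_LOCAL_DATE));
}
String pattern = "/logs/*/ds={" + String.join(",", days) + "}/*.lzo";
FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path(pattern));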
// ...
// I have to initialize the source like this
// Job.getInstance replaces the deprecated new Job(conf) constructor
Job job = Job.getInstance(conf);
// Hand the full comma-separated file list to the Hadoop input format
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, String.join(",", lzoFiles));
lzoSets = env.createInput(HadoopInputs.createHadoopInput(
        new LzoTextInputFormat(),
        LongWritable.class,
        Text.class,
        job)).name("lzo source").map(x -> x.f1.toString());
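One variation I have considered: since Hadoop's FileInputFormat expands glob patterns itself when it computes splits, the input paths could be one glob per day instead of thousands of individual files, which would also skip the client-side globStatus loop entirely. A sketch under the assumption that LzoTextInputFormat inherits FileInputFormat's glob handling (I have not verified whether this actually speeds up submission):

// Sketch: register one glob per day; FileInputFormat expands globs
// when computing splits, so the globStatus loop above becomes unnecessary.
// Assumes LzoTextInputFormat inherits this behavior from FileInputFormat.
List<String> dayGlobs = new ArrayList<>();
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
    dayGlobs.add("/logs/*/ds=" + d.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo");
}
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, String.join(",", dayGlobs));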
However, when I submit this job to Flink with a relatively long date range to read, job submission takes far too long to prepare (10 minutes, sometimes 20 minutes or more; I have already increased akka.timeout and web.timeout), which I cannot accept. It seems Flink spends all that time optimizing the execution plan. Is there a good approach to make my program prepare quickly?