Do you have any logs that could help us identify the issue? How many files
does a long date range amount to?

In general, you could try the same program with the DataStream API (use
StreamExecutionEnvironment#readFile [1] with PROCESS_ONCE to get behavior
equivalent to batch). DataStream programs are only lightly optimized, so
plan preparation should be much faster. Since the DataSet API will
eventually be deprecated, that is also the recommended way if you don't
need features specific to the DataSet API.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#data-sources
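
A minimal, untested sketch of what I mean (TextInputFormat here is just a
placeholder for a format that can actually decode your files; your
LzoTextInputFormat is a Hadoop format and would need a counterpart based
on Flink's FileInputFormat to be used with readFile):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enumerate /logs recursively instead of collecting paths by hand
TextInputFormat format = new TextInputFormat(new Path("/logs"));
format.setNestedFileEnumeration(true);

// PROCESS_ONCE scans the path once, reads everything, and finishes
// (batch-like); the monitoring interval (-1) is unused in this mode
DataStream<String> lines =
    env.readFile(format, "/logs", FileProcessingMode.PROCESS_ONCE, -1);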

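Independent of the API, you may also be able to drop the client-side
globStatus loop in your snippet below: as far as I know, Hadoop's
FileInputFormat expands glob patterns in its input paths by itself when
it computes splits, so passing one glob per day should be enough (again
an untested sketch; startDate/endDate as in your code):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// one glob per day instead of one path per file; Hadoop resolves the
// globs when the splits are created
List<Path> globs = new ArrayList<>();
for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
    globs.add(new Path(
        "/logs/*/ds=" + date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));
}
FileInputFormat.setInputPaths(job, globs.toArray(new Path[0]));

Whether that actually shortens the submission depends on where the time
is spent, but it at least keeps the input path list small.
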
On Tue, May 19, 2020 at 7:48 AM ysnakie <ysna...@hotmail.com> wrote:

> I have many LZO files on HDFS in paths of the form:
> /logs/{id}/{date}/xxx[1-100].lzo
>
> /logs/a/ds=2018-01-01/xxx1.lzo
> /logs/b/ds=2018-01-01/xxx1.lzo
> ...
> /logs/z/ds=2018-01-02/xxx1.lzo
> ...
> /logs/z/ds=2020-05-01/xxx100.lzo
>
> I'm using the Flink DataSet API to read those files for a range of {date}
> and apply some transformations. Since Flink does not officially provide an
> LZO input format, I use HadoopInputFormat to implement this. I currently
> cannot find a good way to pass a single file glob that covers all the
> files I need, so I have to collect the paths in a loop:
>
> for (date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
>     FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path(
>         "/logs/*", "ds=" + date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));
>     for (FileStatus fileStatus : fileStatuses) {
>         String path = fileStatus.getPath().toString();
>         lzoFiles.add(path);
>     }
> }
> // ...
> // I have to initialize the source like this:
> Job job = new Job(conf);
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
>     job, String.join(",", lzoFiles));
> lzoSets = env.createInput(HadoopInputs.createHadoopInput(
>         new LzoTextInputFormat(),
>         LongWritable.class,
>         Text.class,
>         job))
>     .name("lzo source")
>     .map(x -> x.f1.toString());
>
> However, when I submit this job to Flink with a relatively long date
> range of files to read, the job submission takes a very long time (10
> minutes, even 20 minutes or more; I have already increased akka.timeout
> and web.timeout) before the job is ready, which I cannot accept. It seems
> Flink takes all that time to optimize the execution plan. Is there a good
> approach to make my program get prepared quickly?
>
> Shengnan
>


-- 

Arvid Heise | Senior Java Developer

