Hi @ches...@apache.org <ches...@apache.org> ,

Thanks for your support, it was really helpful.
Do you know the list of directories when you submit the job? [Yes we do
have]
The impletemation is progress and will get back to you if any further
challenges we may face.
Appreciate your support in this regard.

Regards,
Satya

On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

> Thank you @Chesnay let me try this change .
>
> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> You could also try using streams to make it a little more concise:
>>
>> directories.stream()
>>    .map(directory -> createInputStream(environment, directory))
>>    .reduce(DataStream::union)
>>    .map(joinedStream -> joinedStream.addSink(kafka));
>>
>>
>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>
>> Do you know the list of directories when you submit the job?
>>
>> If so, then you can iterate over them, create a source for each
>> directory, union them, and apply the sink to the union.
>>
>> private static
>> DataStream<String>createInputStream(StreamExecutionEnvironment environment,
>> String directory) {
>>    TextInputFormat format =new TextInputFormat(new
>> org.apache.flink.core.fs.Path(directory));
>> format.setNestedFileEnumeration(true); return environment.readFile(format,
>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>> FilePathFilter.createDefaultFilter()); }
>>
>> public static void runJob()throws Exception {
>>    StreamExecutionEnvironment environment =
>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
>> directories =getDirectories(); DataStream<String> joinedStreams =null; for
>> (String directory : directories) {
>>       DataStream<String> inputStream =createInputStream(environment,
>> directory); if (joinedStreams ==null) {
>>          joinedStreams = inputStream; }else {
>>          joinedStreams.union(inputStream); }
>>    }
>>    // add a sanity check that there was at least 1 directory
>>
>>    joinedStreams.addSink(kafka); }
>>
>>
>>
>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>
>> Hi Guys,
>>
>> Got stuck with it please help me here
>> Regards,
>> Satya
>>
>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com>
>> <satyaadi...@gmail.com> wrote:
>>
>> Hi Guys,
>>
>> Sorry to bother you again, but someone could help me here? Any help in
>> this regard will be much appreciated.
>>
>> Regards,
>> Satya
>>
>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com>
>> <satyaadi...@gmail.com>
>> wrote:
>>
>> Hi Guys,
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> I need one help, any leads will be highly appreciated.I have written a
>> flink streaming job to read the data from s3 bucket and push to kafka.
>> Below is the working source that deal with single s3 path:
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> TextInputFormat format = new TextInputFormat(new
>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>
>>
>>
>>
>> format.setNestedFileEnumeration(true);
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> DataStream<String> inputStream = environment.readFile(format,
>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>
>>
>>
>>
>> FilePathFilter.createDefaultFilter());
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> inputStream.addSink(kafka);
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> But my requirement is get the list of paths and pass them one by one to
>> this environment.readFile() method.How we can achieve this.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Thanks,
>>
>>
>>
>>
>> Satya
>>
>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>>
>> --
>>
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>>
>>
>>
>>
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>


-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

Reply via email to