1) There's no mechanism in the API to restrict the number of readers across several sources. I can't quite think of a way to achieve this; maybe Kostas has an idea.

2) You're mixing up the Java Streams API and Flink's DataStream API.

Try this:

s3PathList.stream()
.map(...)
.reduce(...)
.map(joinedStream -> joinedStream.flatMap(new FlatMapFunction...))
.map(joinedStream -> joinedStream.addSink...)
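
To make the distinction concrete, here is a minimal sketch that runs without Flink on the classpath. FakeDataStream, createInput, run and the upper-casing transformation are hypothetical stand-ins invented for illustration, not Flink API; they only mimic the shape of DataStream.union and DataStream.flatMap. The point is which map belongs to which API: Stream.map runs once per path, reduce yields an Optional, and Optional.map hands over the whole joined stream, on which the per-record flatMap is then called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for Flink's DataStream<T>; it only mimics the shape of
// union() and flatMap() so that the example runs without Flink.
final class FakeDataStream<T> {
    final List<T> elements;

    FakeDataStream(List<T> elements) {
        this.elements = elements;
    }

    // Like DataStream.union: returns a NEW stream and leaves both inputs untouched.
    FakeDataStream<T> union(FakeDataStream<T> other) {
        List<T> merged = new ArrayList<>(elements);
        merged.addAll(other.elements);
        return new FakeDataStream<>(merged);
    }

    // Like DataStream.flatMap: applied per element; each element may yield 0..n outputs.
    <R> FakeDataStream<R> flatMap(Function<T, List<R>> fn) {
        List<R> out = new ArrayList<>();
        for (T element : elements) {
            out.addAll(fn.apply(element));
        }
        return new FakeDataStream<>(out);
    }
}

final class StreamsVsDataStream {
    // Hypothetical source factory: one fake "reader" per directory.
    static FakeDataStream<String> createInput(String directory) {
        return new FakeDataStream<>(Arrays.asList(directory + "/line1", directory + "/line2"));
    }

    static List<String> run(List<String> directories) {
        // Java Streams part: Stream.map is called once per path, and reduce
        // returns an Optional that is empty when there are no paths.
        Optional<FakeDataStream<String>> joined = directories.stream()
                .map(StreamsVsDataStream::createInput)
                .reduce(FakeDataStream::union);

        // Placeholder per-record transformation (an assumption for this sketch).
        Function<String, List<String>> toUpper =
                line -> Collections.singletonList(line.toUpperCase());

        return joined
                // Optional.map: the lambda receives the joined stream exactly once;
                // the per-record flatMap is invoked on that stream, not on the Optional.
                .map(stream -> stream.flatMap(toUpper))
                // Stands in for addSink(...): just expose the transformed elements.
                .map(stream -> stream.elements)
                .orElse(Collections.emptyList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b")));
    }
}
```

With real Flink types the structure should be the same: call flatMap on the joined DataStream inside the Optional.map lambda, then call addSink on the result.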

On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
Hi Team,

Could you please help me here? I'm sorry for asking on such short notice, but my work has stopped due to this.


Regards,
Satya

On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com <mailto:satyaadi...@gmail.com>> wrote:

    Hi Chesnay/Team,

    Thank you so much for the reply. Continuing from my previous
    email, below is the block diagram of the job, which reads files
    from S3 and pushes them to Kafka. In the current setup I have 4
    directories in total, and because we call the readFile method on
    the Flink environment once per directory, 4 readers are created
    to process the data from S3 in parallel.

    Below are my questions:
    1. Can we restrict the number of readers that process the data
    in parallel? For example, if we have a thousand directories, I
    want to restrict the number of readers to 10, so that each of the
    ten parallel threads reads 100 directories sequentially.

    2. Between the two Flink operators, i.e. the S3 reader and the
    Kafka sink, I want to add one more operator that transforms the
    data read from the S3 bucket before it is pushed to the Kafka
    sink. Below is my working code. I am having difficulty adding a
    map operator that transforms the union of the data streams
    (built by applying the union method over each directory's
    reader) before pushing it to Kafka.

    List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);

    s3PathList.stream()
        .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union)
        .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));


    Something like this is what I'm trying to do in order to achieve the
    above use case by applying flatMap (it could be map as well):

    s3PathList.stream()
        .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
                System.out.println("Json string:: ------" + m);
                // transformation logic
                out.collect(value);
            }
        })
        .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
    [Attached block diagram: image.png]
    Request your support on the same.
    Regards,
    Satya

    On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit
    <satyaadi...@gmail.com <mailto:satyaadi...@gmail.com>> wrote:

        Hi @ches...@apache.org <mailto:ches...@apache.org> ,

        Thanks for your support, it was really helpful.
        Do you know the list of directories when you submit the
        job? [Yes, we do.]
        The implementation is in progress, and I will get back to you
        if we face any further challenges.
        I appreciate your support in this regard.

        Regards,
        Satya

        On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit
        <satyaadi...@gmail.com <mailto:satyaadi...@gmail.com>> wrote:

            Thank you @Chesnay let me try this change .

            On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler
            <ches...@apache.org <mailto:ches...@apache.org>> wrote:

                You could also try using streams to make it a little
                more concise:

                directories.stream()
                    .map(directory -> createInputStream(environment, directory))
                    .reduce(DataStream::union)
                    .map(joinedStream -> joinedStream.addSink(kafka));


                On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
                Do you know the list of directories when you submit
                the job?

                If so, then you can iterate over them, create a
                source for each directory, union them, and apply the
                sink to the union.

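                // Imports needed by the snippet below.
                import java.util.List;
                import org.apache.flink.api.common.io.FilePathFilter;
                import org.apache.flink.api.java.io.TextInputFormat;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

                // Creates a source that reads every file under the directory exactly once.
                private static DataStream<String> createInputStream(
                        StreamExecutionEnvironment environment, String directory) {
                    TextInputFormat format =
                            new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
                    format.setNestedFileEnumeration(true);
                    return environment.readFile(format, directory,
                            FileProcessingMode.PROCESS_ONCE, -1,
                            FilePathFilter.createDefaultFilter());
                }

                public static void runJob() throws Exception {
                    StreamExecutionEnvironment environment =
                            StreamExecutionEnvironment.getExecutionEnvironment();
                    List<String> directories = getDirectories();
                    DataStream<String> joinedStreams = null;
                    for (String directory : directories) {
                        DataStream<String> inputStream = createInputStream(environment, directory);
                        if (joinedStreams == null) {
                            joinedStreams = inputStream;
                        } else {
                            // union() returns a new stream, so the result must be reassigned
                            joinedStreams = joinedStreams.union(inputStream);
                        }
                    }
                    // add a sanity check that there was at least 1 directory

                    joinedStreams.addSink(kafka);
                }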



                On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
                Hi Guys,

                I am stuck on this; please help me here.
                Regards,
                Satya

                On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
                <satyaadi...@gmail.com>
                <mailto:satyaadi...@gmail.com> wrote:

                Hi Guys,

                Sorry to bother you again, but could someone help
                me here? Any help in
                this regard will be much appreciated.

                Regards,
                Satya

                On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit
                <satyaadi...@gmail.com> <mailto:satyaadi...@gmail.com>
                wrote:

                Hi Guys,

                I need some help; any leads will be highly appreciated. I have
                written a Flink streaming job to read data from an S3 bucket
                and push it to Kafka. Below is the working source that deals
                with a single S3 path:

                TextInputFormat format = new TextInputFormat(
                        new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
                format.setNestedFileEnumeration(true);
                DataStream<String> inputStream = environment.readFile(format,
                        "s3a://directory/2020-09-03/",
                        FileProcessingMode.PROCESS_ONCE, -1,
                        FilePathFilter.createDefaultFilter());
                inputStream.addSink(kafka);

                But my requirement is to get the list of paths and pass them
                one by one to this environment.readFile() method. How can we
                achieve this?

                Thanks,
                Satya



-- --------------------------
                Best Regards
                Satya Prakash
                (M)+91-9845111913

