Unfortunately, no. At least not in the case of the FlinkRunner. As already mentioned, Beam does not currently collect information about the location of source splits, so this information cannot be passed to Flink.

> If there is no locality-aware processing the whole thing falls on its face.

A 1 Gbps network (current networks should actually be at least 10 Gbps) is quite "close" to the throughput of a single spinning disk. On the other hand, if the target is "seconds", you might want to have a look at some SQL-based distributed analytical engines; Flink's startup time itself will likely add significant overhead on top of the processing time.
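For intuition, a back-of-envelope sketch of the bandwidth involved (the dataset size and target time below are illustrative assumptions, not figures from this thread):

```java
// Back-of-envelope sketch: aggregate read bandwidth needed to scan a
// dataset within a target time, and how many 1 Gbps links or spinning
// disks that implies. The 300 GB / 10 s figures and ~150 MB/s per disk
// are illustrative assumptions.
public class ThroughputSketch {

    // Aggregate bandwidth in GB/s needed to scan datasetGB in targetSeconds.
    static double aggregateGBps(double datasetGB, double targetSeconds) {
        return datasetGB / targetSeconds;
    }

    // How many units (links or disks) of unitMBps cover aggregateGBps GB/s.
    static long unitsNeeded(double aggregateGBps, double unitMBps) {
        return Math.round(Math.ceil(aggregateGBps * 1000.0 / unitMBps));
    }

    public static void main(String[] args) {
        double agg = aggregateGBps(300.0, 10.0);      // 30 GB/s aggregate
        long links = unitsNeeded(agg, 1000.0 / 8.0);  // 1 Gbps link ~ 125 MB/s
        long disks = unitsNeeded(agg, 150.0);         // one spinning disk ~ 150 MB/s
        System.out.printf("aggregate %.0f GB/s, %d x 1 Gbps links, %d disks%n",
                agg, links, disks);
    }
}
```

So at this scale the network links and the disks are within the same order of magnitude, which is why remote reads are not automatically the bottleneck.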

On 7/2/24 16:03, Balogh, György wrote:
Hi Jan,
I need to process hundreds of GBs of data within seconds. With local data processing I can properly size a HW infrastructure to meet this (a couple of years back I did this with Hadoop; it worked perfectly). If there is no locality-aware processing the whole thing falls on its face.

This comment suggests Flink might do this under the hood:

https://stackoverflow.com/questions/38672091/flink-batch-data-local-planning-on-hdfs
Br,
Gyorgy


On Tue, Jul 2, 2024 at 3:08 PM Jan Lukavský <[email protected]> wrote:

    Hi Gyorgy,

    there is no concept of 'data locality' in Beam that would be
    analogous to how MapReduce used to work. The fact that tasks
    (compute) are co-located with storage on input does not carry
    over to Beam Flink pipelines. The whole concept is somewhat
    ill-defined in terms of the Beam model, where tasks can (at
    least in theory, depending on the runner) be moved between
    workers in a distributed environment. The reason for this is
    that throughput (and cost) is dominated mostly by the ability
    to scale (uniformly), not by the costs associated with network
    transfers (this is actually most visible in the streaming
    case, where the data is already 'in motion'). The most common
    case in Beam is that compute is completely separated from
    storage (possibly even in the extreme case where streaming
    state is stored outside the compute of the streaming pipeline,
    but cached locally). The resulting 'stateless' nature of
    workers generally enables easier and more flexible scaling.

    Having said that, although Beam currently does not (AFAIK) try to
    leverage local reads, it _could_ be possible by a reasonable
    extension to how splittable DoFn [1] works so that it could make
    use of data locality. It would be non-trivial, though, and would
    definitely require support from the runner (Flink in this case).
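    As a thought experiment only (this is not Beam API; all names
    below are made up), such an extension could boil down to splits
    carrying host hints and the runner preferring co-located workers:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch -- NOT Beam API. Illustrates what a
// locality-aware assignment of source splits could look like if
// restrictions carried host hints: each split lists the hosts storing
// its bytes, and the assigner prefers a co-located, least-loaded
// worker, falling back to the least-loaded worker overall.
public class LocalityAssignerSketch {

    // A byte range of a file plus the hosts holding replicas of it.
    record Split(String path, long start, long end, Set<String> hosts) {}

    static Map<String, List<Split>> assign(List<Split> splits, List<String> workers) {
        Map<String, List<Split>> plan = new HashMap<>();
        for (String w : workers) {
            plan.put(w, new ArrayList<>());
        }
        for (Split s : splits) {
            // Prefer the least-loaded worker co-located with the data...
            Optional<String> local = workers.stream()
                    .filter(s.hosts()::contains)
                    .min(Comparator.comparingInt((String w) -> plan.get(w).size()));
            // ...otherwise fall back to the least-loaded worker anywhere.
            String target = local.orElseGet(() -> workers.stream()
                    .min(Comparator.comparingInt((String w) -> plan.get(w).size()))
                    .get());
            plan.get(target).add(s);
        }
        return plan;
    }
}
```

    The hard part is not this assignment logic but plumbing the host
    hints through the Beam model and the runner's scheduler, which is
    exactly the support from Flink mentioned above.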

    My general suggestion would be to implement a prototype and
    measure the throughput, and the part of it possibly related to
    networking, before attempting to dig deeper into how to
    implement this in Beam Flink.

    Best,

     Jan

    [1] https://beam.apache.org/blog/splittable-do-fn/

    On 7/2/24 10:46, Balogh, György wrote:
    Hi Jan,
    Separating live and historic storage makes sense. I need a
    historic storage that can ensure data-local processing using
    the Beam - Flink stack.
    Can I reliably achieve this with HDFS? I can co-locate HDFS
    nodes with Flink workers. What exactly enforces that Flink
    nodes will read local and not remote data?
    Thank you,
    Gyorgy
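    (A side note on the HDFS part of this question: nothing in the
    stack *enforces* local reads; co-location only makes them
    possible. If a reader does land on a host holding the block,
    HDFS short-circuit local reads let it bypass the DataNode's TCP
    path. A typical hdfs-site.xml fragment, to be checked against
    your Hadoop version and socket path:)

```xml
<!-- hdfs-site.xml, on both DataNodes and client hosts: enable
     short-circuit local reads so a reader co-located with a block
     reads it directly instead of through the DataNode's TCP path.
     This speeds up reads that happen to be local; it does not make
     the scheduler place readers on the right hosts. -->
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
```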

    On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský <[email protected]> wrote:

        Hi Gyorgy,

        comments inline.

        On 7/1/24 15:10, Balogh, György wrote:
        Hi Jan,
        Let me add a few more details to show the full picture. We
        have live datastreams (video analysis metadata) and we would
        like to run both live and historic pipelines on the metadata
        (eg.: live alerts, historic video searches).
        This should be fine due to Beam's unified model. You can
        write a PTransform that handles a PCollection<...> without
        needing to worry whether the PCollection was created from
        Kafka or some bounded source.
        We planned to use Kafka to store the streaming data and
        directly run both types of queries on top. You are
        suggesting we consider having Kafka with small retention
        to serve the live queries and storing the historic data
        somewhere else that scales better for historic queries? We
        need to have on-prem options here. What options should we
        consider that scale nicely (in terms of IO parallelization)
        with Beam? (e.g. HDFS?)

        Yes, I would not say necessarily "small" retention, but
        rather "limited" retention. Running on premises, you can
        choose from HDFS, maybe an S3-compatible MinIO, or some
        other distributed storage, depending on the scale and
        deployment options (e.g. YARN or k8s).

        I also happen to work on a system which targets exactly these
        streaming-batch workloads (persisting upserts from stream to
        batch for reprocessing), see [1]. Please feel free to contact
        me directly if this sounds interesting.

        Best,

         Jan

        [1] https://github.com/O2-Czech-Republic/proxima-platform

        Thank you,
        Gyorgy

        On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský
        <[email protected]> wrote:

            Hi Gyorgy,

            I don't think it is possible to co-locate tasks as you
            describe it. Beam has no information about the location
            of 'splits'. On the other hand, if batch throughput is the
            main concern, then reading from Kafka might not be the
            optimal choice. Although Kafka provides tiered storage
            for offloading historical data, it still somewhat limits
            scalability (and thus throughput), because the data have
            to be read by a broker and only then passed to a
            consumer. The parallelism is therefore limited by the
            number of Kafka partitions, not by the parallelism of the
            Flink job. A more scalable approach could be to persist
            data from Kafka to a batch storage (e.g. S3 or GCS) and
            reprocess it from there.
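            To make the partition bound concrete, a tiny sketch (the
            throughput numbers are illustrative assumptions):

```java
// Sketch: effective batch read throughput from Kafka is capped by the
// number of partitions, regardless of the Flink job's parallelism.
// The ~80 MB/s per-partition figure is an illustrative assumption.
public class KafkaParallelismSketch {

    static double maxReadMBps(int partitions, double perPartitionMBps, int flinkParallelism) {
        // Consumers beyond the partition count receive no data, so the
        // effective read parallelism is min(partitions, flinkParallelism).
        int effective = Math.min(partitions, flinkParallelism);
        return effective * perPartitionMBps;
    }

    public static void main(String[] args) {
        System.out.println(maxReadMBps(24, 80.0, 24)); // 1920.0 MB/s
        System.out.println(maxReadMBps(24, 80.0, 96)); // still 1920.0 -- extra slots idle
    }
}
```

            Reading the same data as files from object or distributed
            storage removes this cap, since file splits can scale with
            the job's parallelism.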

            Best,

             Jan

            On 6/29/24 09:12, Balogh, György wrote:
            Hi,
            I'm planning a distributed system with multiple Kafka
            brokers co-located with Flink workers.
            Data processing throughput for historic queries is a
            main KPI, so I want to make sure all Flink workers read
            local data and not remote. I'm defining my pipelines in
            Beam using Java.
            Is it possible? What are the critical config elements
            to achieve this?
            Thank you,
            Gyorgy

--
            György Balogh
            CTO
            E   [email protected]
            <mailto:[email protected]>
            M   +36 30 270 8342 <tel:+36%2030%20270%208342>
            A   HU, 1117 Budapest, Budafoki út 209.
            W   www.ultinous.com <http://www.ultinous.com>









