Hi Gyorgy,

there is no concept of 'data locality' in Beam that would be analogous to how MapReduce used to work. The co-location of tasks (compute) with the storage of their input does not carry over to Beam pipelines on Flink. The whole concept is somewhat ill-defined in the Beam model, where tasks can (at least in theory, depending on the runner) be moved between workers in a distributed environment. The reason is that throughput (and cost) is dominated mostly by the ability to scale (uniformly), not by the costs associated with network transfers; this is most visible in the streaming case, where the data is already 'in motion'. The most common setup in Beam is that compute is completely separated from storage, even in the extreme case where streaming state is stored outside the compute of the streaming pipeline, though cached locally. The resulting 'stateless' nature of workers generally enables easier and more flexible scaling.

Having said that, although Beam currently does not (AFAIK) try to leverage local reads, it _could_ be made possible by a reasonable extension to how splittable DoFn [1] works, so that it could make use of data locality. It would be non-trivial, though, and would definitely require support from the runner (Flink in this case).
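
To give a rough idea of where such an extension would have to plug in, below is a minimal, simplified sketch of what a splittable DoFn reading byte ranges could look like (the class name and helper semantics are made up); note there is currently no API to attach block locations to the split pieces, nor for the runner to honor them:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;

// Input: KV<fileName, fileSizeBytes>; restriction: a byte range of the file.
// The runner may split the range and schedule the pieces on any worker; a
// locality-aware extension would have to attach block locations to the pieces
// and the runner (Flink) would have to honor them when assigning work.
class ReadByteRanges extends DoFn<KV<String, Long>, KV<String, Long>> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element KV<String, Long> file) {
    return new OffsetRange(0, file.getValue());
  }

  @GetRestrictionCoder
  public Coder<OffsetRange> restrictionCoder() {
    return OffsetRange.Coder.of();
  }

  @SplitRestriction
  public void splitRestriction(
      @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
    // Split into ~128 MB pieces; this is where block boundaries (and their
    // locations) would naturally be computed.
    for (OffsetRange part : range.split(128 * 1024 * 1024, 1)) {
      out.output(part);
    }
  }

  @NewTracker
  public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
    return new OffsetRangeTracker(range);
  }

  @ProcessElement
  public void process(
      @Element KV<String, Long> file,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<KV<String, Long>> out) {
    long pos = tracker.currentRestriction().getFrom();
    while (tracker.tryClaim(pos)) {
      // A real implementation would read records from the claimed chunk; the
      // sketch only emits the claimed offsets to stay self-contained.
      out.output(KV.of(file.getKey(), pos));
      pos += 1024 * 1024;
    }
  }
}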

My general suggestion would be to implement a prototype and measure the throughput, and how much of it is attributable to networking, before attempting to dig deeper into how to implement this in Beam on Flink.
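
A prototype can be as small as the following (the path is a placeholder; reading hdfs:// paths assumes the beam-sdks-java-io-hadoop-file-system module is on the classpath and configured) - run it on the Flink cluster and compare the wall-clock time against the network counters of the workers:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;

// Minimal read-throughput probe: read everything and count the records.
public class ReadThroughputProbe {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply("Read", TextIO.read().from("hdfs:///metadata/2024/07/*"))
     .apply("Count", Count.<String>globally());
    p.run().waitUntilFinish();
  }
}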

Best,

 Jan

[1] https://beam.apache.org/blog/splittable-do-fn/

On 7/2/24 10:46, Balogh, György wrote:
Hi Jan,
Separating live and historic storage makes sense. I need a historic storage that can ensure data-local processing using the Beam-Flink stack. Can I reliably achieve this with HDFS? I can co-locate HDFS nodes with Flink workers. What exactly enforces that Flink nodes will read local and not remote data?
Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský <[email protected]> wrote:

    Hi Gyorgy,

    comments inline.

    On 7/1/24 15:10, Balogh, György wrote:
    Hi Jan,
    Let me add a few more details to show the full picture. We have
    live datastreams (video analysis metadata) and we would like to
    run both live and historic pipelines on the metadata (eg.: live
    alerts, historic video searches).
    This should be fine thanks to Beam's unified model. You can write a
    PTransform that handles a PCollection<...> without needing to worry
    whether the PCollection was created from Kafka or from some bounded
    source.
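
    For illustration, something along these lines (the predicate and names
    are made up) runs unchanged against both a Kafka-backed and a
    file-backed PCollection<String>:

    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;

    // Cares only about the element type, not about where the data came from.
    class DetectAlerts
        extends PTransform<PCollection<String>, PCollection<String>> {
      @Override
      public PCollection<String> expand(PCollection<String> metadata) {
        return metadata.apply(Filter.by((String json) -> json.contains("alert")));
      }
    }

    // Live:  kafkaValues.apply(new DetectAlerts())  (values extracted from KafkaIO)
    // Batch: fileLines.apply(new DetectAlerts())    (e.g. from TextIO.read())
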
    We planned to use Kafka to store the streaming data and
    directly run both types of queries on top. You are suggesting to
    consider having Kafka with small retention to serve the live
    queries and to store the historic data somewhere else that scales
    better for historic queries? We need to have on-prem options
    here. What options should we consider that scale nicely (in
    terms of IO parallelization) with Beam? (e.g. HDFS?)

    Yes, though I would not necessarily say "small" retention, rather
    "limited" retention. Running on premises you can choose from HDFS,
    S3-compatible MinIO, or some other distributed storage, depending
    on the scale and deployment options (e.g. YARN or k8s).
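
    With HDFS, wiring Beam's filesystem layer to the cluster looks roughly
    like this - a sketch only, the namenode address and path are made up,
    and it assumes the beam-sdks-java-io-hadoop-file-system module is on
    the classpath:

    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class HdfsBackedBatch {
      public static void main(String[] args) {
        HadoopFileSystemOptions opts =
            PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // made-up address
        opts.setHdfsConfiguration(Collections.singletonList(conf));

        Pipeline p = Pipeline.create(opts);
        p.apply(TextIO.read().from("hdfs://namenode:8020/metadata/part-*"));
        p.run().waitUntilFinish();
      }
    }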

    I also happen to work on a system which targets exactly these
    streaming-batch workloads (persisting upserts from stream to batch
    for reprocessing), see [1]. Please feel free to contact me
    directly if this sounds interesting.

    Best,

     Jan

    [1] https://github.com/O2-Czech-Republic/proxima-platform

    Thank you,
    Gyorgy

    On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský <[email protected]> wrote:

        Hi Gyorgy,

        I don't think it is possible to co-locate tasks as you
        describe. Beam has no information about the location of
        'splits'. On the other hand, if batch throughput is the main
        concern, then reading from Kafka might not be the optimal
        choice. Although Kafka provides tiered storage for offloading
        historical data, it still somewhat limits scalability (and
        thus throughput), because the data has to be read by a
        broker and only then passed to a consumer. The parallelism is
        therefore limited by the number of Kafka partitions, not by
        the parallelism of the Flink job. A more scalable approach
        could be to persist the data from Kafka to a batch storage
        (e.g. S3 or GCS) and reprocess it from there, as sketched
        below.
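
        A rough sketch of such an archiving job (broker, topic and path
        names are made up, and the hdfs:// path assumes the Hadoop
        filesystem module is configured) - it continuously writes hourly
        files that a batch pipeline can later read with full parallelism:

        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.io.TextIO;
        import org.apache.beam.sdk.io.kafka.KafkaIO;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;
        import org.apache.beam.sdk.transforms.Values;
        import org.apache.beam.sdk.transforms.windowing.FixedWindows;
        import org.apache.beam.sdk.transforms.windowing.Window;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.joda.time.Duration;

        // Continuously archive a Kafka topic into hourly files for later
        // (re)processing by batch pipelines.
        public class ArchiveKafkaToFiles {
          public static void main(String[] args) {
            Pipeline p =
                Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
            p.apply(KafkaIO.<String, String>read()
                    .withBootstrapServers("broker1:9092")
                    .withTopic("video-metadata")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
             .apply(Values.<String>create())
             .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
             .apply(TextIO.write()
                    .to("hdfs:///archive/video-metadata")
                    .withWindowedWrites()
                    .withNumShards(16));
            p.run();
          }
        }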

        Best,

         Jan

        On 6/29/24 09:12, Balogh, György wrote:
        Hi,
        I'm planning a distributed system with multiple Kafka
        brokers co-located with Flink workers.
        Data processing throughput for historic queries is a main
        KPI, so I want to make sure all Flink workers read local
        data and not remote data. I'm defining my pipelines in Beam
        using Java.
        Is it possible? What are the critical config elements to
        achieve this?
        Thank you,
        Gyorgy

--

György Balogh
CTO
E       [email protected]
M       +36 30 270 8342
A       HU, 1117 Budapest, Budafoki út 209.
W       www.ultinous.com
