Hi Gyorgy,
there is no concept of 'data locality' in Beam that would be analogous
to how MapReduce used to work. The fact that tasks (compute) are
co-located with the storage holding their input does not carry over to
Beam pipelines on Flink. The whole concept is rather ill-defined in
terms of the Beam model, where tasks can (at least in theory, depending
on the runner) be moved between workers in a distributed environment.
The reason for this is that throughput (and cost) is dominated mostly
by the ability to scale (uniformly), not by the costs associated with
network transfers (this is most visible in the streaming case, where
the data is already 'in motion'). The most common case in Beam is that
compute is completely separated from storage, possibly even in the
extreme case where streaming state is stored outside the compute of the
streaming pipeline, though cached locally. The resulting 'stateless'
nature of workers generally enables easier and more flexible scaling.
Having said that, although Beam currently does not (AFAIK) try to
leverage local reads, it _could_ be made possible by a reasonable
extension to how splittable DoFn [1] works, so that it could make use
of data locality. It would be non-trivial, though, and would definitely
require support from the runner (Flink in this case).
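To make that concrete, below is a rough sketch (in Java; the class name,
chunk size and file-based input are made up for illustration) of what a
file-reading splittable DoFn looks like today. A split is just an
OffsetRange; there is currently no place to attach something like a
preferred host per split, which is roughly the extension (plus runner
support) I have in mind:

import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Hypothetical SDF reading byte ranges of files whose paths arrive as input elements.
class ReadFileChunksFn extends DoFn<String, byte[]> {

  private static final long CHUNK_BYTES = 64 * 1024 * 1024;

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String path) throws IOException {
    // The whole file is the initial restriction.
    return new OffsetRange(0, Files.size(Paths.get(path)));
  }

  @SplitRestriction
  public void splitRestriction(
      @Restriction OffsetRange range, OutputReceiver<OffsetRange> splits) {
    // Splits are plain offset ranges; there is no API to say "this range is
    // best read on host X", which is what data locality would need.
    for (OffsetRange chunk : range.split(CHUNK_BYTES, CHUNK_BYTES)) {
      splits.output(chunk);
    }
  }

  @ProcessElement
  public void process(
      @Element String path,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<byte[]> out)
      throws IOException {
    try (SeekableByteChannel channel = Files.newByteChannel(Paths.get(path))) {
      long position = tracker.currentRestriction().getFrom();
      while (tracker.tryClaim(position)) {
        channel.position(position);
        // Read and emit the chunk starting at 'position' (buffer handling omitted).
        out.output(new byte[0]);
        position += CHUNK_BYTES;
      }
    }
  }
}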
My general suggestion would be to implement a prototype and measure
throughput, and the part of it possibly related to networking, before
attempting to dig deeper into how to implement this in Beam on Flink.
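For the measuring part, even a pair of counters around the read stage
tells you how much data each stage pulled, which you can then compare
with OS-level network counters on the workers. A tiny sketch (the
namespace and counter names are arbitrary):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative pass-through DoFn counting records and bytes read.
class CountThroughputFn extends DoFn<byte[], byte[]> {

  private final Counter records = Metrics.counter("prototype", "recordsRead");
  private final Counter bytes = Metrics.counter("prototype", "bytesRead");

  @ProcessElement
  public void process(@Element byte[] element, OutputReceiver<byte[]> out) {
    records.inc();
    bytes.inc(element.length);
    out.output(element);
  }
}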
Best,
Jan
[1] https://beam.apache.org/blog/splittable-do-fn/
On 7/2/24 10:46, Balogh, György wrote:
Hi Jan,
Separating live and historic storage makes sense. I need a historic
storage that can ensure data-local processing using the Beam + Flink
stack.
Can I reliably achieve this with HDFS? I can co-locate HDFS nodes with
Flink workers. What exactly enforces that Flink nodes will read local
and not remote data?
Thank you,
Gyorgy
On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský <[email protected]> wrote:
Hi Gyorgy,
comments inline.
On 7/1/24 15:10, Balogh, György wrote:
Hi Jan,
Let me add a few more details to show the full picture. We have
live data streams (video analysis metadata) and we would like to
run both live and historic pipelines on the metadata (e.g. live
alerts, historic video searches).
This should be fine due to Beam's unified model. You can write a
PTransform that handles a PCollection<...> without needing to worry
whether the PCollection was created from Kafka or from some bounded source.
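A minimal sketch of what I mean (the broker address, topic, archive
path and the DetectAlerts transform are all made up for illustration;
in practice the live and historic parts would likely be two separate
deployments):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UnifiedMetadataPipelines {

  // Hypothetical transform; the point is that it never needs to know whether
  // its input is bounded (archive) or unbounded (Kafka).
  static class DetectAlerts extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> expand(PCollection<String> input) {
      // Real alerting/search logic would go here.
      return input.apply(Filter.by((String line) -> line.contains("ALERT")));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Live: metadata records arriving on Kafka.
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker1:9092")
            .withTopic("video-metadata")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply("LiveAlerts", new DetectAlerts());

    // Historic: the very same transform over a bounded read of archived records.
    p.apply(TextIO.read().from("hdfs://namenode:8020/archive/metadata/*"))
        .apply("HistoricSearch", new DetectAlerts());

    p.run();
  }
}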
We planned to use Kafka to store the streaming data and
directly run both types of queries on top. Are you suggesting we
consider having Kafka with small retention to serve the live
queries and store the historic data somewhere else that scales
better for historic queries? We need to have on-prem options
here. What options should we consider that scale nicely (in
terms of IO parallelization) with Beam? (e.g. HDFS?)
Yes, though I would not necessarily say "small" retention, rather
"limited" retention. Running on premises you can choose from HDFS,
or maybe S3-compatible MinIO, or some other distributed storage,
depending on the scale and deployment options (e.g. YARN or k8s).
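If you go the HDFS route, wiring it into Beam is just a matter of the
Hadoop filesystem module; a minimal sketch below (the namenode address
and path are placeholders, and note this says nothing about locality of
the reads, as discussed above):

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

// Requires beam-sdks-java-io-hadoop-file-system on the classpath.
public class ReadArchiveFromHdfs {
  public static void main(String[] args) {
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

    // Placeholder namenode; typically this is picked up from HADOOP_CONF_DIR instead.
    Configuration hdfsConf = new Configuration();
    hdfsConf.set("fs.defaultFS", "hdfs://namenode:8020");
    options.setHdfsConfiguration(Collections.singletonList(hdfsConf));

    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.read().from("hdfs://namenode:8020/archive/metadata/*"));
    // ... downstream processing omitted ...
    p.run();
  }
}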
I also happen to work on a system which targets exactly these
streaming-batch workloads (persisting upserts from stream to batch
for reprocessing), see [1]. Please feel free to contact me
directly if this sounds interesting.
Best,
Jan
[1] https://github.com/O2-Czech-Republic/proxima-platform
Thank you,
Gyorgy
On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský <[email protected]> wrote:
Hi Gyorgy,
I don't think it is possible to co-locate tasks as you
describe. Beam has no information about the location of
'splits'. On the other hand, if batch throughput is the main
concern, then reading from Kafka might not be the optimal
choice. Although Kafka provides tiered storage for offloading
historical data, it still somewhat limits scalability (and
thus throughput), because the data has to be read by a
broker and only then passed to a consumer. The parallelism is
therefore limited by the number of Kafka partitions, not by the
parallelism of the Flink job. A more scalable approach could
be to persist data from Kafka to batch storage (e.g. S3 or
GCS) and reprocess it from there.
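A rough sketch of that "persist and reprocess" idea (broker, topic and
target path are made up): a small streaming job archives the topic into
windowed files, and the historic pipelines then read those files at
whatever parallelism the runner gives them.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class ArchiveKafkaTopic {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker1:9092")     // placeholder
            .withTopic("video-metadata")              // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Values.<String>create())
        // Windowing is required to write files from an unbounded source.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
        .apply(TextIO.write()
            .to("hdfs://namenode:8020/archive/metadata")  // or s3://..., gs://...
            .withWindowedWrites()
            .withNumShards(8));

    p.run();
  }
}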
Best,
Jan
On 6/29/24 09:12, Balogh, György wrote:
Hi,
I'm planning a distributed system with multiple Kafka
brokers co-located with Flink workers.
Data processing throughput for historic queries is a main
KPI, so I want to make sure all Flink workers read local
data and not remote. I'm defining my pipelines in Beam using
Java.
Is this possible? What are the critical config elements to
achieve this?
Thank you,
Gyorgy
--
György Balogh
CTO
E [email protected]
M +36 30 270 8342
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com