In the context of MongoDB, there are already configuration pieces for this:
https://github.com/apache/beam/blob/7136380c4a79f8dea9b42a42ee7569b665edf431/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L230
bucketAuto or numSplits.

The exact logic for how it splits the collection is here:
https://github.com/apache/beam/blob/7136380c4a79f8dea9b42a42ee7569b665edf431/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L500
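
As a rough sketch of how those options are wired up (the URI and names are
placeholders, and the builder methods are worth double checking against the
Beam version you are on):

  // either cap the split count explicitly:
  PCollection<Document> docs =
      pipeline.apply(
          MongoDbIO.read()
              .withUri("mongodb://localhost:27017")
              .withDatabase("mydb")
              .withCollection("mycollection")
              .withNumSplits(64));
  // or swap .withNumSplits(64) for .withBucketAuto(true) to have a
  // $bucketAuto aggregation work out the split ranges instead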




On Wed, Apr 9, 2025 at 5:25 PM Jonathan Hope <
jonathan.douglas.h...@gmail.com> wrote:

> I think I'm still struggling a bit...
>
> Let's stick with a bounded example for now. I would be reading from a
> single Mongo cluster/database/collection/partition that has billions of
> documents in it. I read through the MongoDbIO code a bit and it seems to:
>
>
>    1. Get the min ID
>    2. Get the max ID
>    3. Split that range by bundle size
>    4. Read those ranges
>
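> In code terms I'm picturing roughly this (my own sketch of that reading,
> treating the ids as plain longs for simplicity; not the actual MongoDbIO
> logic):
>
>   // Split [min, max] into contiguous id ranges of at most bundleSize ids.
>   static List<long[]> splitIdRange(long min, long max, long bundleSize) {
>     List<long[]> ranges = new ArrayList<>();
>     for (long start = min; start <= max; start += bundleSize) {
>       ranges.add(new long[] {start, Math.min(start + bundleSize - 1, max)});
>     }
>     return ranges;
>   }
>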
> In the case of billions of documents, I imagine that would be a large number
> of splits. My (possibly incorrect) understanding is that Beam would try to
> read those splits in parallel. Ideally I would have some way to say only
> read x bundles at once.
>
> Thanks!
>
> On Wed, Apr 9, 2025 at 5:32 AM Radek Stankiewicz <radosl...@google.com>
> wrote:
>
>> hey Jonathan,
>>
>> Parallelism for reads and writes is directly related to the number of keys
>> you are processing in the current stage.
>> As an example, imagine you have KafkaIO with one partition; after reading
>> from KafkaIO you have a step that maps to a JDBC entity, and then a step
>> that writes to the database.
>> If you keep all those steps in the same stage, you will have a
>> single-threaded write to the database.
>> If you add a Redistribute transform with numKeys set to N, you may have up
>> to N parallel writes. More and more IOs are leveraging this transform to
>> control parallelism.
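>> For example, between the mapping step and the write (a sketch: MyEntity,
>> dsConfig and the SQL are placeholders, and I'm using the older
>> Reshuffle.viaRandomKey().withNumBuckets(...) as a stand-in for the same
>> key-limiting idea; double check the method names against your Beam version):
>>
>>   entities  // PCollection<MyEntity> produced by the mapping step
>>       .apply(Reshuffle.<MyEntity>viaRandomKey().withNumBuckets(8))
>>       .apply(JdbcIO.<MyEntity>write()
>>           .withDataSourceConfiguration(dsConfig)
>>           .withStatement("insert into my_table values (?, ?)")
>>           .withPreparedStatementSetter((e, stmt) -> {
>>             stmt.setLong(1, e.getId());
>>             stmt.setString(2, e.getName());
>>           }));
>>   // 8 buckets means at most 8 distinct keys, so the fused write stage runs
>>   // at most 8 ways in parallel.
>>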
>> Rate limiting is per worker; if you have a backlog and an unlimited number
>> of keys, the runner may decide to add more workers, which will eventually
>> increase the number of calls.
>>
>> If you are using Java and JdbcIO, you can specify the number of splits by
>> setting numPartitions and specifying the partitioning column, for example:
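>> (Sketch only; dsConfig, my_table, "id" and MyRow are placeholders.)
>>
>>   pipeline.apply(
>>       JdbcIO.<MyRow>readWithPartitions()
>>           .withDataSourceConfiguration(dsConfig)
>>           .withTable("my_table")
>>           .withPartitionColumn("id")   // numeric column the ranges come from
>>           .withNumPartitions(16)       // number of splits to read in parallel
>>           .withRowMapper(
>>               rs -> new MyRow(rs.getLong("id"), rs.getString("name"))));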
>>
>> What I would recommend investigating is connection pooling, as this is also
>> problematic with operational databases. For JDBC, HikariCP is pretty easy
>> to integrate.
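>> A minimal sketch of that, assuming JdbcIO's withDataSourceProviderFn hook
>> and with illustrative pool settings and credentials:
>>
>>   // The provider lambda is what gets serialized; the HikariCP pool itself
>>   // is only created once the function runs on a worker.
>>   SerializableFunction<Void, DataSource> pooledDataSource =
>>       unused -> {
>>         HikariConfig conf = new HikariConfig();
>>         conf.setJdbcUrl("jdbc:postgresql://db-host:5432/mydb");
>>         conf.setUsername("user");
>>         conf.setPassword("secret");
>>         conf.setMaximumPoolSize(4);   // cap connections per worker
>>         return new HikariDataSource(conf);
>>       };
>>
>>   // then: JdbcIO.<MyEntity>write().withDataSourceProviderFn(pooledDataSource)...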
>>
>> On Thu, Apr 3, 2025 at 12:32 AM Jonathan Hope <
>> jonathan.douglas.h...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> The pipeline I'm working on will be run against two different databases
>>> that are both online, so I will have to control the amount of data being
>>> read/written to maintain QoS. To do that I would have to control the number
>>> of workers executing in parallel for a given step.
>>>
>>> I saw this issue on GitHub: https://github.com/apache/beam/issues/17835.
>>> However, it has been closed and I don't see any related PRs/comments/etc.
>>> Did that work get done, or was it just cancelled?
>>>
>>> I also saw this post:
>>> https://medium.com/art-of-data-engineering/steady-as-she-flows-rate-limiting-in-apache-beam-pipelines-42cab0b7f31d.
>>> That would definitely work, but it would result in workers spinning up and
>>> waiting on locks (which costs money).
>>>
>>> Perhaps this is more of a runner concern, though, and I did see a way to
>>> limit the maximum number of workers here:
>>> https://cloud.google.com/dataflow/docs/reference/pipeline-options. I
>>> believe that would also work, but then the maximum number of workers would
>>> be dictated by whichever step has the highest performance cost, which would
>>> make the pipelines slower overall.
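>>> (Concretely I believe that is the maxNumWorkers option, e.g.
>>> --maxNumWorkers=10 on the command line, or setMaxNumWorkers(10) on
>>> DataflowPipelineOptions in code.)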
>>>
>>> Thanks!
>>>
>>
