In the context of MongoDB there are already configuration knobs for this:
https://github.com/apache/beam/blob/7136380c4a79f8dea9b42a42ee7569b665edf431/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L230
(bucketAuto or numSplits).
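To make that concrete, a minimal sketch of how those options are set on MongoDbIO.read(); the connection string, database, collection, and split count below are placeholders, not values from the thread:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.mongodb.MongoDbIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.bson.Document;

    public class MongoReadExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // numSplits caps how many ranges the bounded source is split into, while
        // bucketAuto delegates the range calculation to MongoDB's $bucketAuto
        // aggregation instead of the min/max _id arithmetic.
        PCollection<Document> docs =
            pipeline.apply(
                "ReadFromMongo",
                MongoDbIO.read()
                    .withUri("mongodb://localhost:27017") // placeholder connection string
                    .withDatabase("my_database")          // placeholder database
                    .withCollection("my_collection")      // placeholder collection
                    .withNumSplits(64)                    // placeholder split count
                    .withBucketAuto(false));              // or true to use $bucketAuto

        pipeline.run().waitUntilFinish();
      }
    }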
The exact logic for how it would split is written here:
https://github.com/apache/beam/blob/7136380c4a79f8dea9b42a42ee7569b665edf431/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L500

On Wed, Apr 9, 2025 at 5:25 PM Jonathan Hope <jonathan.douglas.h...@gmail.com> wrote:

> I think I'm still struggling a bit...
>
> Let's stick with a bounded example for now. I would be reading from a
> single mongo cluster/database/collection/partition that has billions of
> things in it. I read through the mongoio code a bit and it seems to:
>
> 1. Get the min ID
> 2. Get the max ID
> 3. Split that range by bundle size
> 4. Read those ranges
>
> In the case of billions I imagine that would be a large number of splits.
> My (possibly incorrect) understanding is that Beam would try to go
> parallel here. Ideally I would have some way to say only read x bundles
> at once.
>
> Thanks!
>
> On Wed, Apr 9, 2025 at 5:32 AM Radek Stankiewicz <radosl...@google.com>
> wrote:
>
>> hey Jonathan,
>>
>> parallelism for read and write is directly related to the number of keys
>> you are processing in the current stage.
>> As an example, imagine you have KafkaIO with 1 partition, and after
>> reading from KafkaIO you have a mapping step to a JDBC entity and then a
>> step writing to the database.
>> If you keep all those steps in the same stage, you will have a single
>> threaded write to the database.
>> If you add a Redistribute transform with numKeys set to N, you may have
>> up to N parallel writes. More and more IOs are leveraging this transform
>> to control the parallelism.
>> Rate limiting is per worker; if you have a backlog and unlimited keys,
>> the runner may decide to add more workers, which will eventually increase
>> the number of calls.
>>
>> If you are using Java and JdbcIO, you can specify the number of splits by
>> setting numPartitions and specifying the partitioning column.
>>
>> What I would recommend investigating is connection pooling, as this is
>> also problematic with operational databases. For JDBC, HikariCP is pretty
>> easy to integrate.
>>
>> On Thu, Apr 3, 2025 at 12:32 AM Jonathan Hope <
>> jonathan.douglas.h...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> The pipeline I'm working on will be run against two different databases
>>> that are both online, so I will have to control the amount of data being
>>> read/written to maintain QoS. To do so I would have to control the number
>>> of workers being executed in parallel for a given step.
>>>
>>> I saw this issue on GitHub: https://github.com/apache/beam/issues/17835.
>>> However it has been closed and I don't see any related PRs/comments/etc.
>>> Did that work get done, or did it just get cancelled?
>>>
>>> I also saw this post:
>>> https://medium.com/art-of-data-engineering/steady-as-she-flows-rate-limiting-in-apache-beam-pipelines-42cab0b7f31d.
>>> That would definitely work, but it would result in workers spinning up
>>> and waiting on locks (which costs money).
>>>
>>> Perhaps this is more of a runner concern though, and I did see a way to
>>> limit the maximum number of workers here:
>>> https://cloud.google.com/dataflow/docs/reference/pipeline-options. I
>>> believe that would also work, but then the max number of workers would be
>>> dictated by whichever step has the highest performance cost, which would
>>> make the pipelines slower overall.
>>>
>>> Thanks!
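To illustrate the key-based parallelism control Radek describes above, here is a rough sketch of fanning elements onto a bounded number of keys before the rate-sensitive write step. The Redistribute transform he mentions is the newer way to do this and its exact method names vary by Beam version; this sketch instead uses the older Reshuffle.viaRandomKey().withNumBuckets(...) pattern, which has the same effect, and the write DoFn is a hypothetical stand-in:

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    public class LimitedParallelismWrite {

      /** Hypothetical stand-in for the step that talks to the rate-sensitive database. */
      static class WriteToDatabaseFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(@Element String row) {
          // A real pipeline would issue the INSERT/UPDATE for `row` here.
          System.out.println("writing: " + row);
        }
      }

      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Stand-in for the upstream source (e.g. a KafkaIO read with one partition).
        PCollection<String> rows =
            pipeline.apply(Create.of(Arrays.asList("row-1", "row-2", "row-3")));

        rows
            // Break fusion and spread the elements over a fixed number of keys.
            // With 8 buckets, roughly 8 bundles of the write step can run in parallel.
            .apply("CapWriteParallelism", Reshuffle.<String>viaRandomKey().withNumBuckets(8))
            .apply("WriteToDatabase", ParDo.of(new WriteToDatabaseFn()));

        pipeline.run().waitUntilFinish();
      }
    }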
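And a sketch of the JdbcIO side of Radek's suggestion: a partitioned read driven by numPartitions and a numeric partitioning column. This assumes a hypothetical Postgres table named my_table with a numeric id column and a text payload column; the driver, URL, credentials, and column names are all placeholders. Connection pooling (e.g. HikariCP) would be layered on top of the data source configuration, as suggested above:

    import java.sql.ResultSet;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class PartitionedJdbcRead {

      /** Maps each JDBC row to a plain string; a real pipeline would build a richer type. */
      static class PayloadRowMapper implements JdbcIO.RowMapper<String> {
        @Override
        public String mapRow(ResultSet resultSet) throws Exception {
          return resultSet.getString("payload"); // placeholder column name
        }
      }

      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<String> rows =
            pipeline.apply(
                "PartitionedJdbcRead",
                JdbcIO.<String>readWithPartitions()
                    .withDataSourceConfiguration(
                        JdbcIO.DataSourceConfiguration.create(
                                "org.postgresql.Driver",                  // placeholder driver
                                "jdbc:postgresql://localhost:5432/my_db") // placeholder URL
                            .withUsername("user")                         // placeholder
                            .withPassword("password"))                    // placeholder
                    .withTable("my_table")        // placeholder table
                    .withPartitionColumn("id")    // numeric column the split ranges are computed on
                    .withNumPartitions(16)        // placeholder: how many parallel splits to read
                    .withRowMapper(new PayloadRowMapper()));

        // `rows` can now be transformed and written downstream.
        pipeline.run().waitUntilFinish();
      }
    }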