Hi Alexey,

Yes, I have tried changing the fetch size for my implementation. What I observed through the Flink dashboard was that the reading transform completes quickly, while one of the other transforms takes much longer (due to some logic).
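For context, this is roughly what setting the fetch size looks like with the built-in JdbcIO (my own reader does the equivalent). The connection details, table and column names below are placeholders, not my actual setup:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.KV;

public class FetchSizeReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("ReadFromPostgres",
        JdbcIO.<KV<Long, String>>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver",
                        "jdbc:postgresql://localhost:5432/mydb")
                    .withUsername("user")
                    .withPassword("password"))
            .withQuery("SELECT id, payload FROM events")
            // Hint to the JDBC driver as to how many rows to fetch at a time [1];
            // whether it actually streams the cursor depends on the driver settings.
            .withFetchSize(1000)
            .withRowMapper(rs -> KV.of(rs.getLong("id"), rs.getString("payload")))
            .withCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));

    p.run().waitUntilFinish();
  }
}

Even with the fetch size set, the behaviour I described above is the same: the read finishes well ahead of the downstream transform.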
Even if Apache Beam processes data in bundles, when reading from a data source like a database it would not wait until a single bundle reaches the end of the pipeline before reading the next one. Is that understanding correct? If so, it will still eventually read the entire dataset and load it into memory.

I haven't tried the 2nd option you suggested, JdbcIO.readWithPartitions(). I will try it out, roughly along the lines of the sketch at the end of this message.

Thank you

On Mon, Jul 17, 2023 at 10:08 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Hi Yomal,
>
> Actually, usually all data in a Beam pipeline is processed in bundles (or
> chunks) if it is processed by a DoFn. The size of the bundle is up to your
> processing engine and, IIRC, there is no way in Beam to change it.
>
> Talking about your case - did you try to change the fetch size for Beam's
> JdbcIO connector or for your own one?
> Normally, it just gives a hint to the JDBC driver as to the number of
> rows that should be fetched from the database [1].
>
> Another option could be to try to read data with
> JdbcIO.readWithPartitions(), which will execute several instances of the
> query on the same table using ranges [2].
>
> —
> Alexey
>
> [1] https://github.com/apache/beam/blob/c8f68f92097de33fe2c6863344404a1b9922ae27/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1524
> [2] https://beam.apache.org/releases/javadoc/2.49.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor-
>
> On 17 Jul 2023, at 13:33, Yomal de Silva <yomal.prav...@gmail.com> wrote:
>
> Hi all,
>
> I have a pipeline which reads data from a database (PostgreSQL), enriches
> the data through a side input and finally publishes the results to Kafka.
> Currently I am not using the built-in JdbcIO to read the data, but I think
> there won't be any difference in using that. With my implementation I have
> set the fetch size and pass the data to the next transform to process. I
> have 2 questions here:
>
> 1. For batch-based processing pipelines, is there a way to process elements
> in chunks rather than reading the entire dataset and loading it into
> memory? What I have observed is that it occupies a significant amount of
> memory and may even cause OOM exceptions. I am looking for a sort of
> backpressure implementation, or any other way to stop reading all the data
> into memory until some of the records get processed. I have found the
> following answer [1], which states that it is not possible; since this
> answer was provided some time ago, I wanted to check if it is still the case.
>
> 2. When dealing with side inputs, again, does it load everything into
> memory and use the appropriate window to carry out the operation inside a
> transform?
>
> Please let me know if you have any solutions for this.
>
> [1] https://stackoverflow.com/questions/57580362/how-to-manage-backpressure-with-apache-beam
>
> Thank you.
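P.S. To make sure I understand the readWithPartitions() suggestion from [2], this is roughly what I have in mind to try; the table, partition column and connection details are placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class PartitionedReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("PartitionedRead",
        JdbcIO.<KV<Long, String>, Long>readWithPartitions(TypeDescriptors.longs())
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver",
                        "jdbc:postgresql://localhost:5432/mydb")
                    .withUsername("user")
                    .withPassword("password"))
            .withTable("events")
            // Numeric, ideally indexed, column whose range is split across partitions.
            .withPartitionColumn("id")
            // Each partition becomes a separate ranged query on the same table,
            // so no single read has to hold the whole result set.
            .withNumPartitions(10)
            .withRowMapper(rs -> KV.of(rs.getLong("id"), rs.getString("payload")))
            .withCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));

    p.run().waitUntilFinish();
  }
}

My understanding is that each partition is issued as its own ranged query, so the memory pressure of a single huge result set should be reduced; please correct me if that is not how it behaves.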