Hi Alexey,

Yes, I have tried changing the fetch size for my implementation. What I
observed through the Flink dashboard was that the reading transform
completes quickly, while one of the other transforms takes much longer
(due to some logic).

Even if Apache Beam processes data in bundles, when reading from a data
source like a database it would not wait until a single bundle reaches the
end of the pipeline before reading more. Is that understanding correct? If
so, it will eventually read the entire dataset, loading it into memory.
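
As a point of comparison, a bounded buffer in plain Java gives the kind of
backpressure in question: the reader blocks once a fixed number of records
is waiting, so the whole dataset is never held in memory at once. This is
only a sketch and not Beam code; the class name, the integer "records" and
the end-of-data marker are all made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; nothing here is a Beam or JdbcIO API.
final class BoundedReadSketch {
    private static final int END_OF_DATA = -1; // marker telling the consumer to stop

    /** Returns {recordsProcessed, maxRecordsBufferedAtOnce}. */
    static int[] run(int totalRecords, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger maxBuffered = new AtomicInteger();

        // Stand-in for the database reader: produces records 0..totalRecords-1.
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < totalRecords; i++) {
                    buffer.put(i); // blocks while the buffer is full (the backpressure)
                    maxBuffered.accumulateAndGet(buffer.size(), Math::max);
                }
                buffer.put(END_OF_DATA);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        int processed = 0;
        try {
            while (true) {
                int record = buffer.take(); // blocks while the buffer is empty
                if (record == END_OF_DATA) {
                    break;
                }
                processed++; // stand-in for the slow enrichment step
            }
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing", e);
        }
        return new int[] {processed, maxBuffered.get()};
    }

    public static void main(String[] args) {
        int[] result = run(1000, 10);
        System.out.println("processed=" + result[0] + ", max buffered=" + result[1]);
    }
}
```

However slow the consumer is, the number of buffered records never exceeds
the chosen capacity.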

I haven't tried the 2nd option you suggested. Will try it out.
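
The general idea behind the 2nd option, running the same query over
several ranges of a column, can be sketched in plain Java as below. This
is not JdbcIO's actual implementation; the helper class, the table name
"my_table" and the column "id" are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: splits [lower, upper) on a numeric
// column into contiguous ranges, one per partition query.
final class RangePartitioner {

    /** A half-open range [start, end) on the partition column. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        String toWhereClause(String column) {
            return column + " >= " + start + " AND " + column + " < " + end;
        }
    }

    /** Produces at most numPartitions ranges that together cover [lower, upper). */
    static List<Range> split(long lower, long upper, int numPartitions) {
        if (numPartitions <= 0 || upper <= lower) {
            throw new IllegalArgumentException("need numPartitions > 0 and upper > lower");
        }
        long total = upper - lower;
        // Ceiling division so the last range still reaches the upper bound.
        long stride = (total + numPartitions - 1) / numPartitions;
        List<Range> ranges = new ArrayList<>();
        for (long start = lower; start < upper; start += stride) {
            ranges.add(new Range(start, Math.min(start + stride, upper)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Each printed query could be executed by a separate worker.
        for (Range r : split(0, 1_000_000, 4)) {
            System.out.println("SELECT * FROM my_table WHERE " + r.toWhereClause("id"));
        }
    }
}
```

Each range can then be read and processed independently, so no single
query has to return the whole table.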

Thank you

On Mon, Jul 17, 2023 at 10:08 PM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Hi Yomal,
>
> Actually, all data in a Beam pipeline is usually processed in bundles (or
> chunks) if it is processed by a DoFn. The size of the bundle is up to your
> processing engine and, iirc, there is no way in Beam to change it.
>
> Talking about your case: did you try to change the fetch size for Beam’s
> JdbcIO connector or for your own one?
> Normally, it just gives the JDBC driver a hint as to the number of
> rows that should be fetched from the database at a time [1].
>
> Another option could be to read the data with
> JdbcIO.readWithPartitions(), which will execute several instances of the
> query on the same table using ranges [2].
>
> —
> Alexey
>
> [1]
> https://github.com/apache/beam/blob/c8f68f92097de33fe2c6863344404a1b9922ae27/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1524
> [2]
> https://beam.apache.org/releases/javadoc/2.49.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor-
>
> On 17 Jul 2023, at 13:33, Yomal de Silva <yomal.prav...@gmail.com> wrote:
>
> Hi all,
>
> I have a pipeline which reads data from a database (PostgreSQL), enriches
> the data through a side input and finally publishes the results to Kafka.
> Currently I am not using the built-in JdbcIO to read the data, but I think
> there won't be any difference in using that. With my implementation I have
> set the fetch size and pass the data to the next transform to process. I
> have 2 questions here:
>
> 1. For batch-based processing pipelines, is there a way to process elements
> in chunks rather than reading the entire dataset and loading it into
> memory? What I have observed is that it occupies a significant amount of
> memory and may even cause OOM exceptions. I am looking for some sort of
> backpressure implementation or any other way to stop reading all the data
> into memory until some of the records get processed. I have found the
> following answer [1], which states that's not possible. Since this answer
> was provided some time ago, I wanted to check if it is still the case.
>
> 2. When dealing with side inputs, again, does it load everything into
> memory and use the appropriate window to carry out the operation inside a
> transform?
>
> Please let me know if you have any solutions for this.
>
> [1]
> https://stackoverflow.com/questions/57580362/how-to-manage-backpressure-with-apache-beam
>
> Thank you.
>
>
>
