Hi Caizhi,

Thank you for your reply. The heap size is 512m. Fetching from the DB table
is the only costly operation; after fetching from the DB, I simply write the
records to a Kafka topic (sink sketched below), so that should not be the
bottleneck. Here is the JDBC configuration. Is this config correct?

// Imports for the (legacy) Flink JDBC connector, as of Flink 1.11+:
import org.apache.flink.connector.jdbc.internal.options.{JdbcLookupOptions, JdbcOptions, JdbcReadOptions}
import org.apache.flink.connector.jdbc.table.JdbcTableSource

// Base scan query; the connector appends the per-partition
// "WHERE <partition column> BETWEEN ? AND ?" predicate when a
// partition column is configured.
val query = s"SELECT * FROM $tableName"

// Connection-level options.
val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()

// Scan options: split the partition column's [dbLowerBound, dbUpperBound]
// range into 50 parallel reads, 20 rows per driver round trip.
val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(PARTITION_KEY)
    .setPartitionLowerBound(dbLowerBound)
    .setPartitionUpperBound(dbUpperBound)
    .setNumPartitions(50)
    .setFetchSize(20)
    .build()

// Lookup-cache options; not used by a plain scan, but harmless to set.
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(1000)
    .setMaxRetryTimes(2)
    .build()

val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build()
    .getDataStream(env)
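
The sink side is essentially just the following (a rough sketch, not my real
code: the topic name, broker address, and the toJson helper are placeholders
for illustration):

import java.util.Properties
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.types.Row

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092") // placeholder broker address

// Serialize each Row and hand it straight to the Kafka producer sink.
rawSource
    .map(new MapFunction[Row, String] {
        override def map(row: Row): String = toJson(row) // hypothetical Row -> JSON helper
    })
    .addSink(new FlinkKafkaProducer[String]("my-topic", new SimpleStringSchema(), props))

The sink just forwards records, so it should not hold on to much heap.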

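For reference, my mental model of how numPartitions and fetchSize interact
(a rough sketch of the idea, not Flink's actual code; the real splitting
lives in JdbcNumericBetweenParametersProvider, and fetchSize is only passed
to the driver via Statement.setFetchSize, so whether it actually limits
buffering depends on the driver):

// Sketch: split [lower, upper] of the partition column into contiguous
// slices, one per parallel reader; each reader runs the scan query with
// its own BETWEEN bounds.
def partitionRanges(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val count = upper - lower + 1
    val baseSize = count / numPartitions
    val remainder = count % numPartitions
    var start = lower
    (0 until numPartitions).map { i =>
        val size = baseSize + (if (i < remainder) 1 else 0)
        val range = (start, start + size - 1)
        start += size
        range
    }
}

// Example: partitionRanges(1L, 187340000L, 50) yields 50 slices of roughly
// 3.75 million ids each; fetchSize = 20 then only caps how many of those
// rows the driver buffers per network round trip.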

On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> This is not the desired behavior. As you have set fetchSize to 20, there
> should only be about 20 records buffered at a time in each parallelism of
> the source. How large is your heap size? Does your job have any other
> operations that consume a lot of heap memory?
>
> Qihua Yang <yang...@gmail.com> wrote on Wed, Jan 19, 2022 at 15:27:
>
>> Here are the errors:
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "server-timer"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>
>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink cluster (50 hosts; each host runs a task manager).
>>> I am using the Flink JDBC connector to consume data from a database. The
>>> DB table is pretty large, around 187,340,000 rows. I configured the JDBC
>>> number of partitions to 50 and fetchSize to 20. The Flink application
>>> has 50 task managers. Does anyone know why I got an OutOfMemoryError?
>>> How should I configure it?
>>>
>>> Thanks,
>>> Qihua
>>>
>>>
