Hi Caizhi,

Thank you for your reply. The heap size is 512m. Fetching from the DB table is the only costly operation; after fetching from the DB, I simply write the records to a Kafka topic, so that should not be the bottleneck. Here is the JDBC configuration. Is this the correct config?
val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()

val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(PARTITION_KEY)
    .setPartitionLowerBound(dbLowerBound)
    .setPartitionUpperBound(dbUpperBound)
    .setNumPartitions(50)
    .setFetchSize(20)
    .build()

val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(1000)
    .setMaxRetryTimes(2)
    .build()

val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build()
    .getDataStream(env)

On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> This is not the desired behavior. As you have set fetchSize to 20, there
> will be only 20 records in each parallelism of the source. How large is
> your heap size? Does your job have any other operations that consume a
> lot of heap memory?
>
> Qihua Yang <yang...@gmail.com> wrote on Wed, Jan 19, 2022 at 15:27:
>
>> Here are the errors:
>>
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "server-timer"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>
>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink cluster (50 hosts; each host runs a task manager).
>>> I am using the Flink JDBC connector to consume data from a database.
>>> The db table is pretty large, around 187,340,000 rows. I configured
>>> the JDBC number of partitions to 50 and the fetchSize to 20. The Flink
>>> application has 50 task managers. Does anyone know why I got
>>> OutOfMemoryError? How should I configure it?
>>>
>>> Thanks,
>>> Qihua
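For what it's worth, here is a minimal sketch of the same partitioned read at the DataStream level with JdbcInputFormat, which makes the split and fetch behavior explicit. It assumes the same url, userName, password, DRIVER_NAME, tableName, PARTITION_KEY, dbLowerBound, and dbUpperBound values as above; rowTypeInfo is a placeholder you would replace with the real table schema. The setAutoCommit(false) line is worth checking against your driver: some drivers (PostgreSQL, for example) ignore fetchSize unless autocommit is disabled, which is one common way a job with a small configured fetch size can still pull an entire result set into heap.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.connector.jdbc.JdbcInputFormat
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider

// Placeholder schema: adjust the field types to match the real table.
val rowTypeInfo = new RowTypeInfo(
  BasicTypeInfo.LONG_TYPE_INFO,   // e.g. the numeric partition key
  BasicTypeInfo.STRING_TYPE_INFO  // e.g. a payload column
)

// Split [dbLowerBound, dbUpperBound] into 50 ranges, one per source subtask.
val paramsProvider = new JdbcNumericBetweenParametersProvider(dbLowerBound, dbUpperBound)
  .ofBatchNum(50)

val inputFormat = JdbcInputFormat.buildJdbcInputFormat()
  .setDrivername(DRIVER_NAME)
  .setDBUrl(url)
  .setUsername(userName)
  .setPassword(password)
  // Each subtask binds its own range into the two placeholders.
  .setQuery(s"SELECT * FROM $tableName WHERE $PARTITION_KEY BETWEEN ? AND ?")
  .setParametersProvider(paramsProvider)
  .setFetchSize(20)      // rows buffered per DB round trip
  .setAutoCommit(false)  // assumption: some drivers only honor fetchSize with autocommit off
  .setRowTypeInfo(rowTypeInfo)
  .finish()

val rawSource = env.createInput(inputFormat) // DataStream[Row]

With a setup like this, each source subtask should hold at most fetchSize rows from JDBC at a time, so if the 512m heap still blows up, the memory pressure is more likely coming from the Kafka producer buffers or another operator than from the source itself.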