Hi Qihua,

The JDBC connector supports the Postgres dialect, but it is implemented as a bounded source: it reads only the snapshot data (the records that already exist) and then finishes. Records added to the transaction log afterwards (the Postgres WAL, comparable to the MySQL binlog) are not captured. That is why the source switches from RUNNING to FINISHED almost immediately. If your program works correctly, you should still receive all the snapshot data.
BTW, if you want to capture both the snapshot and the transaction log events, you can try the `postgres-cdc` connector [1]. It offers both a SQL API and a DataStream API; see the documentation [2] for a quick start.

Best,
Leonard

[1] https://github.com/ververica/flink-cdc-connectors
[2] https://ververica.github.io/flink-cdc-connectors/release-2.0/content/connectors/postgres-cdc.html

> On Oct 28, 2021, at 13:24, Qihua Yang <yang...@gmail.com> wrote:
>
> Hi,
>
> I am trying to use a Postgres DB as the stream data source and push to a Kafka
> topic. Here is how I configured the database source. It looks like it didn't read
> out any data, but I didn't see any error in the Flink log.
> As a test, I tried inserting wrong data into the database, and Flink threw the
> error below, so it looks like Flink does try to read data from the database.
> java.lang.ClassCastException: class java.lang.Long cannot be cast to class
> java.lang.Integer (java.lang.Long and java.lang.Integer are in module
> java.base of loader 'bootstrap')
>
> The job manager shows the job switched from DEPLOYING to RUNNING, and then
> switched from RUNNING to FINISHED immediately.
> Can anyone help me understand why?
> Did I configure anything wrong, or did I miss anything?
>
> 2021-10-28 02:49:52.777 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: test-sink (2/2) (7aa24e97a11bbd831941d636910fe84f) switched from DEPLOYING to RUNNING.
>
> 2021-10-28 02:49:53.245 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: JdbcTableSource(id, name, address) -> Map -> Map -> Filter (1/2) (558afdc9dd911684833d0e51943eef92) switched from RUNNING to FINISHED.
>
> val options = JdbcOptions.builder()
>   // .setDBUrl("jdbc:derby:memory:mydb")
>   .setDBUrl("")
>   .setTableName("test_store")
>   .setDriverName("org.postgresql.Driver")
>   .setUsername("dbUser")
>   .setPassword("123")
>   .build()
> val readOptions = JdbcReadOptions.builder()
>   .setPartitionColumnName("id")
>   .setPartitionLowerBound(-1)
>   .setPartitionUpperBound(DB_SIZE)
>   .setNumPartitions(PARTITIONS)
>   //.setFetchSize(0)
>   .build()
> val lookupOptions = JdbcLookupOptions.builder()
>   .setCacheMaxSize(-1)
>   .setCacheExpireMs(CACHE_SIZE)
>   .setMaxRetryTimes(2)
>   .build()
> val dataSource = JdbcTableSource.builder()
>   .setOptions(options)
>   .setReadOptions(readOptions)
>   .setLookupOptions(lookupOptions)
>   .setSchema(storeSchema)
>   .build().getDataStream(env)
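
P.S. For the `postgres-cdc` route mentioned in the reply, here is a minimal sketch of what the SQL API could look like from Scala. It is only an illustration under assumptions: the host, port, database name, and schema name are placeholders, the column types are guessed from the `JdbcTableSource(id, name, address)` line in the quoted log, and the `flink-connector-postgres-cdc` and Flink Table API Scala bridge dependencies are assumed to be on the classpath. The Postgres server also needs logical decoding enabled (`wal_level = logical`).

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object PostgresCdcSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Placeholder connection values and assumed column types;
    // replace them with the real ones for your database.
    tableEnv.executeSql(
      """
        |CREATE TABLE test_store (
        |  id INT,
        |  name STRING,
        |  address STRING
        |) WITH (
        |  'connector' = 'postgres-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '5432',
        |  'username' = 'dbUser',
        |  'password' = '123',
        |  'database-name' = 'postgres',
        |  'schema-name' = 'public',
        |  'table-name' = 'test_store'
        |)
        |""".stripMargin)

    // Unbounded query: emits the existing rows (snapshot) first,
    // then keeps emitting change events, so the job stays RUNNING.
    tableEnv.executeSql("SELECT id, name, address FROM test_store").print()
  }
}
```

Unlike the JDBC source, this job should not switch to FINISHED after the snapshot. Depending on how the Postgres server is set up, you may also need to set the connector's `decoding.plugin.name` option; the documentation [2] lists the supported values.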