Hi, I am trying to use a Postgres database as a streaming data source and push the rows to a Kafka topic. My source configuration is at the end of this message. It looks like the job does not read any data, but I don't see any errors in the Flink log. As a test, I inserted bad data into the database, and Flink threw the error below, so it looks like Flink does try to read from the database.

*java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.lang.Integer (java.lang.Long and java.lang.Integer are in module java.base of loader 'bootstrap')*
The JobManager log shows the tasks switching from DEPLOYING to RUNNING, and then from RUNNING to FINISHED almost immediately. Can anyone help me understand why? Did I configure something wrong, or am I missing something?

*2021-10-28 02:49:52.777 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: test-sink (2/2) (7aa24e97a11bbd831941d636910fe84f) switched from DEPLOYING to RUNNING.*
*2021-10-28 02:49:53.245 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: JdbcTableSource(id, name, address) -> Map -> Map -> Filter (1/2) (558afdc9dd911684833d0e51943eef92) switched from RUNNING to FINISHED.*

Here is how I configure the database source:

    val options = JdbcOptions.builder()
      // .setDBUrl("jdbc:derby:memory:mydb")
      .setDBUrl("")
      .setTableName("test_store")
      .setDriverName("org.postgresql.Driver")
      .setUsername("dbUser")
      .setPassword("123")
      .build()

    val readOptions = JdbcReadOptions.builder()
      .setPartitionColumnName("id")
      .setPartitionLowerBound(-1)
      .setPartitionUpperBound(DB_SIZE)
      .setNumPartitions(PARTITIONS)
      //.setFetchSize(0)
      .build()

    val lookupOptions = JdbcLookupOptions.builder()
      .setCacheMaxSize(-1)
      .setCacheExpireMs(CACHE_SIZE)
      .setMaxRetryTimes(2)
      .build()

    val dataSource = JdbcTableSource.builder()
      .setOptions(options)
      .setReadOptions(readOptions)
      .setLookupOptions(lookupOptions)
      .setSchema(storeSchema)
      .build()
      .getDataStream(env)
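
For reference, the ClassCastException quoted above can be reproduced outside Flink. This is only a minimal sketch, assuming the value read from the column arrives as a java.lang.Long (for example from a BIGINT column) while the consuming code expects a java.lang.Integer. The names `CastDemo` and `tryCastToInteger` are hypothetical and not part of Flink or of my job.

```scala
// Hypothetical helper, not part of Flink: shows that a boxed Long
// cannot be cast to a boxed Integer, even when the value would fit.
object CastDemo {
  def tryCastToInteger(value: Any): Either[String, java.lang.Integer] =
    try Right(value.asInstanceOf[java.lang.Integer])
    catch { case e: ClassCastException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // Assumption: this stands in for a value returned by the JDBC driver.
    val fromDb: Any = java.lang.Long.valueOf(42L)
    println(tryCastToInteger(fromDb))                        // Left(...) with the cast error message
    println(tryCastToInteger(java.lang.Integer.valueOf(42))) // Right(42)
  }
}
```

If that assumption holds, the declared field type in the schema and the column type in the table would need to agree (for example both as a long/BIGINT), but I have not confirmed that this is what happens inside the JDBC source.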