Hi,

I am trying to use a Postgres DB as a streaming data source and push the rows
to a Kafka topic. Below is how I configure the database source. It looks like
no data is read out, but I don't see any error in the Flink log.
As a test, I inserted bad data into the database and saw Flink throw the
error below, so Flink does seem to be reading from the database:
java.lang.ClassCastException: class java.lang.Long cannot be cast to class
java.lang.Integer (java.lang.Long and java.lang.Integer are in module
java.base of loader 'bootstrap')
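
The cast error makes me wonder whether one of the column types doesn't match
the declared Flink schema, e.g. the `id` column being BIGINT in Postgres while
storeSchema declares it as INT. A minimal sketch of a schema that would match
a BIGINT column (field names taken from the source name in the log below; the
types are my assumption, not my exact code):

import org.apache.flink.table.api.{DataTypes, TableSchema}

// Sketch only: if the Postgres `id` column is BIGINT, declaring it as
// DataTypes.INT() would make the JDBC source cast java.lang.Long to
// java.lang.Integer at runtime -- the exact exception quoted above.
val storeSchema = TableSchema.builder()
    .field("id", DataTypes.BIGINT())    // match the actual column type
    .field("name", DataTypes.STRING())
    .field("address", DataTypes.STRING())
    .build()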

In the job manager log I see the tasks switch from DEPLOYING to RUNNING, and
then the source switches from RUNNING to FINISHED almost immediately (excerpt
below).
Can anyone help me understand why?
Did I configure anything wrong, or am I missing something?



2021-10-28 02:49:52.777 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Sink: test-sink (2/2) (7aa24e97a11bbd831941d636910fe84f) switched from DEPLOYING to RUNNING.
2021-10-28 02:49:53.245 [flink-akka.actor.default-dispatcher-15] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: JdbcTableSource(id, name, address) -> Map -> Map -> Filter (1/2) (558afdc9dd911684833d0e51943eef92) switched from RUNNING to FINISHED.

// JDBC connection options; the real Postgres URL is omitted from this post
val options = JdbcOptions.builder()
    // .setDBUrl("jdbc:derby:memory:mydb")
    .setDBUrl("")  // Postgres JDBC URL left out here
    .setTableName("test_store")
    .setDriverName("org.postgresql.Driver")
    .setUsername("dbUser")
    .setPassword("123")
    .build()

// scan options: parallel read partitioned on the `id` column
val readOptions = JdbcReadOptions.builder()
    .setPartitionColumnName("id")
    .setPartitionLowerBound(-1)
    .setPartitionUpperBound(DB_SIZE)
    .setNumPartitions(PARTITIONS)
    //.setFetchSize(0)
    .build()

// lookup options (only used for lookup joins, not for the scan)
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(CACHE_SIZE)  // note: CACHE_SIZE passed as expiry in ms
    .setMaxRetryTimes(2)
    .build()

// build the table source and expose it as a DataStream
val dataSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(storeSchema)
    .build().getDataStream(env)
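
For completeness, the Kafka side is wired roughly like this (a sketch; the
topic name, bootstrap servers, and the Row-to-String serialization are
placeholders, not my exact code):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// Sketch: serialize each Row to its string form and push it to Kafka.
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")  // placeholder

dataSource
    .map(row => row.toString)                 // Row -> String
    .addSink(new FlinkKafkaProducer[String](
        "test-topic",                         // placeholder topic
        new SimpleStringSchema(),
        props))

env.execute("postgres-to-kafka")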
