Hi all,

I'm using io.debezium.connector.postgresql.PostgresConnector and
io.confluent.connect.jdbc.JdbcSinkConnector to sync data between two
PostgreSQL databases, and I set time.precision.mode=adaptive in the Debezium
config
<https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types>,
which serializes PostgreSQL time and timestamp columns as Integer or Long
values that the JdbcSinkConnector can't handle. So I wrote an SMT to
transform these fields from numeric types to strings.
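
The full SMT is linked at the bottom of this mail. As a rough, self-contained
sketch of the idea (simplified here to rewriting io.debezium.time.MicroTimestamp
int64 fields in the record value as ISO-8601 strings; the real code linked
below is more general), the transform does something like this:

import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class MicroTimestampToStringSketch<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String MICRO_TS = "io.debezium.time.MicroTimestamp";

    @Override
    public R apply(R record) {
        // Pass through tombstones and schemaless records untouched.
        if (record.valueSchema() == null || !(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();

        // Rebuild the value schema, turning MicroTimestamp (int64) fields into optional strings.
        SchemaBuilder builder = SchemaBuilder.struct().name(record.valueSchema().name());
        for (Field field : record.valueSchema().fields()) {
            if (MICRO_TS.equals(field.schema().name())) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        Schema newSchema = builder.build();

        // Copy the payload, formatting microseconds-since-epoch values as ISO-8601 strings.
        Struct newValue = new Struct(newSchema);
        for (Field field : record.valueSchema().fields()) {
            Object raw = value.get(field);
            if (MICRO_TS.equals(field.schema().name()) && raw != null) {
                long micros = (Long) raw;
                Instant ts = Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
                newValue.put(field.name(), DateTimeFormatter.ISO_INSTANT.format(ts));
            } else {
                newValue.put(field.name(), raw);
            }
        }

        // Only the value schema/payload is replaced in this sketch; key and metadata are copied as-is.
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}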

Say I have the following table:

CREATE TABLE pk_created_at (
    created_at timestamp without time zone DEFAULT current_timestamp not null,
    PRIMARY KEY (created_at)
);
insert into pk_created_at values(current_timestamp);

source connector configuration:

{
    "name": "test-connector",
    "config": {
        "snapshot.mode": "always",
        "plugin.name": "pgoutput",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "source",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "test",
        "database.server.name": "test",
        "slot.name" : "test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enabled": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enabled": true,
        "decimal.handling.mode": "string",
        "time.precision.mode": "adaptive",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}
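
The config above only shows the unwrap transform; when my SMT is applied it is
chained after unwrap, roughly like this (the class name is taken from the
repository linked at the bottom of this mail, and any SMT-specific options are
omitted):

        "transforms": "unwrap,timestampConverter",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.timestampConverter.type": "com.github.haofuxin.kafka.connect.DebeziumTimestampConverter"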

The messages in the Kafka topic test.public.pk_created_at then look like this:

# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int64",
            "optional":false,
            "name":"io.debezium.time.MicroTimestamp",
            "version":1,
            "field":"created_at"
         }
      ],
      "optional":false,
      "name":"test.public.pk_created_at.Value"
   },
   "payload":{
      "created_at":1669354751764130
   }
}
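
(For reference, io.debezium.time.MicroTimestamp is the number of microseconds
since the epoch, so 1669354751764130 µs = 1669354751.764130 s, i.e.
2022-11-25T05:39:11.764130 UTC, which matches the string the SMT produces
below.)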

After applying my SMT, the messages look like this:

# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"string",
            "optional":true,
            "field":"created_at"
         }
      ],
      "optional":false,
      "name":"test.public.pk_created_at.Value"
   },
   "payload":{
      "created_at":"2022-11-25T05:39:11.764130Z"
   }
}

This works great if created_at is not a primary key: no error occurs. But I
also have a table whose primary key is composed of id and created_at, i.e.
PRIMARY KEY (id, created_at). Then the JdbcSinkConnector raises an exception
like the one below:

2022-11-25 06:57:01,450 INFO   ||  Attempting to open connection #1 to
PostgreSql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2022-11-25 06:57:01,459 INFO   ||  Maximum table name length for
database is 63 bytes
[io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
2022-11-25 06:57:01,459 INFO   ||  JdbcDbWriter Connected
[io.confluent.connect.jdbc.sink.JdbcDbWriter]
2022-11-25 06:57:01,472 INFO   ||  Checking PostgreSql dialect for
existence of TABLE "pk_created_at"
[io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,484 INFO   ||  Using PostgreSql dialect TABLE
"pk_created_at" present
[io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,505 INFO   ||  Checking PostgreSql dialect for
type of TABLE "pk_created_at"
[io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,508 INFO   ||  Setting metadata for table
"pk_created_at" to Table{name='"pk_created_at"', type=TABLE
columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false,
sqlType=timestamp}]}
[io.confluent.connect.jdbc.util.TableDefinitions]
2022-11-25 06:57:01,510 WARN   ||  Write of 2 records failed,
remainingRetries=0   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO
"pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT
("created_at") DO NOTHING was aborted: ERROR: column "created_at" is
of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52  Call getNextException to see other errors in the batch.
    at 
org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
    at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910)
    at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
    at 
io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
    at 
io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: ERROR: column
"created_at" is of type timestamp without time zone but expression is
of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52
    at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
    at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
    at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
    at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315)
    at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868)
    ... 17 more

It looks like the sink connector is still trying to insert created_at as the
numeric value 1669359291990398, but I verified that the messages in the Kafka
topic have been transformed into strings. Again, it works if created_at is
not a primary key.
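
As a side note, the console-consumer output above only shows record values by
default; the record keys can be printed as well with the standard print.key
option:

# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning --property print.key=true --property key.separator=" | "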

I just don't know why the SMT does not take effect for primary key columns.
How can I fix this? Could someone help? Much appreciated.

my SMT:
https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java

my sink configuration:

{
    "name": "test-sinker",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics.regex": "test.public.pk_created_at",
        "table.name.format": "${topic}",
        "connection.url":
"jdbc:postgresql://target:5432/test?stringtype=unspecified&user=postgres&password=postgres",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enabled": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enabled": true,
        "transforms": "dropPrefix",
        "transforms.dropPrefix.type":
"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.dropPrefix.replacement": "$3",
        "auto.create": "false",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "delete.enabled": true
    }
}
