Hi Joost,

I'm looping in Leonard and Jark, who might be able to help out here.
Best regards,

Martijn

On Mon, 2 May 2022 at 16:01, Joost Molenaar <j.j.molen...@gmail.com> wrote:
> Hello all,
>
> I'm trying to use Flink SQL to monitor a Kafka topic that's populated by
> Debezium, which is in turn monitoring an MS-SQL CDC table. For some reason,
> Flink SQL shows a new row when I update the boolean field, but updates the
> row in place when I update the text field, and I don't understand why this
> happens. My ultimate goal is to use Flink SQL to join records that come from
> both sides of a 1:N relation in the foreign database, so that I can expose a
> more ready-to-consume JSON object to downstream consumers.
>
> The source table is defined like this in MS-SQL:
>
>     CREATE TABLE todo_list (
>         id int IDENTITY NOT NULL,
>         done bit NOT NULL DEFAULT 0,
>         name varchar(MAX) NOT NULL,
>         CONSTRAINT PK_todo_list PRIMARY KEY (id)
>     );
>
> This is the configuration I'm sending to Debezium; note that I'm not
> including the JSON schema in either keys or values:
>
>     {
>         "name": "todo-connector",
>         "config": {
>             "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
>             "tasks.max": "1",
>             "database.server.name": "mssql",
>             "database.hostname": "10.88.10.1",
>             "database.port": "1433",
>             "database.user": "sa",
>             "database.password": "...",
>             "database.dbname": "todo",
>             "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
>             "database.history.kafka.topic": "schema-changes.todo",
>             "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>             "key.converter.schemas.enable": false,
>             "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>             "value.converter.schemas.enable": false
>         }
>     }
>
> So Debezium is publishing events to Kafka with keys like this:
>
>     {"id":3}
>
> And values like this (whitespace added for readability); this one is an
> update of the 'name' field:
>
>     {
>         "before": {
>             "id": 3,
>             "done": false,
>             "name": "test"
>         },
>         "after": {
>             "id": 3,
>             "done": false,
>             "name": "test2"
>         },
>         "source": {
>             "version": "1.9.0.Final",
>             "connector": "sqlserver",
>             "name": "mssql",
>             "ts_ms": 1651497653043,
>             "snapshot": "false",
>             "db": "todo",
>             "sequence": null,
>             "schema": "dbo",
>             "table": "todo_list",
>             "change_lsn": "00000025:00000d58:0002",
>             "commit_lsn": "00000025:00000d58:0003",
>             "event_serial_no": 2
>         },
>         "op": "u",
>         "ts_ms": 1651497654127,
>         "transaction": null
>     }
>
> (I verified this using a Python script that follows the relevant Kafka topic.)
>
> Next, I'm trying to follow this CDC stream in Flink by adding the Kafka
> connector for Flink SQL, defining a source table and starting a job in the
> Flink SQL CLI:
>
>     ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';
>
>     CREATE TABLE todo_list (
>         k_id BIGINT,
>         done BOOLEAN,
>         name STRING
>     )
>     WITH (
>         'connector'='kafka',
>         'topic'='mssql.dbo.todo_list',
>         'properties.bootstrap.servers'='10.88.10.10:9092',
>         'properties.group.id'='flinksql-todo-list',
>         'scan.startup.mode'='earliest-offset',
>         'key.format'='json',
>         'key.fields-prefix'='k_',
>         'key.fields'='k_id',
>         'value.format'='debezium-json',
>         'value.debezium-json.schema-include'='false',
>         'value.fields-include'='EXCEPT_KEY'
>     );
>
>     SELECT * FROM todo_list;
>
> Now, when I run a query like this in the MS-SQL database:
>
>     UPDATE todo_list SET name='test2' WHERE id=3;
>
> I see that the Flink SQL client updates the row with id=3 to the new value
> "test2" for the 'name' field, as I was expecting.
> However, when I update the 'done' field to have a different value, Flink SQL
> seems to leave the old row with values (3, False, 'test2') intact, and shows
> a new row with values (3, True, 'test2').
>
> I tried to append a `PRIMARY KEY (k_id) NOT ENFORCED` line between the first
> parentheses in the CREATE TABLE statement, but this seems to make no
> difference, except when running `DESCRIBE todo_list` in Flink SQL.
>
> I have no idea why the boolean field would cause different behavior than the
> text field. Am I missing some piece of configuration, or are my expectations
> wrong?
>
>
> Regards,
> Joost Molenaar
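
For reference, the kind of consumer script Joost mentions ("a Python script
that follows the relevant Kafka topic") could look roughly like the sketch
below. The kafka-python package is an assumption on my part; the broker
address and topic name are the ones from the configuration quoted above.

    import json

    from kafka import KafkaConsumer  # assumes the kafka-python package

    # Follow the Debezium topic from the beginning and print each change event.
    consumer = KafkaConsumer(
        "mssql.dbo.todo_list",
        bootstrap_servers="10.88.10.10:9092",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )

    for record in consumer:
        # Keys and values arrive as raw JSON bytes; values are None for tombstones.
        key = json.loads(record.key) if record.key else None
        value = json.loads(record.value) if record.value else None
        print(key, json.dumps(value, indent=2))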