Hi Joost,

I'm looping in Leonard and Jark, who might be able to help out here.

Best regards,

Martijn

On Mon, 2 May 2022 at 16:01, Joost Molenaar <j.j.molen...@gmail.com> wrote:

> Hello all,
>
> I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
> Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
> Flink-SQL shows a new row when I update the boolean field, but updates the
> row in place when I update the text field, and I'm not understanding why
> this happens. My ultimate goal is to use Flink-SQL to join records that
> come from both sides of a 1:N relation in the foreign database, to expose
> a more ready-to-consume JSON object to downstream consumers.
>
> The source table is defined like this in MS-SQL:
>
>     CREATE TABLE todo_list (
>         id int IDENTITY NOT NULL,
>         done bit NOT NULL DEFAULT 0,
>         name varchar(MAX) NOT NULL,
>         CONSTRAINT PK_todo_list PRIMARY KEY (id)
>     );
>
> This is the configuration I'm sending to Debezium. Note that I'm not
> including the JSON schema in either keys or values:
>
>     {
>         "name": "todo-connector",
>         "config": {
>             "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
>             "tasks.max": "1",
>             "database.server.name": "mssql",
>             "database.hostname": "10.88.10.1",
>             "database.port": "1433",
>             "database.user": "sa",
>             "database.password": "...",
>             "database.dbname": "todo",
>             "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
>             "database.history.kafka.topic": "schema-changes.todo",
>             "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>             "key.converter.schemas.enable": false,
>             "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>             "value.converter.schemas.enable": false
>         }
>     }
>
> So Debezium is publishing events to Kafka with keys like this:
>
>     {"id":3}
>
> And values like this (whitespace added for readability). This event is for
> an update of the 'name' field:
>
>     {
>       "before": {
>         "id": 3,
>         "done": false,
>         "name": "test"
>       },
>       "after": {
>         "id": 3,
>         "done": false,
>         "name": "test2"
>       },
>       "source": {
>         "version": "1.9.0.Final",
>         "connector": "sqlserver",
>         "name": "mssql",
>         "ts_ms": 1651497653043,
>         "snapshot": "false",
>         "db": "todo",
>         "sequence": null,
>         "schema": "dbo",
>         "table": "todo_list",
>         "change_lsn": "00000025:00000d58:0002",
>         "commit_lsn": "00000025:00000d58:0003",
>         "event_serial_no": 2
>       },
>       "op": "u",
>       "ts_ms": 1651497654127,
>       "transaction": null
>     }
>
> (I verified this using a Python script that follows the relevant Kafka
> topic.)
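
A minimal sketch (not the script mentioned above) of how such an event can be
inspected offline, without a Kafka client: it parses one schema-less Debezium
envelope and reports which fields differ between "before" and "after". The
helper name `changed_fields` is hypothetical, and the sample is a trimmed copy
of the update event quoted above.

```python
import json


def changed_fields(event_json):
    """Return {field: (old, new)} for fields that differ between
    the 'before' and 'after' images of one Debezium event.

    Assumes the schema-less envelope shown in this thread
    (schemas.enable=false), i.e. 'before'/'after' at the top level.
    """
    event = json.loads(event_json)
    # 'before' is null for inserts and 'after' is null for deletes.
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {
        name: (before.get(name), after.get(name))
        for name in sorted(set(before) | set(after))
        if before.get(name) != after.get(name)
    }


# Trimmed copy of the update event from this thread.
sample = """
{"before": {"id": 3, "done": false, "name": "test"},
 "after":  {"id": 3, "done": false, "name": "test2"},
 "op": "u"}
"""

print(changed_fields(sample))  # {'name': ('test', 'test2')}
```

Running the same helper on the event produced by the 'done' update would show
whether Debezium emits a comparable before/after pair in both cases.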
>
> Next, I'm trying to follow this CDC stream in Flink by adding the Kafka
> connector for Flink SQL, defining a source table, and starting a job in the
> Flink-SQL CLI:
>
>     ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';
>
>     CREATE TABLE todo_list (
>         k_id BIGINT,
>         done BOOLEAN,
>         name STRING
>     )
>     WITH (
>         'connector'='kafka',
>         'topic'='mssql.dbo.todo_list',
>         'properties.bootstrap.servers'='10.88.10.10:9092',
>         'properties.group.id'='flinksql-todo-list',
>         'scan.startup.mode'='earliest-offset',
>         'key.format'='json',
>         'key.fields-prefix'='k_',
>         'key.fields'='k_id',
>         'value.format'='debezium-json',
>         'value.debezium-json.schema-include'='false',
>         'value.fields-include'='EXCEPT_KEY'
>     );
>
>     SELECT * FROM todo_list;
>
> Now, when I perform a query like this in the MS-SQL database:
>
>     UPDATE todo_list SET name='test2' WHERE id=3;
>
> I see that the Flink-SQL client updates the row with id=3 in place to have
> the new value "test2" for the 'name' field, as I was expecting. However, when
> I update the 'done' field to a different value, Flink-SQL seems to leave the
> old row with values (3, False, 'test2') intact, and shows a new row with
> values (3, True, 'test2').
>
> I tried adding a `PRIMARY KEY (k_id) NOT ENFORCED` line inside the first
> pair of parentheses in the CREATE TABLE statement, but this seems to make no
> difference, except in the output of `DESCRIBE todo_list` in Flink-SQL.
>
> I have no idea why the boolean field would cause different behavior than the
> text field. Am I missing some piece of configuration, or are my expectations
> wrong?
>
>
> Regards,
> Joost Molenaar
>
