Hi all, I have a data stream similar to the one below going into a PostgreSQL table.
-- Environment: Flink 1.20.1, PostgreSQL 12, flink-sql-connector-postgres-cdc-3.1.1.jar
--
-- Sample message:
/*
{
  "_id": "96d35eb2-dc7f-40df-8128-30c58b250692",
  "dob": "19/03/28",
  "name": "Shaun",
  "gender": "Male",
  "address": {
    "town": "Galway City",
    "county": "Galway",
    "country": "Ireland",
    "province": "Connacht",
    "street_1": "99 Fresh Street Street",
    "street_2": "",
    "parcel_id": "H91 Y9P7-22470",
    "postal_code": "H91 Y9P7",
    "country_code": "IE",
    "neighbourhood": "Salthill"
  },
  "surname": "Doudigan",
  "uniqueId": "0002003P",
  "family_id": "4e6f3e02-91ac-42f9-a518-408e780a7c7b",
  "father_idNumber": "7934317B",
  "mother_idNumber": "0181947G"
}
*/
--
-- The message above is pushed into the table below:
--   * the entire message goes into the `data` column;
--   * `uniqueId` is extracted into the `uniqueId` column;
--   * `_id` is extracted into the `_id` column;
--   * `created_at` is auto-populated with the current time.
-- Inspecting the data in pgAdmin shows everything as expected: all columns have values.
--
-- NOTE(review): the DDL below has no `_id` column; `id` is a SERIAL that
-- PostgreSQL fills itself. Confirm which column `_id` actually lands in.
-- NOTE(review): this DDL is for `adults`, while the Flink source further down
-- reads `public.children`. It is assumed `children` has the same definition; confirm.
--
-- PostgreSQL folds unquoted identifiers to lower case, so the column written
-- here as uniqueId is physically named `uniqueid`. Any consumer that matches
-- column names exactly (such as the CDC source below) must use `uniqueid`.
CREATE TABLE adults (
    id          SERIAL       NOT NULL,
    uniqueId    varchar(14)  NOT NULL,
    data        JSONB,
    created_at  timestamptz  DEFAULT NOW() NOT NULL,
    PRIMARY KEY (uniqueId)
) TABLESPACE pg_default;

-- Flink table that receives the rows through the CDC process.
--
-- Column names must match the PostgreSQL column names exactly. Declaring the
-- key as `uniqueId` matched nothing in the source row, which is why it came
-- back NULL while `id`, `data` and `created_at` were populated. The connector
-- appears to emit NULL for an unmatched column rather than fail; confirm
-- against the connector version in use.
-- NOTE(review): any downstream Flink query that referenced `uniqueId` must now
-- use `uniqueid` (Flink SQL identifiers are case-sensitive).
CREATE TABLE postgres_catalog.datasource.children (
    `id`         BIGINT,
    `uniqueid`   VARCHAR(14),
    `data`       STRING,
    `created_at` TIMESTAMP_LTZ(3),
    -- The comma after the WATERMARK clause was missing; without it the DDL does not parse.
    WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND,
    PRIMARY KEY (`uniqueid`) NOT ENFORCED
) WITH (
     'connector' = 'postgres-cdc'
    ,'hostname' = 'postgrescdc'
    -- Port of the database inside the container, not the externally exported
    -- Docker port from a port mapping.
    ,'port' = '5432'
    ,'username' = 'dbadmin'
    ,'password' = 'dbpassword'
    ,'database-name' = 'demog'
    ,'schema-name' = 'public'
    ,'table-name' = 'children'
    ,'slot.name' = 'children'
    -- Experimental feature: incremental snapshot (default off).
    ,'scan.incremental.snapshot.enabled' = 'true'
    -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
    ,'scan.startup.mode' = 'initial'
    ,'decoding.plugin.name' = 'pgoutput'
);

-- When I query this I get data in id, data and created_at, but uniqueId is NULL.
--
-- Once the data is in the children table above, I want to unpack the `data`
-- string column into a table that includes a nested structure.
--
-- For reference: once the above works, I need to replicate it for the
-- message payload below.
/*
{
  "_id": "47868ae5-7266-4c08-baad-5cc9f09a29bf",
  "dob": "95/09/03",
  "name": "Cahir",
  "gender": "M",
  "status": "Living",
  "account": [
    {
      "fspiId": "AIBKGB2",
      "accountId": "88126271",
      "memberName": "Allied Irish Banks plc",
      "accountType": "Savings/Deposit",
      "fspiAgentId": "AIBKGB2",
      "fspiAgentAccountId": "AIBKGB2-88126271"
    },
    {
      "expDate": "09/25",
      "cardHolder": "C Skehan",
      "cardNumber": "4969843748181157",
      "cardNetwork": "Visa",
      "issuingBank": "Bank of Ireland plc"
    },
    {
      "expDate": "10/25",
      "cardHolder": "C Skehan",
      "cardNumber": "4936893387414336",
      "cardNetwork": "Visa",
      "issuingBank": "Citibank Europe plc"
    }
  ],
  "address": {
    "town": "South Dublin",
    "county": "Dublin",
    "country": "Ireland",
    "province": "Leinster",
    "street_1": "77 Mageean Street Street",
    "street_2": "",
    "parcel_id": "D24-22536",
    "postal_code": "D24",
    "country_code": "IE",
    "neighbourhood": "Tallaght"
  },
  "partner": "2337266M",
  "surname": "Skehan",
  "uniqueId": "7087973R",
  "family_id": "3dd62d47-cf0b-4e77-9d2d-7f58de28a404",
  "marital_status": "Married"
}
*/
--
-- You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity!
Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!