Hi all...

I have a data stream, similar to the sample below, going into a Postgres table.

Flink 1.20.1
PostgreSQL 12
flink-sql-connector-postgres-cdc-3.1.1.jar

{
  "_id": "96d35eb2-dc7f-40df-8128-30c58b250692",
  "dob": "19/03/28",
  "name": "Shaun",
  "gender": "Male",
  "address": {
    "town": "Galway City",
    "county": "Galway",
    "country": "Ireland",
    "province": "Connacht",
    "street_1": "99 Fresh Street Street",
    "street_2": "",
    "parcel_id": "H91 Y9P7-22470",
    "postal_code": "H91 Y9P7",
    "country_code": "IE",
    "neighbourhood": "Salthill"
  },
  "surname": "Doudigan",
  "uniqueId": "0002003P",
  "family_id": "4e6f3e02-91ac-42f9-a518-408e780a7c7b",
  "father_idNumber": "7934317B",
  "mother_idNumber": "0181947G"
}

The message above is pushed into the table below:
- the entire message goes into the data column,
- uniqueId is extracted and pushed into the uniqueId column,
- _id is extracted and pushed into the _id column, and
- created_at is auto-populated with the current time.

If I inspect the data in pgAdmin, everything looks as it should: all
columns have values.

CREATE TABLE adults (
id SERIAL NOT NULL,
uniqueId varchar(14) NOT NULL,
data JSONB,
created_at timestamptz DEFAULT NOW() NOT NULL,
PRIMARY KEY (uniqueId)
) TABLESPACE pg_default;
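One thing worth checking at this point: PostgreSQL folds unquoted identifiers to lower case. The column declared above as uniqueId is therefore stored as uniqueid, while id, data and created_at are already all lower case. The query below lists the column names as PostgreSQL stored them. It assumes the children table that the CDC source reads was created with the same column definitions as adults; that DDL is not shown here.

```sql
-- List the column names exactly as PostgreSQL stored them.
-- Assumption: table 'children' in schema 'public' has the same columns as 'adults' above.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name   = 'children'
ORDER BY ordinal_position;
```

If the result shows uniqueid rather than uniqueId, the stored name no longer matches the `uniqueId` column declared in the Flink table. That mismatch may explain why only that column comes through empty.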

This is the Flink table into which the data is supposed to be pushed
via the CDC process:

CREATE TABLE postgres_catalog.datasource.children (
`id` BIGINT,
`uniqueId` VARCHAR(14),
`data` STRING,
`created_at` TIMESTAMP_LTZ(3),
WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND,
PRIMARY KEY (uniqueId) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'children'
,'slot.name' = 'children'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);

When I query this table, id, data and created_at are populated.

*uniqueId is NULL though.*
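The data column is populated, so uniqueId can also be read out of the JSON with Flink's built-in JSON_VALUE function. This serves as a cross-check and as a possible stop-gap. A sketch against the children table above:

```sql
-- Compare the mapped column with the value extracted from the raw JSON.
-- JSON_VALUE returns STRING by default and NULL when the path is absent.
SELECT
  `id`,
  `uniqueId`,
  JSON_VALUE(`data`, '$.uniqueId') AS `uniqueId_from_data`,
  `created_at`
FROM postgres_catalog.datasource.children;
```

If uniqueId_from_data is filled while uniqueId is NULL, the JSON itself is fine and the problem lies in the column mapping. PostgreSQL stores the unquoted name uniqueId as uniqueid. Declaring the Flink column as `uniqueid` (and using that name in the PRIMARY KEY) is therefore worth trying. That is an untested guess about how the connector matches column names, not a confirmed fix.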

Once the data is in the children table above, I want to unpack the data
string column into a table that has a nested structure.
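A minimal, untested sketch of that unpacking in Flink SQL uses JSON_VALUE for the scalar fields and a ROW for the nested address. Some parts are assumptions rather than part of the original setup:
- The table name children_unpacked is hypothetical.
- The print connector is a placeholder for the real target.
- dob is kept as text because its format is not known.

```sql
-- Hypothetical target table: scalar fields as STRING, address as a nested ROW.
-- 'print' is only a stand-in sink; replace it with the real connector and options.
CREATE TABLE children_unpacked (
  `_id`             STRING,
  `uniqueId`        STRING,
  `name`            STRING,
  `surname`         STRING,
  `dob`             STRING,  -- kept as text; the sample format (19/03/28) is ambiguous
  `gender`          STRING,
  `family_id`       STRING,
  `father_idNumber` STRING,
  `mother_idNumber` STRING,
  `address` ROW<
    street_1      STRING,
    street_2      STRING,
    town          STRING,
    neighbourhood STRING,
    county        STRING,
    postal_code   STRING,
    country_code  STRING
  >,
  `created_at`      TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print'
);

-- INSERT maps columns by position, so the ROW(...) arguments must follow
-- the field order of the address type declared above.
INSERT INTO children_unpacked
SELECT
  JSON_VALUE(`data`, '$._id'),
  JSON_VALUE(`data`, '$.uniqueId'),
  JSON_VALUE(`data`, '$.name'),
  JSON_VALUE(`data`, '$.surname'),
  JSON_VALUE(`data`, '$.dob'),
  JSON_VALUE(`data`, '$.gender'),
  JSON_VALUE(`data`, '$.family_id'),
  JSON_VALUE(`data`, '$.father_idNumber'),
  JSON_VALUE(`data`, '$.mother_idNumber'),
  ROW(
    JSON_VALUE(`data`, '$.address.street_1'),
    JSON_VALUE(`data`, '$.address.street_2'),
    JSON_VALUE(`data`, '$.address.town'),
    JSON_VALUE(`data`, '$.address.neighbourhood'),
    JSON_VALUE(`data`, '$.address.county'),
    JSON_VALUE(`data`, '$.address.postal_code'),
    JSON_VALUE(`data`, '$.address.country_code')
  ),
  `created_at`
FROM postgres_catalog.datasource.children;
```

The source is a CDC changelog, so a real target that has to apply updates and deletes will typically also need a PRIMARY KEY declared on it.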

G

For reference: once the above works, I need to replicate it for
the message payload below.

{
  "_id": "47868ae5-7266-4c08-baad-5cc9f09a29bf",
  "dob": "95/09/03",
  "name": "Cahir",
  "gender": "M",
  "status": "Living",
  "account": [
    {
      "fspiId": "AIBKGB2",
      "accountId": "88126271",
      "memberName": "Allied Irish Banks plc",
      "accountType": "Savings/Deposit",
      "fspiAgentId": "AIBKGB2",
      "fspiAgentAccountId": "AIBKGB2-88126271"
    },
    {
      "expDate": "09/25",
      "cardHolder": "C Skehan",
      "cardNumber": "4969843748181157",
      "cardNetwork": "Visa",
      "issuingBank": "Bank of Ireland plc"
    },
    {
      "expDate": "10/25",
      "cardHolder": "C Skehan",
      "cardNumber": "4936893387414336",
      "cardNetwork": "Visa",
      "issuingBank": "Citibank Europe plc"
    }
  ],
  "address": {
    "town": "South Dublin",
    "county": "Dublin",
    "country": "Ireland",
    "province": "Leinster",
    "street_1": "77 Mageean Street Street",
    "street_2": "",
    "parcel_id": "D24-22536",
    "postal_code": "D24",
    "country_code": "IE",
    "neighbourhood": "Tallaght"
  },
  "partner": "2337266M",
  "surname": "Skehan",
  "uniqueId": "7087973R",
  "family_id": "3dd62d47-cf0b-4e77-9d2d-7f58de28a404",
  "marital_status": "Married"
}
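What is new in this payload is a few extra top-level fields and the account array. Its elements do not share one shape: the first is a bank account and the others are cards.

The cautious sketch below keeps the whole array as JSON text via JSON_QUERY and shows indexed access to single elements. It assumes a CDC table named postgres_catalog.datasource.adults (a hypothetical name), declared like children but reading the adults table. Turning each element into its own row would need an UNNEST over an ARRAY type or a user-defined table function, which is not shown here.

```sql
-- Assumption: postgres_catalog.datasource.adults is a postgres-cdc table
-- declared like children above (id, uniqueId, data STRING, created_at).
SELECT
  JSON_VALUE(`data`, '$.uniqueId')       AS `uniqueId`,
  JSON_VALUE(`data`, '$.status')         AS `status`,
  JSON_VALUE(`data`, '$.partner')        AS `partner`,
  JSON_VALUE(`data`, '$.marital_status') AS `marital_status`,
  -- The whole account array, returned as a JSON string
  JSON_QUERY(`data`, '$.account')        AS `account_json`,
  -- Single elements by position; a field missing from that element yields NULL
  JSON_VALUE(`data`, '$.account[0].accountId')   AS `first_account_id`,
  JSON_VALUE(`data`, '$.account[1].cardNetwork') AS `second_card_network`,
  `created_at`
FROM postgres_catalog.datasource.adults;
```

Positional access is fragile when the order or number of accounts varies, so it is only a way to inspect the data, not a final design.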

