Anyone... G
On Mon, Sep 1, 2025 at 9:00 PM George <george...@gmail.com> wrote:

> Hi all
>
> Hope someone can assist me.
>
> I have the following Postgres table:
>
> CREATE TABLE adults (
>     id SERIAL NOT NULL,
>     nationalid varchar(14) NOT NULL,
>     data JSONB,
>     created_at timestamptz DEFAULT NOW() NOT NULL,
>     PRIMARY KEY (nationalid)
> ) TABLESPACE pg_default;
>
> It contains the following payload.
>
> *See attached.*
>
> Take note of the array of accounts; this is what's causing my problems
> at the moment.
>
> I then use Flink CDC to consume this source table into the following
> Flink table:
>
> CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
>      id BIGINT
>     ,nationalid VARCHAR(14) --NOT NULL
>     ,data STRING
>     ,created_at TIMESTAMP_LTZ(3)
>     ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
>     ,PRIMARY KEY (nationalid) NOT ENFORCED
> ) WITH (
>      'connector' = 'postgres-cdc'
>     ,'hostname' = 'postgrescdc'
>     ,'port' = '5432' -- NOTE: this is the port of the db on the container,
>                      -- not the external docker exposed port via a port mapping.
>     ,'username' = 'dbadmin'
>     ,'password' = 'dbpassword'
>     ,'database-name' = 'demog'
>     ,'schema-name' = 'public'
>     ,'table-name' = 'adults'
>     ,'slot.name' = 'adults0'
>     ,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
>     ,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
>     ,'decoding.plugin.name' = 'pgoutput'
> );
>
> I'd like to unpack this into a structured Flink table, something like the
> one below:
>
> CREATE OR REPLACE TABLE c_paimon.outbound.adults (
>      _id STRING
>     ,name STRING
>     ,surname STRING
>     ,gender STRING
>     ,nationalid STRING
>     ,dob STRING
>     ,marital_status STRING
>     ,status STRING
>     ,account ARRAY<ROW<
>          fspiAgentAccountId STRING
>         ,accountId STRING
>         ,fspiId STRING
>         ,memberName STRING
>         ,accountType STRING
>         ,fspiAgentId STRING
>         ,expDate STRING
>         ,cardHolder STRING
>         ,cardNumber STRING
>         ,cardNetwork STRING
>         ,issuingBank STRING
>     >>
>     ,address ROW<
>          street_1 STRING
>         ,street_2 STRING
>         ,neighbourhood STRING
>         ,town STRING
>         ,county STRING
>         ,province STRING
>         ,country STRING
>         ,country_code STRING
>         ,postal_code STRING
>         ,parcel_id STRING
>     >
>     ,created_at TIMESTAMP_LTZ(3)
>     ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
>     ,PRIMARY KEY (nationalid) NOT ENFORCED
> ) WITH (
>      'file.format' = 'parquet'
>     ,'bucket' = '2'
>     ,'compaction.min.file-num' = '2'
>     ,'compaction.early-max.file-num' = '50'
>     ,'snapshot.time-retained' = '1h'
>     ,'snapshot.num-retained.min' = '5'
>     ,'snapshot.num-retained.max' = '20'
>     ,'table.exec.sink.upsert-materialize' = 'NONE'
> );
>
> Is anyone able to assist with the required INSERT statement?
>
> The intended destination table will be Apache Paimon.
>
> One plan was to split the inbound table into adults, address & accounts
> tables:
>
>    - adults I got working;
>    - address needs an upsert, as I have another inbound stream that
>      potentially pushes the same address into the address table;
>    - I then fail on the accounts.
>
> Accounts can take two forms, bank accounts or credit cards; which one is
> pretty much determined by which columns are populated in the accounts
> array item.
>
> George
>
> --
> You have the obligation to inform one honestly of the risk, and as a person
> you are committed to educate yourself to the total risk in any activity!
> Once informed & totally aware of the risk,
> every fool has the right to kill or injure themselves as they see fit!
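For the scalar columns and the address ROW, a minimal sketch of what such an INSERT could look like is below. It assumes the JSONB payload's keys mirror the target column names; the attached payload isn't shown, so every JSON path ($._id, $.dob, $.address.street_1 and so on) is an assumption, and the account array is left NULL for the moment:

    INSERT INTO c_paimon.outbound.adults
    SELECT
         JSON_VALUE(a.data, '$._id')            AS _id
        ,JSON_VALUE(a.data, '$.name')           AS name
        ,JSON_VALUE(a.data, '$.surname')        AS surname
        ,JSON_VALUE(a.data, '$.gender')         AS gender
        ,a.nationalid                           AS nationalid
        ,JSON_VALUE(a.data, '$.dob')            AS dob
        ,JSON_VALUE(a.data, '$.marital_status') AS marital_status
        ,JSON_VALUE(a.data, '$.status')         AS status
        -- account: left NULL here, see the note after this statement
        ,CAST(NULL AS ARRAY<ROW<
             fspiAgentAccountId STRING, accountId STRING, fspiId STRING
            ,memberName STRING, accountType STRING, fspiAgentId STRING
            ,expDate STRING, cardHolder STRING, cardNumber STRING
            ,cardNetwork STRING, issuingBank STRING>>)               AS account
        -- address: build the nested row from assumed $.address.* paths
        ,CAST(ROW(
             JSON_VALUE(a.data, '$.address.street_1')
            ,JSON_VALUE(a.data, '$.address.street_2')
            ,JSON_VALUE(a.data, '$.address.neighbourhood')
            ,JSON_VALUE(a.data, '$.address.town')
            ,JSON_VALUE(a.data, '$.address.county')
            ,JSON_VALUE(a.data, '$.address.province')
            ,JSON_VALUE(a.data, '$.address.country')
            ,JSON_VALUE(a.data, '$.address.country_code')
            ,JSON_VALUE(a.data, '$.address.postal_code')
            ,JSON_VALUE(a.data, '$.address.parcel_id')
         ) AS ROW<
             street_1 STRING, street_2 STRING, neighbourhood STRING
            ,town STRING, county STRING, province STRING, country STRING
            ,country_code STRING, postal_code STRING, parcel_id STRING>) AS address
        ,a.created_at                           AS created_at
    FROM postgres_catalog.inbound.adults AS a;

The account column is left NULL above because plain Flink SQL has no single built-in expression that turns a JSON array string into ARRAY<ROW<...>>. The usual approaches are a user-defined table function that parses the array and emits one row per element (which also fits the plan of splitting accounts into their own table), or carrying the array forward unparsed with JSON_QUERY(a.data, '$.accounts') as a STRING column and unpacking it downstream.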