anyone...

G

On Mon, Sep 1, 2025 at 9:00 PM George <george...@gmail.com> wrote:

> Hi all
>
> Hope someone can assist me.
>
> I have the following Postgres table:
>
> CREATE TABLE adults (
> id SERIAL NOT NULL,
> nationalid varchar(14) NOT NULL,
> data JSONB,
> created_at timestamptz DEFAULT NOW() NOT NULL,
> PRIMARY KEY (nationalid)
> ) TABLESPACE pg_default;
>
> It contains the following payload:
>
> *See attached.*
>
> Note the accounts array; this is what is causing my problems at the
> moment.
>
> I then use Flink CDC to consume this source table into the following Flink
> table:
>
> CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
> id BIGINT
> ,nationalid VARCHAR(14) --NOT NULL
> ,data STRING
> ,created_at TIMESTAMP_LTZ(3)
> ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
> ,PRIMARY KEY (nationalid) NOT ENFORCED
> ) WITH (
> 'connector' = 'postgres-cdc'
> ,'hostname' = 'postgrescdc'
> ,'port' = '5432' -- NOTE: this is the database port inside the container,
> -- not the host port exported through a Docker port mapping.
> ,'username' = 'dbadmin'
> ,'password' = 'dbpassword'
> ,'database-name' = 'demog'
> ,'schema-name' = 'public'
> ,'table-name' = 'adults'
> ,'slot.name' = 'adults0'
> ,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature:
> -- incremental snapshot (default off)
> ,'scan.startup.mode' = 'initial'
> -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
> ,'decoding.plugin.name' = 'pgoutput'
> );
>
> I'd like to unpack this into a structured Flink table, something like the
> one below:
>
> CREATE OR REPLACE TABLE c_paimon.outbound.adults (
> _id STRING
> ,name STRING
> ,surname STRING
> ,gender STRING
> ,nationalid STRING
> ,dob STRING
> ,marital_status STRING
> ,status STRING
> ,account ARRAY<ROW<
> fspiAgentAccountId STRING
> ,accountId STRING
> ,fspiId STRING
> ,memberName STRING
> ,accountType STRING
> ,fspiAgentId STRING
> ,expDate STRING
> ,cardHolder STRING
> ,cardNumber STRING
> ,cardNetwork STRING
> ,issuingBank STRING
> >>
> ,address ROW<
> street_1 STRING
> ,street_2 STRING
> ,neighbourhood STRING
> ,town STRING
> ,county STRING
> ,province STRING
> ,country STRING
> ,country_code STRING
> ,postal_code STRING
> ,parcel_id STRING
> >
> ,created_at TIMESTAMP_LTZ(3)
> ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
> ,PRIMARY KEY (nationalid) NOT ENFORCED
> ) WITH (
> 'file.format' = 'parquet'
> ,'bucket' = '2'
> ,'compaction.min.file-num' = '2'
> ,'compaction.early-max.file-num' = '50'
> ,'snapshot.time-retained' = '1h'
> ,'snapshot.num-retained.min' = '5'
> ,'snapshot.num-retained.max' = '20'
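> );
> -- 'table.exec.sink.upsert-materialize' is a Flink job option, not a Paimon
> -- table option, so it goes in a SET statement instead of WITH:
> SET 'table.exec.sink.upsert-materialize' = 'NONE';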
>
> Is anyone able to help with the required INSERT statement?
>
> The intended destination is Apache Paimon.
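Here is a minimal sketch for the adults table. It is a starting point, not a tested answer.

It assumes the keys inside `data` match the target column names. The real payload was an attachment, so every JSON path here, such as '$.address.town', is an assumption. The sketch uses Flink's built-in JSON_VALUE, which returns STRING by default. `account` is left out of the column list, so Flink should write NULL for it; the array is easier to handle in its own table.

```sql
-- Sketch only: JSON paths assume payload keys match the target column names.
-- 'account' is omitted from the column list, so it is written as NULL.
INSERT INTO c_paimon.outbound.adults
  (_id, name, surname, gender, nationalid, dob, marital_status, status,
   address, created_at)
SELECT
   JSON_VALUE(data, '$._id')
  ,JSON_VALUE(data, '$.name')
  ,JSON_VALUE(data, '$.surname')
  ,JSON_VALUE(data, '$.gender')
  ,nationalid                          -- taken from the CDC column, not the JSON
  ,JSON_VALUE(data, '$.dob')
  ,JSON_VALUE(data, '$.marital_status')
  ,JSON_VALUE(data, '$.status')
  -- nested object -> ROW, fields in the same order as the target ROW type
  ,ROW(
     JSON_VALUE(data, '$.address.street_1')
    ,JSON_VALUE(data, '$.address.street_2')
    ,JSON_VALUE(data, '$.address.neighbourhood')
    ,JSON_VALUE(data, '$.address.town')
    ,JSON_VALUE(data, '$.address.county')
    ,JSON_VALUE(data, '$.address.province')
    ,JSON_VALUE(data, '$.address.country')
    ,JSON_VALUE(data, '$.address.country_code')
    ,JSON_VALUE(data, '$.address.postal_code')
    ,JSON_VALUE(data, '$.address.parcel_id')
   )
  ,created_at
FROM postgres_catalog.inbound.adults;
```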
>
> One plan was to split the inbound table into separate adults, address and
> accounts tables:
>
>    - adults: I got this working.
>    - address: I need an upsert, as another inbound stream may push the
>    same address into the address table.
>    - accounts: this is where I fail.
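For the address upsert, a Paimon primary-key table may already do what you need. With the default 'merge-engine' = 'deduplicate', a later row with the same key replaces the earlier one, so both streams can simply INSERT INTO the table.

Below is a sketch under these assumptions:

- The table name c_paimon.outbound.address is hypothetical.
- Keying on parcel_id is a guess at what identifies "the same address".
- The JSON paths are assumed, as above.

```sql
-- Hypothetical table; PRIMARY KEY (parcel_id) is an assumed address identifier.
CREATE TABLE IF NOT EXISTS c_paimon.outbound.address (
   parcel_id STRING NOT NULL
  ,street_1 STRING
  ,street_2 STRING
  ,neighbourhood STRING
  ,town STRING
  ,county STRING
  ,province STRING
  ,country STRING
  ,country_code STRING
  ,postal_code STRING
  ,PRIMARY KEY (parcel_id) NOT ENFORCED
) WITH (
   'bucket' = '2'
  ,'merge-engine' = 'deduplicate' -- Paimon's default, spelled out: last row per key wins
);

-- Upsert from the adults stream; the other inbound stream can write the same way.
INSERT INTO c_paimon.outbound.address
SELECT
   JSON_VALUE(data, '$.address.parcel_id')
  ,JSON_VALUE(data, '$.address.street_1')
  ,JSON_VALUE(data, '$.address.street_2')
  ,JSON_VALUE(data, '$.address.neighbourhood')
  ,JSON_VALUE(data, '$.address.town')
  ,JSON_VALUE(data, '$.address.county')
  ,JSON_VALUE(data, '$.address.province')
  ,JSON_VALUE(data, '$.address.country')
  ,JSON_VALUE(data, '$.address.country_code')
  ,JSON_VALUE(data, '$.address.postal_code')
FROM postgres_catalog.inbound.adults
-- rows without a key cannot be written to a primary-key table
WHERE JSON_VALUE(data, '$.address.parcel_id') IS NOT NULL;
```

If the two streams fill in different columns for the same address, Paimon's 'partial-update' merge engine may be a better fit than 'deduplicate'.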
>
>
> Accounts can take two forms, bank accounts or credit cards; which one
> applies depends on which fields are populated in each accounts array
> item.
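One way to flatten the accounts array is to turn it into ARRAY<STRING> and explode it with CROSS JOIN UNNEST, producing one row per account.

This sketch assumes Flink 1.20 or later, where JSON_QUERY accepts RETURNING ARRAY<STRING>. Check on your version that each element comes back as a JSON object string.

The following are all hypothetical:

- the target table c_paimon.outbound.accounts;
- the account_key and account_kind columns;
- the '$.account' path;
- the rule "cardNumber present means credit card".

If accountType already distinguishes the two forms, use it instead.

```sql
-- Hypothetical table: one row per element of the account array.
CREATE TABLE IF NOT EXISTS c_paimon.outbound.accounts (
   nationalid STRING NOT NULL
  ,account_key STRING NOT NULL -- accountId for bank accounts, cardNumber for cards (assumed)
  ,account_kind STRING         -- 'BANK' or 'CARD', derived in the INSERT below
  ,fspiAgentAccountId STRING
  ,accountId STRING
  ,fspiId STRING
  ,memberName STRING
  ,accountType STRING
  ,fspiAgentId STRING
  ,expDate STRING
  ,cardHolder STRING
  ,cardNumber STRING
  ,cardNetwork STRING
  ,issuingBank STRING
  ,PRIMARY KEY (nationalid, account_key) NOT ENFORCED
) WITH (
  'bucket' = '2'
);

INSERT INTO c_paimon.outbound.accounts
SELECT
   a.nationalid
  ,COALESCE(JSON_VALUE(t.acct, '$.accountId'), JSON_VALUE(t.acct, '$.cardNumber'))
  -- assumed rule: a populated cardNumber marks a credit card, otherwise a bank account
  ,CASE WHEN JSON_VALUE(t.acct, '$.cardNumber') IS NOT NULL THEN 'CARD' ELSE 'BANK' END
  ,JSON_VALUE(t.acct, '$.fspiAgentAccountId')
  ,JSON_VALUE(t.acct, '$.accountId')
  ,JSON_VALUE(t.acct, '$.fspiId')
  ,JSON_VALUE(t.acct, '$.memberName')
  ,JSON_VALUE(t.acct, '$.accountType')
  ,JSON_VALUE(t.acct, '$.fspiAgentId')
  ,JSON_VALUE(t.acct, '$.expDate')
  ,JSON_VALUE(t.acct, '$.cardHolder')
  ,JSON_VALUE(t.acct, '$.cardNumber')
  ,JSON_VALUE(t.acct, '$.cardNetwork')
  ,JSON_VALUE(t.acct, '$.issuingBank')
FROM (
  -- '$.account' assumes the array's key in the payload is "account"
  SELECT nationalid, JSON_QUERY(data, '$.account' RETURNING ARRAY<STRING>) AS accts
  FROM postgres_catalog.inbound.adults
) AS a
CROSS JOIN UNNEST(a.accts) AS t (acct)
-- skip items with neither identifier, since the key column must be non-null
WHERE COALESCE(JSON_VALUE(t.acct, '$.accountId'), JSON_VALUE(t.acct, '$.cardNumber')) IS NOT NULL;
```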
>
> George
>
> --
> You have the obligation to inform one honestly of the risk, and as a person
> you are committed to educate yourself to the total risk in any activity!
>
> Once informed & totally aware of the risk,
> every fool has the right to kill or injure themselves as they see fit!
>


