Hi all

Anyone have any ideas here? I'm still looking for a working solution / SQL
for 1.20.2.
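For anyone without the earlier attachment handy, the payload landing in the
data column looks roughly like this. The field names are the ones the queries
below reference; the values are placeholders, and the address structure is
left as a stub since it isn't shown anywhere in this thread:

    {
      "_id": "...",
      "nationalid": "...",
      "name": "...",
      "surname": "...",
      "gender": "...",
      "dob": "...",
      "marital_status": "...",
      "status": "...",
      "address": { "...": "..." },
      "account": [
        {
          "fspiAgentAccountId": "...",
          "accountId": "...",
          "fspiId": "...",
          "fspiAgentId": "...",
          "accountType": "...",
          "memberName": "...",
          "cardHolder": "...",
          "cardNumber": "...",
          "expDate": "...",
          "cardNetwork": "...",
          "issuingBank": "..."
        }
      ]
    }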
G

On Tue, Sep 16, 2025 at 9:04 AM George <george...@gmail.com> wrote:

> Hi Dylon
>
> Not to worry about the CDC bit; that was just background information.
> Basically I'm posting the record/document into a column called data;
> Flink CDC via PostgreSQL then consumes that and exposes the PostgreSQL
> table inside Flink.
>
> For everyone's background information, the PostgreSQL table structure:
>
> CREATE TABLE adults (
>     id SERIAL NOT NULL,
>     nationalid varchar(14) NOT NULL,
>     data JSONB,
>     created_at timestamptz DEFAULT NOW() NOT NULL,
>     PRIMARY KEY (nationalid)
> ) TABLESPACE pg_default;
>
> And the Apache Flink table that is populated with these records:
>
> CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
>     id BIGINT                    -- PostgreSQL SERIAL-generated field
>     ,nationalid VARCHAR(14)      -- NOT NULL
>     ,data STRING                 -- JSONB payload
>     ,created_at TIMESTAMP_LTZ(3)
>     ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
>     ,PRIMARY KEY (nationalid) NOT ENFORCED
> ) WITH (
>     'connector' = 'postgres-cdc'
>     ,'hostname' = 'postgrescdc'
>     ,'port' = '5432'  -- NOTE: this is the port of the db on the container,
>                       -- not the external Docker port exposed via a port mapping.
>     ,'username' = 'dbadmin'
>     ,'password' = 'dbpassword'
>     ,'database-name' = 'demog'
>     ,'schema-name' = 'public'
>     ,'table-name' = 'adults'
>     ,'slot.name' = 'adults0'
>     ,'scan.incremental.snapshot.enabled' = 'true'  -- experimental feature: incremental snapshot (default off)
>     ,'scan.startup.mode' = 'initial'  -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
>     ,'decoding.plugin.name' = 'pgoutput'
> );
>
> Flink SQL> SELECT
> >   JSON_VALUE(acc, '$.accountId') AS account_id,
> >   JSON_VALUE(acc, '$.accountType') AS account_type,
> >   JSON_VALUE(acc, '$.memberName') AS member_name
> > FROM postgres_catalog.inbound.adults AS t
> > CROSS JOIN UNNEST(
> >   JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
> > ) AS acc;
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.functions.source.SourceFunction
>
> G
>
> On Tue, Sep 16, 2025 at 8:59 AM dylanhz <dyla...@163.com> wrote:
>
>> Hi George,
>>
>> Sorry, I'm not familiar with the CDC connector details, but as long as
>> your data is of STRING type, you can extract fields from the elements of
>> a JSON array with a query like this:
>>
>> SELECT
>>   JSON_VALUE(acc, '$.accountId') AS account_id,
>>   JSON_VALUE(acc, '$.accountType') AS account_type,
>>   JSON_VALUE(acc, '$.memberName') AS member_name
>> FROM postgres_catalog.inbound.adults AS t
>> CROSS JOIN UNNEST(
>>   JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
>> ) AS acc;
>>
>> --
>> Best regards,
>> dylanhz
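Folding that suggestion into my original full attempt (at the bottom of this
thread), this is the rewrite I believe should be correct for 1.20.2: the CAST
becomes RETURNING ARRAY<STRING>, and I've also carried nationalid through with
a GROUP BY so the accounts aggregate per person, since that's the structure I
ultimately want. I can't verify it end to end yet, because every query against
the CDC table currently fails with the SourceFunction ClassNotFoundException
above:

WITH unnested_accounts AS (
    SELECT
        t.nationalid AS nationalid
        ,JSON_VALUE(account_item, '$.fspiAgentAccountId') AS fspiagentaccountid
        ,JSON_VALUE(account_item, '$.accountId') AS accountid
        ,JSON_VALUE(account_item, '$.fspiId') AS fspiid
        ,JSON_VALUE(account_item, '$.fspiAgentId') AS fspiagentid
        ,JSON_VALUE(account_item, '$.accountType') AS accounttype
        ,JSON_VALUE(account_item, '$.memberName') AS membername
        ,JSON_VALUE(account_item, '$.cardHolder') AS cardholder
        ,JSON_VALUE(account_item, '$.cardNumber') AS cardnumber
        ,JSON_VALUE(account_item, '$.expDate') AS expdate
        ,JSON_VALUE(account_item, '$.cardNetwork') AS cardnetwork
        ,JSON_VALUE(account_item, '$.issuingBank') AS issuingbank
    FROM postgres_catalog.inbound.adults AS t
    CROSS JOIN UNNEST(
        JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
    ) AS a(account_item)
)
SELECT
    nationalid
    ,ARRAY_AGG(
        ROW(
            fspiagentaccountid
            ,accountid
            ,fspiid
            ,fspiagentid
            ,accounttype
            ,membername
            ,cardholder
            ,cardnumber
            ,expdate
            ,cardnetwork
            ,issuingbank
        )
    ) AS accounts
FROM unnested_accounts
GROUP BY nationalid;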
>>
>> At 2025-09-16 13:34:53, "George" <george...@gmail.com> wrote:
>>
>> Hi there
>>
>> Thanks for the reply, appreciated. I'm really seeing double with this
>> one; I've tried everything. Yes, it's simply a lack of experience on my
>> side. I would really appreciate assistance to get this working...
>>
>> If we look at: *Since Flink 1.20.2, if you need to work with a JSON
>> array for further processing, you can use:*
>>
>> *JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)*
>>
>> that implies to me the following:
>>
>> select JSON_QUERY(data, '$.account') AS accounts FROM
>> postgres_catalog.inbound.adults;
>>
>> I would expect this to give me an array of the account documents/objects,
>> still as strings at this point, which I could then unpack/unwind one by
>> one. Instead:
>>
>> Flink SQL> select JSON_QUERY(data, '$.account') AS accounts FROM
>> postgres_catalog.inbound.adults;
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.api.functions.source.SourceFunction
>>
>> Some background: I have adults.json coming into a field called data as
>> JSONB in a PostgreSQL table, which is CDC consumed/pushed into Flink.
>> I'm trying to unpack this into a Flink table that's properly structured
>> with complex types. Once I have it there I can unwind the account and
>> address objects into other tables; I have accounts and addresses coming
>> from other tables as well, so I will consolidate them all into one table
>> to be sunk to Paimon and pushed to Kafka for consumption as structured
>> objects.
>>
>> On Tue, Sep 16, 2025 at 4:59 AM dylanhz <dyla...@163.com> wrote:
>>
>>> Hi George,
>>>
>>> JSON_QUERY returns a STRING by default (the JSON fragment as text). It
>>> is not supported to directly CAST this string into complex types such
>>> as ARRAY<ROW<...>>, so attempting to do so will result in a type
>>> conversion error.
>>>
>>> Since Flink 1.20.2, if you need to work with a JSON array for further
>>> processing, you can use:
>>>
>>> JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)
>>>
>>> This returns the JSON array as an array of string elements, each
>>> representing one array item. You can then UNNEST this array and apply
>>> JSON_VALUE on each element to extract the required fields.
>>>
>>> For more details, please refer to the Flink documentation:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/
>>>
>>> --
>>> Best regards,
>>> dylanhz
>>>
>>> ----- Original Message -----
>>> *From*: George <george...@gmail.com>
>>> *To*: user@flink.apache.org
>>> *Sent*: Mon, 15 Sep 2025 19:35:57 +0200
>>> *Subject*: Cast error and battling to get around it.
>>>
>>> Stuck with this; everywhere I go, different ideas...
>>>
>>> data is a full string payload, as per the attachment.
>>>
>>> The data.account field contains an array of accounts.
>>>
>>> Flink 2.2 seems to be getting some new functionality (from_json), but
>>> it's not in 1.20, and 2.0 is not allowing me to CDC consume from a
>>> PostgreSQL table, even when using the newest 3.4 library.
>>>
>>> I've hit the below "cannot cast" error on several attempts:
>>>
>>> ==> Cast function cannot convert value of type VARCHAR(2147483647) to
>>> type VARCHAR(2147483647) ARRAY
>>>
>>> Been stuck with this one for a while; hope someone can help.
>>>
>>> Example payload attached.
>>>
>>> Flink SQL> select
>>> >     JSON_VALUE(data, '$.nationalid') AS nationalid
>>> >     ,JSON_VALUE(data, '$._id') AS _id  -- UUID generated by the app, inside 'data' / the JSON payload
>>> >     ,JSON_VALUE(data, '$.name') AS name
>>> >     ,JSON_VALUE(data, '$.surname') AS surname
>>> >     ,JSON_VALUE(data, '$.gender') AS gender
>>> >     ,JSON_VALUE(data, '$.dob') AS dob
>>> >     ,JSON_VALUE(data, '$.marital_status') AS marital_status
>>> >     ,JSON_VALUE(data, '$.status') AS status
>>> >     ,JSON_QUERY(data, '$.address') AS address
>>> >     ,CAST(
>>> >         JSON_QUERY(data, '$.account') AS
>>> >         ARRAY<ROW<
>>> >             fspiagentaccountid STRING
>>> >             ,accountid STRING
>>> >             ,fspiid STRING
>>> >             ,fspiagentid STRING
>>> >             ,accounttype STRING
>>> >             ,membername STRING
>>> >             ,cardholder STRING
>>> >             ,cardnumber STRING
>>> >             ,expdate STRING
>>> >             ,cardnetwork STRING
>>> >             ,issuingbank STRING
>>> >         >>
>>> >     ) AS account
>>> >     ,created_at AS created_at
>>> > FROM postgres_catalog.inbound.adults;
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>>> cannot convert value of type VARCHAR(2147483647) to type
>>> RecordType(VARCHAR(2147483647) fspiagentaccountid, VARCHAR(2147483647)
>>> accountid, VARCHAR(2147483647) fspiid, VARCHAR(2147483647) fspiagentid,
>>> VARCHAR(2147483647) accounttype, VARCHAR(2147483647) membername,
>>> VARCHAR(2147483647) cardholder, VARCHAR(2147483647) cardnumber,
>>> VARCHAR(2147483647) expdate, VARCHAR(2147483647) cardnetwork,
>>> VARCHAR(2147483647) issuingbank) ARRAY
>>>
>>> Flink SQL> WITH unnested_accounts AS (
>>> >     SELECT
>>> >         JSON_VALUE(account_item, '$.fspiAgentAccountId') AS fspiagentaccountid
>>> >         ,JSON_VALUE(account_item, '$.accountId') AS accountid
>>> >         ,JSON_VALUE(account_item, '$.fspiId') AS fspiid
>>> >         ,JSON_VALUE(account_item, '$.fspiAgentId') AS fspiagentid
>>> >         ,JSON_VALUE(account_item, '$.accountType') AS accounttype
>>> >         ,JSON_VALUE(account_item, '$.memberName') AS membername
>>> >         ,JSON_VALUE(account_item, '$.cardHolder') AS cardholder
>>> >         ,JSON_VALUE(account_item, '$.cardNumber') AS cardnumber
>>> >         ,JSON_VALUE(account_item, '$.expDate') AS expdate
>>> >         ,JSON_VALUE(account_item, '$.cardNetwork') AS cardnetwork
>>> >         ,JSON_VALUE(account_item, '$.issuingBank') AS issuingbank
>>> >     FROM postgres_catalog.inbound.adults
>>> >     CROSS JOIN UNNEST(
>>> >         CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
>>> >     ) AS t(account_item)
>>> > )
>>> > SELECT
>>> >     ARRAY_AGG(
>>> >         ROW(
>>> >             fspiagentaccountid
>>> >             ,accountid
>>> >             ,fspiid
>>> >             ,fspiagentid
>>> >             ,accounttype
>>> >             ,membername
>>> >             ,cardholder
>>> >             ,cardnumber
>>> >             ,expdate
>>> >             ,cardnetwork
>>> >             ,issuingbank
>>> >         )
>>> >     ) AS accounts
>>> > FROM unnested_accounts;
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>>> cannot convert value of type VARCHAR(2147483647) to type
>>> VARCHAR(2147483647) ARRAY
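So, summarising where this leaves me: per dylanhz, the CAST in that last
attempt is exactly what 1.20.2 rejects, and as I understand it the UNNEST
input should instead read:

    CROSS JOIN UNNEST(
        JSON_QUERY(data, '$.account' RETURNING ARRAY<STRING>)
    ) AS t(account_item)

which is what the full rewrite earlier in this mail does. That leaves the
SourceFunction ClassNotFoundException as the remaining blocker; any pointers
appreciated.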
--
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!