Hi all

Does anyone have any ideas here, or a working solution/SQL for 1.20.2?

G

On Tue, Sep 16, 2025 at 9:04 AM George <george...@gmail.com> wrote:

> Hi Dylon
>
> Not to worry about the CDC bit, that was just background information.
> Basically I'm posting the record/document into a column called data; Flink
> CDC then consumes that and exposes the PostgreSQL table inside Flink.
> For everyone's background information, the PostgreSQL table structure
> (just background):
>
> CREATE TABLE adults (
>      id         SERIAL NOT NULL,
>      nationalid VARCHAR(14) NOT NULL,
>      data       JSONB,
>      created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
>      PRIMARY KEY (nationalid)
> ) TABLESPACE pg_default;
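>
> To make the payload shape concrete, here is a hypothetical row insert (the
> JSON field names are taken from the queries later in this thread; the
> values are made up purely for illustration):
>
> INSERT INTO adults (nationalid, data) VALUES (
>     '1234567890123',
>     '{
>        "nationalid": "1234567890123",
>        "_id": "a1b2c3d4",
>        "name": "Jane",
>        "surname": "Doe",
>        "account": [
>          {"accountId": "acc-1", "accountType": "savings", "memberName": "Jane Doe"}
>        ]
>      }'::jsonb
> );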
>
> For everyone's background information, the Apache Flink table that is
> populated with these records:
>
> CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
>      id BIGINT                       -- PostgreSQL SERIAL-generated field
>     ,nationalid VARCHAR(14)          -- NOT NULL
>     ,data STRING                     -- JSONB payload
>     ,created_at TIMESTAMP_LTZ(3)
>     ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
>     ,PRIMARY KEY (nationalid) NOT ENFORCED
> ) WITH (
>      'connector' = 'postgres-cdc'
>     ,'hostname' = 'postgrescdc'
>     ,'port' = '5432' -- NOTE: the db port inside the container, not the external Docker port exposed via a port mapping
>     ,'username' = 'dbadmin'
>     ,'password' = 'dbpassword'
>     ,'database-name' = 'demog'
>     ,'schema-name' = 'public'
>     ,'table-name' = 'adults'
>     ,'slot.name' = 'adults0'
>     ,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
>     ,'scan.startup.mode' = 'initial'
>     -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
>     ,'decoding.plugin.name' = 'pgoutput'
> );
>
>
> Flink SQL> SELECT
> >     JSON_VALUE(acc, '$.accountId') AS account_id,
> >     JSON_VALUE(acc, '$.accountType') AS account_type,
> >     JSON_VALUE(acc, '$.memberName') AS member_name
> > FROM postgres_catalog.inbound.adults AS t
> > CROSS JOIN UNNEST(
> >     JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
> > ) AS acc;
>
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.functions.source.SourceFunction
>
> G
>
> On Tue, Sep 16, 2025 at 8:59 AM dylanhz <dyla...@163.com> wrote:
>
>> Hi George,
>>
>> Sorry I'm not familiar with the CDC connector details, but as long as
>> your data is of STRING type, you can extract fields from elements of a JSON
>> array with a query like this:
>>
>> SELECT
>>     JSON_VALUE(acc, '$.accountId') AS account_id,
>>     JSON_VALUE(acc, '$.accountType') AS account_type,
>>     JSON_VALUE(acc, '$.memberName') AS member_name
>> FROM postgres_catalog.inbound.adults AS t
>> CROSS JOIN UNNEST(
>>     JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
>> ) AS acc;
>>
>>
>> --
>> Best regards,
>> dylanhz
>>
>> At 2025-09-16 13:34:53, "George" <george...@gmail.com> wrote:
>>
>> Hi there
>>
>> Thanks for the reply, appreciated. I'm really seeing double with this one
>> and have tried everything; it's simply a case of lack of experience on my
>> side. I would really appreciate assistance to get this working...
>>
>> If we look at:  *Since Flink 1.20.2, if you need to work with a JSON
>> array for further processing, you can use:*
>>
>> *JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)*
>>
>> that implies to me the following:
>>
>> select JSON_QUERY(data, '$.account') AS accounts FROM
>> postgres_catalog.inbound.adults;
>>
>> I would expect this to give me an array of the account documents/objects,
>> still as strings at this point, which I could then unpack/unwind one by one.
>>
>> Flink SQL> select JSON_QUERY(data, '$.account')       AS accounts FROM
>> postgres_catalog.inbound.adults;
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.api.functions.source.SourceFunction
>>
>>
>> Some background: I have that adults.json coming into a field called data
>> as JSONB in a PostgreSQL table, which is CDC-consumed/pushed into Flink.
>> I'm trying to unpack this into a properly structured (complex) Flink
>> table. Once I have it there I can unwind the accounts and address objects
>> into other tables; since I have accounts and addresses coming from other
>> tables as well, I will consolidate them all into one table to be sinked to
>> Paimon and pushed to Kafka for consumption as structured objects.
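>>
>> Roughly what I have in mind for the accounts leg, as a sketch (the
>> accounts_target sink table here is hypothetical, just to show the shape):
>>
>> INSERT INTO accounts_target
>> SELECT
>>      JSON_VALUE(acc, '$.accountId')   AS accountid
>>     ,JSON_VALUE(acc, '$.accountType') AS accounttype
>>     ,JSON_VALUE(acc, '$.memberName')  AS membername
>> FROM postgres_catalog.inbound.adults AS t
>> CROSS JOIN UNNEST(
>>     JSON_QUERY(t.data, '$.account' RETURNING ARRAY<STRING>)
>> ) AS a(acc);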
>>
>>
>>
>> On Tue, Sep 16, 2025 at 4:59 AM dylanhz <dyla...@163.com> wrote:
>>
>>> Hi George,
>>>
>>>
>>> JSON_QUERY returns a STRING by default (the JSON fragment as text). It
>>> is not supported to directly CAST this string into complex types such
>>> as ARRAY<ROW<...>>, so attempting to do so will result in a type
>>> conversion error.
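>>>
>>> A quick way to see the default behaviour (a minimal sketch):
>>>
>>> SELECT JSON_QUERY('{"a": [1, 2]}', '$.a');
>>> -- returns the STRING '[1,2]', not an ARRAY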
>>>
>>>
>>> Since Flink 1.20.2, if you need to work with a JSON array for further
>>> processing, you can use:
>>>
>>> JSON_QUERY(jsonValue, path RETURNING ARRAY<STRING>)
>>>
>>> This returns the JSON array as an array of string elements, each
>>> representing one array item. You can then UNNEST this array and apply
>>> JSON_VALUE on each element to extract the required fields.
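>>>
>>> For example, a minimal self-contained sketch of the pattern (the JSON
>>> literal and field names here are made up):
>>>
>>> SELECT JSON_VALUE(elem, '$.k') AS k
>>> FROM (VALUES ('{"arr": [{"k": "v1"}, {"k": "v2"}]}')) AS s(js)
>>> CROSS JOIN UNNEST(
>>>     JSON_QUERY(js, '$.arr' RETURNING ARRAY<STRING>)
>>> ) AS t(elem);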
>>>
>>> For more details, please refer to the Flink documentation:
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/
>>>
>>>
>>> --
>>> Best regards,
>>> dylanhz
>>>
>>> ----- Original Message -----
>>> *From*: George <george...@gmail.com>
>>> *To*: user@flink.apache.org
>>> *Sent*: Mon, 15 Sep 2025 19:35:57 +0200
>>> *Subject*: Cast error and battling to get around it.
>>>
>>>
>>> I'm stuck with this; everywhere I go, different ideas...
>>>
>>> data is a full string payload, as per the attachment.
>>>
>>> The data.account field holds an array of accounts.
>>>
>>> Flink 2.2 seems to be getting some new functionality (from_json), but
>>> it's not in 1.20, and 2.0 is not allowing me to CDC-consume from a
>>> PostgreSQL table, even when using the newest 3.4 library.
>>>
>>>
>>> I've hit the cast error below on several attempts....
>>>
>>> ==>   Cast function cannot convert value of type VARCHAR(2147483647) to
>>> type VARCHAR(2147483647) ARRAY
>>>
>>> I've been stuck with this one for a while; hope someone can help.
>>>
>>> Example payload attached.
>>>
>>> Flink SQL> select
>>> >      JSON_VALUE(data, '$.nationalid')       AS nationalid
>>> >     ,JSON_VALUE(data, '$._id')              AS _id              -- UUID generated by app, inside 'data' / JSON payload
>>> >     ,JSON_VALUE(data, '$.name')             AS name
>>> >     ,JSON_VALUE(data, '$.surname')          AS surname
>>> >     ,JSON_VALUE(data, '$.gender')           AS gender
>>> >     ,JSON_VALUE(data, '$.dob')              AS dob
>>> >     ,JSON_VALUE(data, '$.marital_status')   AS marital_status
>>> >     ,JSON_VALUE(data, '$.status')           AS status
>>> >     ,JSON_QUERY(data, '$.address')          AS address
>>> >     ,CAST(
>>> >         JSON_QUERY(data, '$.account')       AS
>>> >         ARRAY<ROW<
>>> >              fspiagentaccountid STRING
>>> >             ,accountid   STRING
>>> >             ,fspiid      STRING
>>> >             ,fspiagentid STRING
>>> >             ,accounttype STRING
>>> >             ,membername  STRING
>>> >             ,cardholder  STRING
>>> >             ,cardnumber  STRING
>>> >             ,expdate     STRING
>>> >             ,cardnetwork STRING
>>> >             ,issuingbank STRING
>>> >         >>
>>> >     ) AS account
>>> >     ,created_at                             AS created_at
>>> > FROM postgres_catalog.inbound.adults;
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>>> cannot convert value of type VARCHAR(2147483647) to type
>>> RecordType(VARCHAR(2147483647) fspiagentaccountid, VARCHAR(2147483647)
>>> accountid, VARCHAR(2147483647) fspiid, VARCHAR(2147483647) fspiagentid,
>>> VARCHAR(2147483647) accounttype, VARCHAR(2147483647) membername,
>>> VARCHAR(2147483647) cardholder, VARCHAR(2147483647) cardnumber,
>>> VARCHAR(2147483647) expdate, VARCHAR(2147483647) cardnetwork,
>>> VARCHAR(2147483647) issuingbank) ARRAY
>>>
>>> Flink SQL> WITH unnested_accounts AS (
>>> >     SELECT
>>> >          JSON_VALUE(account_item, '$.fspiAgentAccountId') AS
>>> fspiagentaccountid
>>> >         ,JSON_VALUE(account_item, '$.accountId')         AS accountid
>>> >         ,JSON_VALUE(account_item, '$.fspiId')            AS fspiid
>>> >         ,JSON_VALUE(account_item, '$.fspiAgentId')       AS fspiagentid
>>> >         ,JSON_VALUE(account_item, '$.accountType')       AS accounttype
>>> >         ,JSON_VALUE(account_item, '$.memberName')        AS membername
>>> >         ,JSON_VALUE(account_item, '$.cardHolder')        AS cardholder
>>> >         ,JSON_VALUE(account_item, '$.cardNumber')        AS cardnumber
>>> >         ,JSON_VALUE(account_item, '$.expDate')           AS expdate
>>> >         ,JSON_VALUE(account_item, '$.cardNetwork')       AS cardnetwork
>>> >         ,JSON_VALUE(account_item, '$.issuingBank')       AS issuingbank
>>> >     FROM postgres_catalog.inbound.adults
>>> >     CROSS JOIN UNNEST(
>>> >         CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
>>> >     ) AS t(account_item)
>>> > )
>>> > SELECT
>>> >     ARRAY_AGG(
>>> >         ROW(
>>> >              fspiagentaccountid
>>> >             ,accountid
>>> >             ,fspiid
>>> >             ,fspiagentid
>>> >             ,accounttype
>>> >             ,membername
>>> >             ,cardholder
>>> >             ,cardnumber
>>> >             ,expdate
>>> >             ,cardnetwork
>>> >             ,issuingbank
>>> >         )
>>> >     ) AS accounts
>>> > FROM unnested_accounts;
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.calcite.sql.validate.SqlValidatorException: Cast function
>>> cannot convert value of type VARCHAR(2147483647) to type
>>> VARCHAR(2147483647) ARRAY
>>>
>>>
>>>
>>>
>>
>


-- 
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!
