[ 
https://issues.apache.org/jira/browse/FLINK-28375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hehuiyuan updated FLINK-28375:
------------------------------
    Description: 
 
{code:java}
CREATE TABLE kafkaTableSource (
 keyField INTEGER,
 timestampField INTEGER,
 arrayField ARRAY<String>,
 ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
 WATERMARK FOR ws AS ws
) WITH (
    'connector' = 'kafka',
    'topic' = 'hehuiyuan1',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.client.id' = 'test-consumer-group',
    'properties.group.id' = 'test-consumer-group',
    'format' = 'json'
);

CREATE TABLE kafkaTableSink(
    keyField INTEGER,
    timestampField INTEGER,
    arrayLastValue ARRAY<String>
)
WITH (
    'connector' = 'print'
);

insert into kafkaTableSink
select keyField,timestampField, last_value(arrayField) over (partition by 
keyField order by ws) from kafkaTableSource;
{code}
{color:#FF0000}Exception in thread "main" 
org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does 
not support type: ''ARRAY''.{color}
{color:#FF0000}Please re-check the data type.{color}

 

I have a  modification to support this, but why does the community not support 
it? 

Is there any special reason that i do not considered?

 

The test the array data type can run:

mock data:

!image-2022-07-04-16-21-28-198.png!

result:

!image-2022-07-04-16-20-08-661.png|width=626,height=146!

  was:
 
{code:java}

CREATE TABLE kafkaTableSource (
 keyField INTEGER,
 timestampField INTEGER,
 arrayField ARRAY<String>,
 ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
 WATERMARK FOR ws AS ws
) WITH (
    'connector' = 'kafka',
    'topic' = 'hehuiyuan1',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.client.id' = 'test-consumer-group',
    'properties.group.id' = 'test-consumer-group',
    'format' = 'json'
);

CREATE TABLE kafkaTableSink(
    keyField INTEGER,
    timestampField INTEGER,
    arrayLastValue ARRAY<String>
)
WITH (
    'connector' = 'print'
);

insert into kafkaTableSink
select keyField,timestampField, last_value(arrayField) over (partition by 
keyField order by ws) from kafkaTableSource;
{code}
Exception in thread "main" org.apache.flink.table.api.TableException: 
LAST_VALUE aggregate function does not support type: ''ARRAY''.
Please re-check the data type.

 

I have a  modification to support this, but why does the community not support 
it? 

Is there any special reason that i do not considered?

 

The test the array data type can run:

mock data:

!image-2022-07-04-16-21-28-198.png!

result:

!image-2022-07-04-16-20-08-661.png|width=626,height=146!


> Whether to consider adding other data type to support for last_value function
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-28375
>                 URL: https://issues.apache.org/jira/browse/FLINK-28375
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>            Reporter: hehuiyuan
>            Priority: Minor
>         Attachments: image-2022-07-04-16-20-08-661.png, 
> image-2022-07-04-16-21-28-198.png
>
>
>  
> {code:java}
> CREATE TABLE kafkaTableSource (
>  keyField INTEGER,
>  timestampField INTEGER,
>  arrayField ARRAY<String>,
>  ws as TO_TIMESTAMP(FROM_UNIXTIME(timestampField)),
>  WATERMARK FOR ws AS ws
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'hehuiyuan1',
>     'scan.startup.mode' = 'latest-offset',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.client.id' = 'test-consumer-group',
>     'properties.group.id' = 'test-consumer-group',
>     'format' = 'json'
> );
> CREATE TABLE kafkaTableSink(
>     keyField INTEGER,
>     timestampField INTEGER,
>     arrayLastValue ARRAY<String>
> )
> WITH (
>     'connector' = 'print'
> );
> insert into kafkaTableSink
> select keyField,timestampField, last_value(arrayField) over (partition by 
> keyField order by ws) from kafkaTableSource;
> {code}
> {color:#FF0000}Exception in thread "main" 
> org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does 
> not support type: ''ARRAY''.{color}
> {color:#FF0000}Please re-check the data type.{color}
>  
> I have a  modification to support this, but why does the community not 
> support it? 
> Is there any special reason that i do not considered?
>  
> The test the array data type can run:
> mock data:
> !image-2022-07-04-16-21-28-198.png!
> result:
> !image-2022-07-04-16-20-08-661.png|width=626,height=146!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to