[ 
https://issues.apache.org/jira/browse/FLINK-18862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YUJIANBO updated FLINK-18862:
-----------------------------
    Description: 
1、环境:flinksql、 版本是1.11.1,perjob模式
2、报错: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
org.apache.flink.table.data.StringData

3、产生的背景:
(1)从kafka建一张表:
{code:java}
    CREATE TABLE kafka(
        x String,
        y String
    )with(
       'connector' = 'kafka',
        ......
    )
{code}


(2)建一张view表:
{code:java}
   CREATE VIEW view1 AS
   SELECT 
       x, 
       y, 
       CAST(COUNT(1) AS VARCHAR) AS ct
   FROM kafka
   GROUP BY 
       x, y
{code}


(3)然后利用这个view再做一次agg操作:
{code:java}
    select 
         x, 
         LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
    FROM view1
    GROUP BY x
{code}
   然后就报错了:org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast 
to org.apache.flink.table.data.StringData

   
(4)但是我尝试不做agg操作,没有报错,说明count(1)的结果值是可以转成String。
{code:java}
    select 
            x, 
            CONCAT_WS('=', y, ct)
    from test
{code}
*但是再经过一次agg的操作为什么会报错呢?????????*

4、问题:通过上面的对比说明count(1) 是能够被cast 成string的,可是再经过一次agg的操作怎么就不行了,
         请问有什么比较好的办法解决这个问题?


5、稍微详细点的报错:
{code:java}
java.lang.ClassCastException: 
org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
org.apache.flink.table.data.StringData
        at 
org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) 
~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) 
~[flink-table-blink_2.11-1.11.1.jar:?]
        at org.apache.flink.table.data.RowData.get(RowData.java:273) 
~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) 
~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[ad_features_auto-1.0-SNAPSHOT.jar:?]
{code}


  was:
1、环境:flinksql、 版本是1.11.1,perjob模式
2、报错: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
org.apache.flink.table.data.StringData

3、产生的背景:
(1)从kafka建一张表:
{code:java}
    CREATE TABLE kafka(
        x String,
        y String
    )with(
       'connector' = 'kafka',
        ......
    )
{code}


(2)建一张view表:
{code:java}
   CREATE VIEW view1 AS
   SELECT 
       x, 
       y, 
       CAST(COUNT(1) AS VARCHAR) AS ct
   FROM kafka
   GROUP BY 
       x, y
{code}


(3)然后利用这个view再做一次agg操作:
{code:java}
    select 
         x, 
         LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
    FROM view1
    GROUP BY x
{code}
   然后就报错了:org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast 
to org.apache.flink.table.data.StringData

   
(4)但是我尝试不做agg操作,直接将count(1)的结果值转成String是可以的:
{code:java}
    select 
            x, 
            CONCAT_WS('=', y, ct)
    from test
{code}


4、问题:通过上面的对比说明count(1) 是能够被cast 成string的,可是再经过一次agg的操作怎么就不行了,
         请问有什么比较好的办法解决这个问题?


5、稍微详细点的报错:
{code:java}
java.lang.ClassCastException: 
org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
org.apache.flink.table.data.StringData
        at 
org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) 
~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) 
~[flink-table-blink_2.11-1.11.1.jar:?]
        at org.apache.flink.table.data.RowData.get(RowData.java:273) 
~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
 ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) 
~[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[ad_features_auto-1.0-SNAPSHOT.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[ad_features_auto-1.0-SNAPSHOT.jar:?]
{code}



> sql 中agg算子的 count(1) 的结果强转成String类型再经过一次agg的操作就不能成功
> ---------------------------------------------------
>
>                 Key: FLINK-18862
>                 URL: https://issues.apache.org/jira/browse/FLINK-18862
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>            Reporter: YUJIANBO
>            Priority: Major
>         Attachments: sql 中agg算子的 count(1) 的结果强转成String类型再经过一次agg的操作就不能成功.txt
>
>
> 1、环境:flinksql、 版本是1.11.1,perjob模式
> 2、报错: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
> org.apache.flink.table.data.StringData
> 3、产生的背景:
> (1)从kafka建一张表:
> {code:java}
>     CREATE TABLE kafka(
>         x String,
>         y String
>     )with(
>        'connector' = 'kafka',
>         ......
>     )
> {code}
> (2)建一张view表:
> {code:java}
>    CREATE VIEW view1 AS
>    SELECT 
>        x, 
>        y, 
>        CAST(COUNT(1) AS VARCHAR) AS ct
>    FROM kafka
>    GROUP BY 
>        x, y
> {code}
> (3)然后利用这个view再做一次agg操作:
> {code:java}
>     select 
>          x, 
>          LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
>     FROM view1
>     GROUP BY x
> {code}
>    然后就报错了:org.apache.flink.table.data.binary.BinaryRawValueData cannot be 
> cast to org.apache.flink.table.data.StringData
>    
> (4)但是我尝试不做agg操作,没有报错,说明count(1)的结果值是可以转成String。
> {code:java}
>     select 
>           x, 
>           CONCAT_WS('=', y, ct)
>     from test
> {code}
> *但是再经过一次agg的操作为什么会报错呢?????????*
> 4、问题:通过上面的对比说明count(1) 是能够被cast 成string的,可是再经过一次agg的操作怎么就不行了,
>          请问有什么比较好的办法解决这个问题?
> 5、稍微详细点的报错:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
> org.apache.flink.table.data.StringData
>       at 
> org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) 
> ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) 
> ~[flink-table-blink_2.11-1.11.1.jar:?]
>       at org.apache.flink.table.data.RowData.get(RowData.java:273) 
> ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>       at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>  ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> [ad_features_auto-1.0-SNAPSHOT.jar:?]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> [ad_features_auto-1.0-SNAPSHOT.jar:?]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to