[ https://issues.apache.org/jira/browse/FLINK-18862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
YUJIANBO updated FLINK-18862: ----------------------------- Description: 1、环境:Flink SQL,版本 1.11.1,per-job 模式 2、报错:*org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData* 3、产生的背景: (1)从 Kafka 建一张表: {code:java} CREATE TABLE kafka( x String, y String )with( 'connector' = 'kafka', ...... ) {code} (2)建一个视图(view): {code:java} CREATE VIEW view1 AS SELECT x, y, CAST(COUNT(1) AS VARCHAR) AS ct FROM kafka GROUP BY x, y {code} (3)然后基于这个 view 再做一次 agg 操作: {code:java} select x, LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists FROM view1 GROUP BY x {code} 然后就报错了:org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData (4)但是我尝试不做 agg 操作时没有报错,说明 count(1) 的结果值是可以转成 String 的。 {code:java} select x, CONCAT_WS('=', y, ct) from test {code} *但是为什么再经过一次 agg 操作就会报错呢?* 4、问题:上面的对比说明 count(1) 的结果能够被 CAST 成 String,可是再经过一次 agg 操作就不行了。请问有什么比较好的办法解决这个问题? 5、更详细的报错信息: {code:java} java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) ~[flink-table-blink_2.11-1.11.1.jar:?] at org.apache.flink.table.data.RowData.get(RowData.java:273) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] 
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] 
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [ad_features_auto-1.0-SNAPSHOT.jar:?] {code} was: 1、环境:flinksql、 版本是1.11.1,perjob模式 2、报错: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData 3、产生的背景: (1)从kafka建一张表: {code:java} CREATE TABLE kafka( x String, y String )with( 'connector' = 'kafka', ...... 
) {code} (2)建一张view表: {code:java} CREATE VIEW view1 AS SELECT x, y, CAST(COUNT(1) AS VARCHAR) AS ct FROM kafka GROUP BY x, y {code} (3)然后利用这个view再做一次agg操作: {code:java} select x, LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists FROM view1 GROUP BY x {code} 然后就报错了:org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData (4)但是我尝试不做agg操作,没有报错,说明count(1)的结果值是可以转成String。 {code:java} select x, CONCAT_WS('=', y, ct) from test {code} *但是再经过一次agg的操作为什么会报错呢?????????* 4、问题:通过上面的对比说明count(1) 是能够被cast 成string的,可是再经过一次agg的操作怎么就不行了, 请问有什么比较好的办法解决这个问题? 5、稍微详细点的报错: {code:java} java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) ~[flink-table-blink_2.11-1.11.1.jar:?] at org.apache.flink.table.data.RowData.get(RowData.java:273) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] 
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] 
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [ad_features_auto-1.0-SNAPSHOT.jar:?] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [ad_features_auto-1.0-SNAPSHOT.jar:?] {code} > sql 中agg算子的 count(1) 的结果强转成String类型再经过一次agg的操作就不能成功 > --------------------------------------------------- > > Key: FLINK-18862 > URL: https://issues.apache.org/jira/browse/FLINK-18862 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.11.1 > Reporter: YUJIANBO > Priority: Major > Attachments: sql 中agg算子的 count(1) 的结果强转成String类型再经过一次agg的操作就不能成功.txt > > > 1、环境:Flink SQL,版本 1.11.1,per-job 模式 > 2、报错:*org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast > to org.apache.flink.table.data.StringData* > 3、产生的背景: > (1)从 Kafka 建一张表: > {code:java} > CREATE TABLE kafka( > x String, > y String > )with( > 'connector' = 'kafka', > ...... > ) > {code} > (2)建一个视图(view): > {code:java} > CREATE VIEW view1 AS > SELECT > x, > y, > CAST(COUNT(1) AS VARCHAR) AS ct > FROM kafka > GROUP BY > x, y > {code} > (3)然后基于这个 view 再做一次 agg 操作: > {code:java} > select > x, > LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists > FROM view1 > GROUP BY x > {code} > 然后就报错了:org.apache.flink.table.data.binary.BinaryRawValueData cannot be > cast to org.apache.flink.table.data.StringData > > (4)但是我尝试不做 agg 操作时没有报错,说明 count(1) 的结果值是可以转成 String 的。 > {code:java} > select > x, > CONCAT_WS('=', y, ct) > from test > {code} > *但是为什么再经过一次 agg 操作就会报错呢?* > 4、问题:上面的对比说明 count(1) 的结果能够被 CAST 成 String,可是再经过一次 agg 操作就不行了。 > 请问有什么比较好的办法解决这个问题? 
> 5、更详细的报错信息: > {code:java} > java.lang.ClassCastException: > org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to > org.apache.flink.table.data.StringData > at > org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) > ~[flink-table-blink_2.11-1.11.1.jar:?] > at org.apache.flink.table.data.RowData.get(RowData.java:273) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] 
> at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) > ~[ad_features_auto-1.0-SNAPSHOT.jar:?] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > [ad_features_auto-1.0-SNAPSHOT.jar:?] 
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > [ad_features_auto-1.0-SNAPSHOT.jar:?] > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)