[ https://issues.apache.org/jira/browse/FLINK-25238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Praneeth Ramesh updated FLINK-25238:
------------------------------------
    Description:
I have a stream with an Iceberg table as a source. I have a few columns of array types in the table.
I try to read using the Iceberg connector.

Flink Version: 1.13.2
Iceberg Flink Version: 0.12.1

I see the error below.

java.lang.ClassCastException: class org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData cannot be cast to class org.apache.flink.table.data.ColumnarArrayData (org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData and org.apache.flink.table.data.ColumnarArrayData are in unnamed module of loader 'app')
    at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
    at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
    at org.apache.iceberg.flink.source.StreamingReaderOperator.processSplits(StreamingReaderOperator.java:155)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:834)

This could be the same issue as https://issues.apache.org/jira/browse/FLINK-21247, except that it happens for another type.

I see that Iceberg uses custom types other than the types from org.apache.flink.table.data, such as org.apache.iceberg.flink.data.FlinkParquetReaders.ReusableArrayData, and these types are not handled in org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.

!Screen Shot 2021-12-09 at 6.58.56 PM.png!

As an experiment, I changed the above code to handle the Iceberg type as a binary array, built it locally, and used it in my application, and that worked.

!Screen Shot 2021-12-09 at 7.04.10 PM.png!

I am not sure whether this is already handled in a newer version.

> flink iceberg source reading array types fail with Cast Exception
> -----------------------------------------------------------------
>
>                 Key: FLINK-25238
>                 URL: https://issues.apache.org/jira/browse/FLINK-25238
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.13.2
>            Reporter: Praneeth Ramesh
>            Priority: Major
>         Attachments: Screen Shot 2021-12-09 at 6.58.56 PM.png, Screen Shot 2021-12-09 at 7.04.10 PM.png
>
>
> I have a stream with an Iceberg table as a source. I have a few columns of array
> types in the table.
> I try to read using the Iceberg connector.
>
> Flink Version: 1.13.2
> Iceberg Flink Version: 0.12.1
>
> I see the error below.
>
> java.lang.ClassCastException: class org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData cannot be cast to class org.apache.flink.table.data.ColumnarArrayData (org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData and org.apache.flink.table.data.ColumnarArrayData are in unnamed module of loader 'app')
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
>     at org.apache.iceberg.flink.source.StreamingReaderOperator.processSplits(StreamingReaderOperator.java:155)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Thread.java:834)
>
> This could be the same issue as https://issues.apache.org/jira/browse/FLINK-21247,
> except that it happens for another type.
>
> I see that Iceberg uses custom types other than the types from
> org.apache.flink.table.data, such as
> org.apache.iceberg.flink.data.FlinkParquetReaders.ReusableArrayData, and these
> types are not handled in
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.
>
> !Screen Shot 2021-12-09 at 6.58.56 PM.png!
>
> As an experiment, I changed the above code to handle the Iceberg type as a
> binary array, built it locally, and used it in my application, and that worked.
>
> !Screen Shot 2021-12-09 at 7.04.10 PM.png!
>
> I am not sure whether this is already handled in a newer version.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
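The failure mode reported in this issue can be reproduced in miniature without Flink or Iceberg on the classpath. The sketch below is only an illustration: ArrayData, GenericArray, ColumnarArray and ReusableArray are hypothetical stand-ins, not the real Flink or Iceberg classes, and the element-wise fallback in tolerantCopy is an assumption chosen for brevity. The reporter's actual patch (converting to a binary array) is visible only in the attached screenshots and is not reproduced here.

```java
import java.util.Arrays;

public class ArrayCopySketch {

    /** Hypothetical stand-in for an array-data interface (not Flink's ArrayData). */
    interface ArrayData {
        int size();
        Object get(int pos);
    }

    /** Stand-in for a plain heap-backed array. */
    static final class GenericArray implements ArrayData {
        final Object[] values;
        GenericArray(Object[] values) { this.values = values; }
        @Override public int size() { return values.length; }
        @Override public Object get(int pos) { return values[pos]; }
    }

    /** Stand-in for an array that is a view over a slice of a column. */
    static final class ColumnarArray implements ArrayData {
        final Object[] column;
        final int offset;
        final int length;
        ColumnarArray(Object[] column, int offset, int length) {
            this.column = column;
            this.offset = offset;
            this.length = length;
        }
        @Override public int size() { return length; }
        @Override public Object get(int pos) { return column[offset + pos]; }
        ColumnarArray copy() {
            return new ColumnarArray(Arrays.copyOfRange(column, offset, offset + length), 0, length);
        }
    }

    /** Stand-in for a connector-specific array type the copy logic does not know. */
    static final class ReusableArray implements ArrayData {
        final Object[] buffer;
        final int numElements;
        ReusableArray(Object[] buffer, int numElements) {
            this.buffer = buffer;
            this.numElements = numElements;
        }
        @Override public int size() { return numElements; }
        @Override public Object get(int pos) { return buffer[pos]; }
    }

    /** Assumes every non-generic array is columnar, so unknown types fail the cast. */
    static ArrayData strictCopy(ArrayData from) {
        if (from instanceof GenericArray) {
            return new GenericArray(((GenericArray) from).values.clone());
        }
        return ((ColumnarArray) from).copy(); // ClassCastException for ReusableArray
    }

    /** Checks each known type and falls back to a shallow copy through the interface. */
    static ArrayData tolerantCopy(ArrayData from) {
        if (from instanceof GenericArray) {
            return new GenericArray(((GenericArray) from).values.clone());
        }
        if (from instanceof ColumnarArray) {
            return ((ColumnarArray) from).copy();
        }
        Object[] values = new Object[from.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = from.get(i); // elements themselves are not deep-copied
        }
        return new GenericArray(values);
    }

    public static void main(String[] args) {
        ArrayData reused = new ReusableArray(new Object[] {"a", "b", "c", null}, 3);
        try {
            strictCopy(reused);
        } catch (ClassCastException e) {
            System.out.println("strictCopy failed with ClassCastException");
        }
        ArrayData copy = tolerantCopy(reused);
        System.out.println("tolerantCopy produced " + copy.size() + " elements");
    }
}
```

The point of the sketch is the dispatch shape: any copy routine that ends in an unconditional cast depends on the set of implementations being closed, which no longer holds once a connector supplies its own array type.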