Thank you for the explanation and creating the JIRA issue. Appreciate your help.
Regards Aniket Sule From: Shammon FY <zjur...@gmail.com> Sent: Thursday, May 18, 2023 10:28 PM To: Aniket Sule <aniket.s...@netwitness.com> Cc: user@flink.apache.org Subject: Re: Getting exception when writing to parquet file with generic types disabled You don't often get email from zjur...@gmail.com<mailto:zjur...@gmail.com>. Learn why this is important<https://aka.ms/LearnAboutSenderIdentification> CAUTION:External email. Do not click or open attachments unless you know and trust the sender. Hi Aniket, Currently the filesystem connector does not support option 'pipeline.generic-types'='false', because the connector will output `PartitionCommitInfo` messages for the downstream partition committer operator even when there are no partitions in the sink table. There is a `List<String> partitions` field in `PartitionCommitInfo` which will cause the exception you mentioned in the thread. I have created an issue [1] for this. [1] https://issues.apache.org/jira/browse/FLINK-32129 Best, Shammon FY On Thu, May 18, 2023 at 9:20 PM Aniket Sule <aniket.s...@netwitness.com<mailto:aniket.s...@netwitness.com>> wrote: Hi, I am trying to write data to parquet files using SQL insert statements. Generic types are disabled in the execution environment. There are other queries running in the same job that are counting/aggregating data. Generic types are disabled as a performance optimization for those queries. In this scenario, whenever I try to insert data into parquet files, I get an exception - Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type. I get the above exception even when I test with a simple table that has no array or list data types. Is there any way to write parquet files with generic types disabled? Thanks and regards, Aniket Sule. Here is a way to reproduce what I am seeing. My actual source is Kafka with data that is in json format. Datagen is simply to quickly reproduce the scenario. The environment is Flink 1.17.0. I am using the SQL cli. set 'sql-client.verbose'='true'; set 'table.exec.source.idle-timeout'='1000'; set 'table.optimizer.join-reorder-enabled'='true'; set 'table.exec.mini-batch.enabled'='true'; set 'table.exec.mini-batch.allow-latency'='5 s'; set 'table.exec.mini-batch.size'='5000'; set 'table.optimizer.agg-phase-strategy'='TWO_PHASE'; set 'table.optimizer.distinct-agg.split.enabled'='true'; set 'table.exec.state.ttl'='360 s'; set 'pipeline.object-reuse'='true'; set 'pipeline.generic-types'='false'; set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true'; CREATE TABLE source_t ( order_number BIGINT, order_name string, risk float, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'datagen' ); CREATE TABLE file_t ( order_number BIGINT, order_name string, risk float, `year` string,`month` string,`day` string,`hour` string ) WITH ( 'connector'='filesystem', 'path' = '/tmp/data', 'format'='parquet' ); insert into file_t select order_number,order_name,risk , date_format(order_time,'yyyy') as `year`, date_format(order_time,'MM') as `month`,date_format(order_time,'dd')as `day`,date_format(order_time,'HH') as `hour` from source_t; Resulting exception: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.table.gateway.api.utils.SqlGatewayException: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults. at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85) at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84) at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52) at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) at java.base/java.util.Optional.ifPresent(Optional.java:183) at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults. at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229) at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83) ... 48 more Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation 91796fc6-f257-4093-ab0c-1b4addf11e5b. at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type. at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342) at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391) at org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:64) at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65) at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37) at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2276) at org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:515) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:200) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) ... 7 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516) at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caution: External email. Do not click or open attachments unless you know and trust the sender. Caution: External email. Do not click or open attachments unless you know and trust the sender.