Thank you for the explanation and for creating the JIRA issue. I appreciate your help.

Regards
Aniket Sule

From: Shammon FY <zjur...@gmail.com>
Sent: Thursday, May 18, 2023 10:28 PM
To: Aniket Sule <aniket.s...@netwitness.com>
Cc: user@flink.apache.org
Subject: Re: Getting exception when writing to parquet file with generic types disabled

Hi Aniket,

Currently the filesystem connector does not support the option 
'pipeline.generic-types'='false', because the sink emits `PartitionCommitInfo` 
messages to the downstream partition-committer operator even when the sink table 
has no partitions. `PartitionCommitInfo` contains a `List<String> partitions` 
field, which causes the exception you mentioned in the thread. I have created an 
issue [1] for this.

[1] https://issues.apache.org/jira/browse/FLINK-32129
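
For illustration, here is a minimal, self-contained sketch of why that field trips the option. The `Info` POJO below is hypothetical and only mirrors the `List<String>` field of `PartitionCommitInfo`: Flink's type extractor maps `java.util.List` to `GenericTypeInfo`, so building a serializer for such a POJO with generic types disabled fails with the same UnsupportedOperationException you saw.

import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class GenericListRepro {

    // Hypothetical POJO; it only mimics the List<String> field of PartitionCommitInfo.
    public static class Info {
        public List<String> partitions; // analyzed as GenericTypeInfo<java.util.List>
        public Info() {}
    }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        // Same effect as SET 'pipeline.generic-types'='false' in the SQL client.
        config.disableGenericTypes();

        // Throws: UnsupportedOperationException: Generic types have been disabled in
        // the ExecutionConfig and type java.util.List is treated as a generic type.
        TypeInformation.of(Info.class).createSerializer(config);
    }
}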

Best,
Shammon FY


On Thu, May 18, 2023 at 9:20 PM Aniket Sule <aniket.s...@netwitness.com> wrote:
Hi,
I am trying to write data to parquet files using SQL insert statements. Generic 
types are disabled in the execution environment.
There are other queries running in the same job that are counting/aggregating 
data. Generic types are disabled as a performance optimization for those 
queries.

In this scenario, whenever I try to insert data into parquet files, I get an 
exception -
Caused by: java.lang.UnsupportedOperationException: Generic types have been 
disabled in the ExecutionConfig and type java.util.List is treated as a generic 
type.

I get the above exception even when I test with a simple table that has no 
array or list data types.

Is there any way to write parquet files with generic types disabled?

Thanks and regards,
Aniket Sule.


Here is a way to reproduce what I am seeing.
My actual source is Kafka with data that is in json format.
Datagen is simply to quickly reproduce the scenario.
The environment is Flink 1.17.0.
I am using the SQL cli.

set 'sql-client.verbose'='true';
set 'table.exec.source.idle-timeout'='1000';
set 'table.optimizer.join-reorder-enabled'='true';
set 'table.exec.mini-batch.enabled'='true';
set 'table.exec.mini-batch.allow-latency'='5 s';
set 'table.exec.mini-batch.size'='5000';
set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
set 'table.optimizer.distinct-agg.split.enabled'='true';
set 'table.exec.state.ttl'='360 s';
set 'pipeline.object-reuse'='true';
set 'pipeline.generic-types'='false';
set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';

CREATE TABLE source_t (
          order_number BIGINT,
          order_name string,
          risk float,
          order_time   TIMESTAMP(3)
      ) WITH (
        'connector' = 'datagen'
      );

CREATE TABLE file_t (
          order_number BIGINT,
          order_name string,
          risk float,
          `year` string,`month` string,`day` string,`hour` string
      ) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/data',
        'format' = 'parquet'
      );

insert into file_t
select order_number, order_name, risk,
  date_format(order_time,'yyyy') as `year`,
  date_format(order_time,'MM') as `month`,
  date_format(order_time,'dd') as `day`,
  date_format(order_time,'HH') as `hour`
from source_t;

Resulting exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.table.gateway.api.utils.SqlGatewayException: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults.
                at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
                at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
                at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
                at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
                at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
                at java.base/java.util.Optional.ifPresent(Optional.java:183)
                at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
                at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
                at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
                at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
                at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
                at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
                at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
                at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
                at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
                at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
                at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
                at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
                at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
                at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
                at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
                at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
                at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
                at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
                at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
                at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
                at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
                at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
                at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults.
                at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229)
                at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83)
                ... 48 more
Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation 91796fc6-f257-4093-ab0c-1b4addf11e5b.
                at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
                at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
                at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                ... 1 more
Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
                at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
                at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
                at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
                at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
                at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
                at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
                at org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:64)
                at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65)
                at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37)
                at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
                at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
                at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
                at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
                at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
                at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
                at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
                at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2276)
                at org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83)
                at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
                at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
                at org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:515)
                at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
                at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:200)
                at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
                at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
                at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
                ... 7 more

End of exception on server side>]
                at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
                at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
                at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
                at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                at java.base/java.lang.Thread.run(Thread.java:829)
