[ https://issues.apache.org/jira/browse/FLINK-29729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser updated FLINK-29729:
-----------------------------------
    Fix Version/s:     (was: 1.17.0)

Support including the configured properties from flink-conf.yaml during create ParquetReader
---------------------------------------------------------------------------------------------

                 Key: FLINK-29729
                 URL: https://issues.apache.org/jira/browse/FLINK-29729
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
            Reporter: Rascal Wu
            Priority: Major
              Labels: pull-request-available
         Attachments: image-2022-10-22-17-41-38-084.png

Hi, I'm wondering whether we can include the configured properties from flink-conf.yaml, in addition to the Hadoop configuration, when creating the ParquetReader in `ParquetVectorizedInputFormat`.

My use case is querying a table stored in an S3 bucket in Parquet format via the filesystem connector, with the AWS credentials (e.g. fs.s3a.access.key, fs.s3a.secret.key) configured in flink-conf.yaml.

The JobManager (SourceCoordinator) works well: it can call "getFileStatus" on the S3 objects and generate splits. The TaskManager (SourceOperator -> ParquetVectorizedInputFormat -> ParquetReader), however, fails because the AWS credentials are missing.

After taking a closer look at the code that creates the ParquetReader to read the footer, I found that the AWS credentials are not passed in when the S3AFileSystem is created and initialized; the details are shown in the attached screenshot. !image-2022-10-22-17-41-38-084.png!

The `hadoopConfig` only contains the properties from the table format options plus the default Hadoop properties from core-site.xml, hdfs-site.xml, etc., because it is built via `ParquetFileFormatFactory#createRuntimeDecoder` -> `ParquetColumnarRowInputFormat.createPartitionedFormat` using `ParquetFileFormatFactory#getParquetConfiguration`:

{code:java}
@Override
public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
        DynamicTableSource.Context sourceContext,
        DataType producedDataType,
        int[][] projections) {
    return ParquetColumnarRowInputFormat.createPartitionedFormat(
            getParquetConfiguration(formatOptions),
            (RowType) Projection.of(projections).project(producedDataType).getLogicalType(),
            sourceContext.createTypeInformation(producedDataType),
            Collections.emptyList(),
            null,
            VectorizedColumnBatch.DEFAULT_SIZE,
            formatOptions.get(UTC_TIMEZONE),
            true);
}

private static Configuration getParquetConfiguration(ReadableConfig options) {
    Configuration conf = new Configuration();
    Properties properties = new Properties();
    ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
    properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
    return conf;
}
{code}

I know that I could add the AWS credentials to core-site.xml or hdfs-site.xml so that the ParquetReader can get them, but I don't think that is good practice, especially when different Flink jobs use different AWS credentials. So I'm wondering whether we can combine the default Hadoop configuration (static) with the properties from flink-conf.yaml (dynamic) when creating the ParquetReader, for example along the lines of this PR: https://github.com/apache/flink/pull/21130
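For illustration, here is a minimal sketch of the kind of change described above, assuming the format factory simply reloads flink-conf.yaml itself via GlobalConfiguration.loadConfiguration(). It is only a sketch under that assumption, not necessarily what the linked PR does, and the class name `ParquetConfigurationSketch` is made up.

{code:java}
import java.util.Properties;

import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.hadoop.conf.Configuration;

public final class ParquetConfigurationSketch {

    private static final String IDENTIFIER = "parquet";

    /** Hypothetical variant of ParquetFileFormatFactory#getParquetConfiguration. */
    static Configuration getParquetConfiguration(ReadableConfig formatOptions) {
        // Still picks up the static defaults from core-site.xml / hdfs-site.xml on the classpath.
        Configuration conf = new Configuration();

        // Merge the cluster-level properties from flink-conf.yaml (the "dynamic" part),
        // e.g. fs.s3a.access.key / fs.s3a.secret.key, so they reach the ParquetReader.
        GlobalConfiguration.loadConfiguration().toMap().forEach(conf::set);

        // Keep the existing behaviour: copy the table format options under the "parquet." prefix.
        Properties properties = new Properties();
        ((org.apache.flink.configuration.Configuration) formatOptions).addAllToProperties(properties);
        properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));

        return conf;
    }

    private ParquetConfigurationSketch() {}
}
{code}

A real change would probably want to filter or prefix the flink-conf.yaml keys instead of copying everything into the Hadoop configuration, and to reuse the configuration Flink already ships to the TaskManagers rather than re-reading flink-conf.yaml at this point.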
BTW, I'm using Flink 1.15.1 in a standalone cluster to validate the whole process, but I don't think this problem is limited to 1.15.1, nor to objects/files in AWS S3 buckets; other cloud object storages may hit the same problem.

Besides changing the code, is there any other solution that can help me handle this problem? Thanks.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)