Hi Avi,

thanks for clarifying.

It seems that it's not possible to parse Parquet in Flink without knowing
the schema. What I'd do is read the file's footer metadata while setting up
the job and then pass the resulting schema to the input format:

ParquetMetadata parquetMetadata =
    MetadataReader.readFooter(inputStream, path, fileSize);
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();

Quite possibly that's what Spark is doing under the hood. If you open a
ticket with a feature request, we can add it in the future.
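
For the filtering step itself, here is a minimal sketch of the map-lookup
idea mentioned earlier in the thread. It assumes each record is exposed as a
Map keyed by field name (roughly what ParquetMapInputFormat provides). The
class IdFilter and the method dropId are hypothetical names for illustration
and are not part of Flink. Only the "id" field is read, and the records are
passed through unchanged:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not a Flink class: filters records by "id" only.
final class IdFilter {
    private IdFilter() {}

    // Keeps every record whose "id" (compared as a string) differs from
    // excludedId. No other field is inspected, so the rest of the schema
    // does not need to be known. A record without an "id" is kept.
    static List<Map<String, Object>> dropId(
            List<Map<String, Object>> records, String excludedId) {
        return records.stream()
                .filter(r -> !excludedId.equals(String.valueOf(r.get("id"))))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two records with different fields but a common "id".
        List<Map<String, Object>> records = List.of(
                Map.<String, Object>of("id", "123", "name", "a"),
                Map.<String, Object>of("id", "456", "score", 7));
        System.out.println(dropId(records, "123"));
    }
}
```

In a Flink job the same predicate would go into a filter function on the
stream of maps. Writing the surviving records back out still needs the
schema read from the footer as described above.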

On Thu, Mar 11, 2021 at 6:26 PM Avi Levi <a...@theneura.com> wrote:

> Hi Arvid,
> assuming that I have A0,B0,C0 parquet files with different schema and a
> common field *ID*, I want to write them to A1,B2,C3 files respectively.
> My problem is that in my code I do not want to know the full schema just by
> filtering using the ID field and writing the unfiltered lines to the
> destination file. each source file should have a matching destination file
> I tried to implement it using the ParquetInputFormat but I need to define
> the schema in advance (MessageType) .
> class ParquetInput(path: Path,  messageType: MessageType) extends 
> ParquetInputFormat[Row](path, messageType){
> I am looking for a way that my code will be agnostic to the schema and
> will only know the "ID" field (just like in spark) e.g *val filtered =
> rawsDF.filter(col("id") != "123")*
> Thanks
> Avi
> On Thu, Mar 11, 2021 at 2:53 PM Arvid Heise <ar...@apache.org> wrote:
>> Hi Avi,
>> I'm not entirely sure I understand the question. Let's say you have
>> source A, B, C all with different schema but all have an id. You could use
>> the ParquetMapInputFormat that provides a map of the records and just use a
>> map-lookup.
>> However, I'm not sure how you want to write these records with different
>> schema into the same parquet file. Maybe, you just want to extract the
>> common fields of A, B, C? Then you can also use Table API and just declare
>> the fields that are common.
>> Or do you have sink A, B, C and actually 3 separate topologies?
>> On Wed, Mar 10, 2021 at 10:50 AM Avi Levi <a...@theneura.com> wrote:
>>> Hi all,
>>> I am trying to filter lines from parquet files, the problem is that they
>>> have different schemas, however the field that I am using to filter
>>> exists in all schemas.
>>> in spark this is quite straight forward :
>>> *val filtered = rawsDF.filter(col("id") != "123")*
>>> I tried to do it in flink by extending the ParquetInputFormat but in
>>> this case I need to schema (message type) and implement Convert method
>>> which I want to avoid since I do not want to convert the line (I want to
>>> write is as is to other parquet file)
>>> Any ideas ?
>>> Cheers
>>> Avi

