Cool, thanks!

On Fri, Mar 12, 2021, 13:15 Arvid Heise <ar...@apache.org> wrote:
> Hi Avi,
>
> thanks for clarifying.
>
> It seems like it's not possible to parse Parquet in Flink without knowing
> the schema. What I'd do is to parse the metadata while setting up the job
> and then pass it to the input format:
>
> ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
> FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
> MessageType fileSchema = fileMetaData.getSchema();
>
> Quite possibly that's what Spark is doing under the hood. If you open a
> ticket with a feature request, we will add it in the future.
>
> On Thu, Mar 11, 2021 at 6:26 PM Avi Levi <a...@theneura.com> wrote:
>
>> Hi Arvid,
>> assuming that I have A0, B0, C0 parquet files with different schemas and a
>> common field *ID*, I want to write them to A1, B2, C3 files respectively.
>> My problem is that I do not want my code to know the full schema; I just
>> want to filter using the ID field and write the unfiltered lines to the
>> destination file. Each source file should have a matching destination file.
>> I tried to implement it using the ParquetInputFormat, but I need to define
>> the schema (MessageType) in advance:
>>
>> class ParquetInput(path: Path, messageType: MessageType) extends
>> ParquetInputFormat[Row](path, messageType){
>>
>> I am looking for a way for my code to be agnostic to the schema and to
>> only know the "ID" field (just like in Spark), e.g. *val filtered =
>> rawsDF.filter(col("id") != "123")*
>>
>> Thanks
>> Avi
>>
>> On Thu, Mar 11, 2021 at 2:53 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Avi,
>>>
>>> I'm not entirely sure I understand the question. Let's say you have
>>> source A, B, C, all with different schemas, but all have an id. You could
>>> use the ParquetMapInputFormat that provides a map of the records and just
>>> use a map lookup.
>>>
>>> However, I'm not sure how you want to write these records with different
>>> schemas into the same parquet file. Maybe you just want to extract the
>>> common fields of A, B, C? Then you can also use the Table API and just
>>> declare the fields that are common.
>>>
>>> Or do you have sink A, B, C and actually 3 separate topologies?
>>>
>>> On Wed, Mar 10, 2021 at 10:50 AM Avi Levi <a...@theneura.com> wrote:
>>>
>>>> Hi all,
>>>> I am trying to filter lines from parquet files. The problem is that
>>>> they have different schemas; however, the field that I am using to
>>>> filter exists in all schemas.
>>>> In Spark this is quite straightforward:
>>>>
>>>> *val filtered = rawsDF.filter(col("id") != "123")*
>>>>
>>>> I tried to do it in Flink by extending the ParquetInputFormat, but in
>>>> that case I need the schema (message type) and to implement the convert
>>>> method, which I want to avoid since I do not want to convert the lines
>>>> (I want to write them as is to another parquet file).
>>>>
>>>> Any ideas?
>>>>
>>>> Cheers
>>>> Avi
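
Putting Arvid's footer-reading suggestion together with Avi's input-format approach, a minimal, untested sketch could look like the following. It assumes parquet-hadoop's ParquetFileReader for reading the footer at job-setup time and Flink's ParquetRowInputFormat from flink-parquet; the input path and the literal "123" are placeholders, class and method availability depends on the Parquet and Flink versions, and the Parquet sink that would write the surviving rows back out with the same schema is omitted:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

public class FilterParquetById {
    public static void main(String[] args) throws Exception {
        String inputPath = "hdfs:///data/A0.parquet"; // hypothetical source file

        // Job-setup time: read only the Parquet footer to discover this file's
        // schema, so the job code never hard-codes the full field list.
        ParquetMetadata footer = ParquetFileReader.readFooter(
                new Configuration(), new org.apache.hadoop.fs.Path(inputPath));
        MessageType fileSchema = footer.getFileMetaData().getSchema();
        int idPos = fileSchema.getFieldIndex("id"); // the only field the code needs to know

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Pass the discovered schema to the input format, so every field of the
        // source file is read and rows could later be written out unchanged.
        env.createInput(new ParquetRowInputFormat(new Path(inputPath), fileSchema))
            .filter(row -> !"123".equals(row.getField(idPos)))
            .print(); // placeholder; a real job would write to the matching destination file
    }
}

Because only the footer is parsed up front, the same job can be pointed at A0, B0, or C0 without code changes; each run rediscovers that file's schema and only relies on the common "id" field for filtering.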