Cool, thanks!

On Fri, Mar 12, 2021, 13:15 Arvid Heise <ar...@apache.org> wrote:

> Hi Avi,
>
> thanks for clarifying.
>
> It seems like it's not possible to parse Parquet in Flink without knowing
> the schema. What I'd do is to parse the metadata while setting up the job
> and then pass it to the input format:
>
> ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
> FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
> MessageType fileSchema = fileMetaData.getSchema();
>
> Quite possibly that's what Spark is doing under the hood. If you open a ticket
> with a feature request, we will add it in the future.
>
> On Thu, Mar 11, 2021 at 6:26 PM Avi Levi <a...@theneura.com> wrote:
>
>> Hi Arvid,
>> assuming that I have Parquet files A0, B0, C0 with different schemas and a
>> common field *ID*, I want to write them to files A1, B2, C3 respectively.
>> My problem is that I do not want my code to know the full schema. It should
>> only filter on the ID field and write the records that are not filtered out
>> to the destination file. Each source file should have a matching destination
>> file. I tried to implement it using the ParquetInputFormat, but I need to
>> define the schema (MessageType) in advance.
>>
>> class ParquetInput(path: Path,  messageType: MessageType) extends 
>> ParquetInputFormat[Row](path, messageType){
>>
>> I am looking for a way to keep my code agnostic to the schema, so that it
>> only knows the "ID" field (just like in Spark), e.g. *val filtered =
>> rawsDF.filter(col("id") =!= "123")*
>>
>> Thanks
>> Avi
>>
>> On Thu, Mar 11, 2021 at 2:53 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Avi,
>>>
>>> I'm not entirely sure I understand the question. Let's say you have
>>> sources A, B, C, all with different schemas but all with an id. You could
>>> use the ParquetMapInputFormat, which provides each record as a map, and
>>> just use a map lookup.
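
A minimal sketch of the map-lookup idea above, in plain Java with no Flink or Parquet dependency. It assumes each record has already been read into a Map<String, Object>; the class name IdFilterSketch and the method dropById are hypothetical names made up for this illustration and are not part of any Flink API. The filter only touches the "id" key, so records from sources with different schemas pass through unchanged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not a Flink or Parquet class.
class IdFilterSketch {

    // Returns the records whose "id" differs from excludedId.
    // Only the "id" key is inspected; all other fields are left untouched.
    // Records without an "id" field are kept.
    static List<Map<String, Object>> dropById(List<Map<String, Object>> records,
                                              String excludedId) {
        List<Map<String, Object>> kept = new ArrayList<>();
        for (Map<String, Object> record : records) {
            Object id = record.get("id");
            if (id == null || !excludedId.equals(id.toString())) {
                kept.add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Two records with different "schemas" that share only the id field.
        Map<String, Object> fromA = new LinkedHashMap<>();
        fromA.put("id", "123");
        fromA.put("name", "a");

        Map<String, Object> fromB = new LinkedHashMap<>();
        fromB.put("id", "456");
        fromB.put("score", 7);

        List<Map<String, Object>> records = new ArrayList<>();
        records.add(fromA);
        records.add(fromB);

        // Print whatever survives the filter on id "123".
        System.out.println(dropById(records, "123"));
    }
}
```

In a real job the same predicate would presumably sit inside a filter step applied to the maps produced by the input format. Writing each surviving record back with its original schema is a separate problem that this sketch does not address.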
>>>
>>> However, I'm not sure how you want to write these records with different
>>> schemas into the same Parquet file. Maybe you just want to extract the
>>> common fields of A, B, C? Then you can also use the Table API and just
>>> declare the fields that are common.
>>>
>>> Or do you have sink A, B, C and actually 3 separate topologies?
>>>
>>> On Wed, Mar 10, 2021 at 10:50 AM Avi Levi <a...@theneura.com> wrote:
>>>
>>>> Hi all,
>>>> I am trying to filter lines from Parquet files. The problem is that
>>>> they have different schemas; however, the field that I am using to filter
>>>> exists in all schemas.
>>>> In Spark this is quite straightforward:
>>>>
>>>> *val filtered = rawsDF.filter(col("id") =!= "123")*
>>>>
>>>> I tried to do it in Flink by extending the ParquetInputFormat, but in
>>>> this case I need the schema (MessageType) and have to implement the
>>>> convert method, which I want to avoid since I do not want to convert the
>>>> line (I want to write it as is to another Parquet file).
>>>>
>>>> Any ideas?
>>>>
>>>> Cheers
>>>> Avi
>>>>
>>>>
