I've written [FLINK-19903][1].

I just read [FLIP-107][2] and [FLINK-15869][3], and I have a question.

So, assuming that FLIP-107 / FLINK-15869 is implemented and the Filesystem SQL
connector is modified to expose metadata (including the path, and possibly
other attributes), then to use it I would need to write:

    CREATE TABLE table1 (
      `text` VARCHAR  -- each CSV row is just a single text column
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///Users/ecerulm/mycsvfiles/',
      'format' = 'csv',
      -- tell the filesystem connector to add 2 extra columns called
      -- `flink-filesystem-metadata.path` and `flink-filesystem-metadata.size`
      'include.metadata' = 'path,size'
    );

Is that right?
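
For comparison, FLIP-107 describes declaring metadata as columns with a `METADATA` keyword rather than through a connector option. A minimal sketch of that form, assuming the filesystem connector exposed metadata keys named `path` and `size` (both key names are hypothetical, as are the column names `file_path` and `file_size`):

```sql
-- Sketch only: the metadata keys 'path' and 'size' are assumptions;
-- the filesystem connector does not define them at the time of this thread.
CREATE TABLE table1 (
  `text` VARCHAR,                            -- each CSV row is a single text column
  `file_path` STRING METADATA FROM 'path',   -- hypothetical metadata key
  `file_size` BIGINT METADATA FROM 'size'    -- hypothetical metadata key
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///Users/ecerulm/mycsvfiles/',
  'format' = 'csv'
);

-- The metadata columns would then be selectable like any other column.
SELECT `text`, `file_path` FROM table1;
```

With this form each metadata field is opted into individually and gets an explicit SQL type, so no extra `WITH` option would be needed.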





[1]: https://issues.apache.org/jira/browse/FLINK-19903
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
[3]: https://issues.apache.org/jira/browse/FLINK-15869


On Fri, Oct 30, 2020 at 1:29 PM Ruben Laguna <ruben.lag...@gmail.com> wrote:

> Sure, I’ll write the JIRA issue
>
> On Fri, 30 Oct 2020 at 13:27, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> I am afraid there is no such functionality available yet.
>>
>> I think though it is a valid request. I think we can use the upcoming
>> FLIP-107 metadata columns for this purpose and expose the file name as
>> metadata column of a filesystem source.
>>
>> Would you like to create a JIRA issue for it?
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>
>> On 30/10/2020 13:21, Ruben Laguna wrote:
>> > I've asked this already on [stackoverflow][1]
>> >
>> > Is there anything equivalent to Spark's `f.input_file_name()`? I
>> > don't see anything that could be used in [system functions][2]
>> >
>> > I have a dataset where they embedded some information in the filenames
>> > (200k files) and I need to extract that as a new column.
>> >
>> > In Spark I could use
>> > `.withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])`
>> > but I don't see how I can do the same with Flink. Is it possible?
>> >
>> > I don't see [any JIRA issue about it either][3]. Is it something that
>> > has already been discussed?
>> >
>> >
>> > [1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
>> > [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
>> > [3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22
>>
> --
> /Rubén
>


-- 
/Rubén
