I've written [FLINK-19903][1]. I just read [FLIP-107][2] and [FLINK-15869][3], and I have a question.
Assuming that FLIP-107 / FLINK-15869 is implemented and the Filesystem SQL connector is modified to expose metadata (including the path, and possibly other things), then to use it I would need to write:

    CREATE TABLE table1 (
      `text` VARCHAR  -- each CSV row is just a single text column
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///Users/ecerulm/mycsvfiles/',
      'format' = 'csv',
      -- tell the filesystem connector to add 2 extra columns called
      -- `flink-filesystem-metadata.path` and `flink-filesystem-metadata.size`
      'include.metadata' = 'path,size'
    );

Is that right?

[1]: https://issues.apache.org/jira/browse/FLINK-19903
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
[3]: https://issues.apache.org/jira/browse/FLINK-15869

On Fri, Oct 30, 2020 at 1:29 PM Ruben Laguna <ruben.lag...@gmail.com> wrote:

> Sure, I’ll write the JIRA issue
>
> On Fri, 30 Oct 2020 at 13:27, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> I am afraid there is no such functionality available yet.
>>
>> I think though it is a valid request. I think we can use the upcoming
>> FLIP-107 metadata columns for this purpose and expose the file name as
>> metadata column of a filesystem source.
>>
>> Would you like to create a JIRA issue for it?
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>
>> On 30/10/2020 13:21, Ruben Laguna wrote:
>> > I've asked this already on [stackoverflow][1]
>> >
>> > Is there anything equivalent to Spark's `f.input_file_name()` ? I
>> > don't see anything that could be used in [system functions][2]
>> >
>> > I have a dataset where they embedded some information in the filenames
>> > (200k files) and I need to extract that as a new column.
>> >
>> > In Spark I could
>> > `.withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])`
>> > but I don't see how can I do the same with Flink. Is it possible?
>> >
>> > I don't see [any JIRA issue about it either][3]. Is it something that
>> > has already been discussed?
>> >
>> >
>> > [1]: https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink
>> > [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
>> > [3]: https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22
>>
>> --
> /Rubén
>
--
/Rubén
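
P.S. For reference, the Spark expression quoted above keeps the last path component and then the text before its first dot. Here is a minimal plain-Python sketch of that same string logic, assuming `/`-separated paths. The function name `id_from_path` and the sample file name are hypothetical, and nothing here is Flink API:

```python
def id_from_path(path: str) -> str:
    """Return the leading segment of the file name in a '/'-separated path."""
    # Last path component: mirrors f.reverse(f.split(path, '/'))[0]
    file_name = path.split("/")[-1]
    # Text before the first dot: mirrors f.split(file_name, '\.')[0]
    return file_name.split(".")[0]


if __name__ == "__main__":
    # Hypothetical file name, used only for illustration.
    print(id_from_path("file:///Users/ecerulm/mycsvfiles/abc123.csv"))
```

This only covers the string handling; it still needs the file path to be available as a column, which is what FLINK-19903 asks for.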