No, you would do: CREATE TABLE table1( `text` VARCHAR, -- each CSV row is just a single text column `path` VARCHAR METADATA VIRTUAL, -- where path is a name declared by the filesystem source `size` VARCHAR METADATA VIRTUAL -- where size is a name declared by the filesystem source ) WITH ( 'connector' = 'filesystem`, 'path' = 'file://Users/ecerulm/mycsvfiles/', 'format' = 'csv' );
On 30/10/2020 14:25, Ruben Laguna wrote: > I've written [FLINK-19903][1]. > > I just read [FLIP-107][2] and [FLINK-15869][3] and I need to ask.... > > So assuming that FLIP-107 / FLINK-15869 is implemented and > Filesystem SQL connector modified to expose metadata (including path, > and possible other stuff) , then to use it I would need to write: > > CREATE TABLE table1( > `text` VARCHAR, -- each CSV row is just a single text column > ) WITH ( > 'connector' = 'filesystem`, > 'path' = 'file://Users/ecerulm/mycsvfiles/', > 'format' = 'csv', > 'include.metadata' = 'path,size' -- tell filesystem connector to > add 2 extra columns called `flink-filesystem-metadata.path` and > `flink-filesystem-metadata.size` > ); > > Is that right? > > > > > > [1]: https://issues.apache.org/jira/browse/FLINK-19903 > [2]: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors > [3]: https://issues.apache.org/jira/browse/FLINK-15869 > > > On Fri, Oct 30, 2020 at 1:29 PM Ruben Laguna <ruben.lag...@gmail.com > <mailto:ruben.lag...@gmail.com>> wrote: > > Sure, I’ll write the JIRA issue > > On Fri, 30 Oct 2020 at 13:27, Dawid Wysakowicz > <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote: > > I am afraid there is no such functionality available yet. > > I think though it is a valid request. I think we can use the > upcoming > FLIP-107 metadata columns for this purpose and expose the file > name as > metadata column of a filesystem source. > > Would you like to create a JIRA issue for it? > > Best, > > Dawid > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors > > On 30/10/2020 13:21, Ruben Laguna wrote: > > I've asked this already on [stackoverflow][1] > > > > Is there anything equivalent to Spark's > `f.input_file_name()` ? I > > don't see anything that could be used in [system functions][2] > > > > I have a dataset where they embedded some information in the > filenames > > (200k files) and I need to extract that as a new column. > > > > In Spark I could ` > > > > .withColumn("id",f.split(f.reverse(f.split(f.input_file_name(),'/'))[0],'\.')[0])` > > but I don't see how can I do the same with Flink. Is it > possible? > > > > I don't see [any JIRA issue about it either][3]. Is it > something that > > has already been discussed? > > > > > > [1]: > > https://stackoverflow.com/questions/64607839/is-there-an-equivalent-to-sparks-f-input-file-name-function-in-apache-flink > > [2]: > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html > > [3]: > > https://issues.apache.org/jira/browse/FLINK-8275?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22filename%22 > > -- > /Rubén > > > > -- > /Rubén
signature.asc
Description: OpenPGP digital signature