Hi François,

> I see that CsvTableSource allows to define csv fields. Then, will it check
> if columns actually exist in the file and throw an Exception if not?

Currently, CsvTableSource doesn't support Avro. CsvTableSource uses fieldDelim and rowDelim to parse data. There is a workaround, though: read each line as a single big column, i.e., the source table has only one column. Afterward, you can use a UDTF[1] to split each line into fields. Inside the UDTF you can throw away bad rows or throw exceptions, as you wish.
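To make this concrete, here is a rough sketch of such a UDTF (the delimiter, field names, and types below are only placeholders for your actual schema):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

// Splits one raw csv line into typed fields.
// Returning without collecting drops the row; throwing fails the job.
public class SplitLine extends TableFunction<Tuple2<String, Integer>> {

    public void eval(String line) {
        String[] parts = line.split(";");
        if (parts.length != 2) {
            // skip the malformed row ...
            return;
            // ... or instead fail fast:
            // throw new RuntimeException("Malformed row: " + line);
        }
        collect(Tuple2.of(parts[0], Integer.valueOf(parts[1])));
    }
}

After registering it with tableEnv.registerFunction("splitLine", new SplitLine()), you can apply it to the single-column table with a join in the Table API, or with LATERAL TABLE(splitLine(line)) in SQL, as described in [1].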
> I want to check if the files' structure is right before processing them.

If you want to skip the whole file when the schema is erroneous, you can write a user-defined table source and will probably have to write a user-defined InputFormat as well. You can refer to AvroInputFormat[2] as an example.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
[2] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java

On Fri, Jul 6, 2018 at 11:32 PM, françois lacombe <francois.laco...@dcbrain.com> wrote:

> Hi Hequn,
>
> The Table API is really great.
> I will use it, and certainly love it, to solve the issues I mentioned before.
>
> One subsequent question regarding the Table API:
> I've got my csv files and the avro schemas that describe them.
> As my users can send erroneous files, inconsistent with the schemas, I want to
> check if the files' structure is right before processing them.
> I see that CsvTableSource allows to define csv fields. Then, will it check
> if columns actually exist in the file and throw an Exception if not?
>
> Or is there any other way in Apache Avro to check if a csv file is
> consistent with a given schema?
>
> Big thanks for putting me on the Table API's way :)
>
> Best R
>
> François Lacombe
>
>
> 2018-07-06 16:53 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:
>
>> Hi François,
>>
>> If I understand correctly, you can use SQL or the Table API to solve your
>> problem.
>> As you want to project part of the columns from the source, a columnar storage
>> format like Parquet/ORC would be efficient. Currently, an ORC table source is
>> supported in Flink; you can find more details here[1]. There are also many
>> other table sources[2] you can choose from. With a TableSource, you can read the
>> data, register it as a Table, and do table operations through SQL[3] or the
>> Table API[4].
>>
>> To make a json string from several columns, you can write a user-defined
>> function[5].
>>
>> I also found OrcTableSourceITCase[6], which I think may be helpful for
>> you.
>>
>> Best, Hequn
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#orctablesource
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#table-sources-sinks
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html
>> [4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
>> [5] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html
>> [6] https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
>>
>>
>> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <francois.laco...@dcbrain.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm a new user in the Flink community. This tool sounds great for loading
>>> files of millions of rows into a pgsql db for a new project.
>>>
>>> As I read the docs and examples, I can't find a proper use case of csv loading
>>> into pgsql.
>>> The file I want to load doesn't follow the same structure as the table: I have
>>> to delete some columns and also make a json string from several others prior
>>> to loading into pgsql.
>>>
>>> I plan to use the Flink 1.5 Java API and a batch process.
>>> Is the DataSet class able to strip some columns out of the records I load,
>>> or should I iterate over each record to delete the columns?
>>>
>>> Same question for making a json string from several columns of the same
>>> record?
>>> E.g. json_column = {"field1":col1, "field2":col2...}
>>>
>>> I work with files of 20 million rows and it sounds pretty inefficient to
>>> iterate over each record.
>>> Can someone tell me if it's possible or if I have to change my mind
>>> about this?
>>>
>>> Thanks in advance, all the best
>>>
>>> François Lacombe
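P.S. For the json_column question quoted above: a scalar UDF[5] can build the json string, and dropping columns is just a matter of not selecting them, so you don't have to iterate over the 20 million records yourself. A rough sketch (column names and types are only placeholders):

import org.apache.flink.table.functions.ScalarFunction;

// Builds a json string from two columns. For production use, a json
// library such as Jackson would give you proper escaping.
public class ToJson extends ScalarFunction {
    public String eval(String col1, Integer col2) {
        return "{\"field1\":\"" + col1 + "\",\"field2\":" + col2 + "}";
    }
}

// usage, assuming csvTable was registered from your CsvTableSource:
tableEnv.registerFunction("toJson", new ToJson());
Table projected = csvTable.select("col1, toJson(col1, col2) as json_column");

The projection and the UDF run inside the batch job, so Flink processes the rows in parallel instead of you looping over them.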