Hi francois,

> I see that CsvTableSource allows to define csv fields. Then, will it
check if columns actually exists in the file and throw Exception if not ?
Currently, CsvTableSource doesn't support Avro. CsvTableSource
uses fieldDelim and rowDelim to parse data. But there is a workaround: read
each line from data as a single big column, i.e., the source table only has
one column. Afterward, you can use udtf[1] to split each line. You can
throw away data or throw exceptions in udtf as you wish.

>  I want to check if files structure is right before processing them.
If you want to skip the whole file when the schema is erroneous. You can
write a user defined table source and probably have to write a user defined
InputFormat. You can refer to the AvroInputFormat[2] as an example.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions

[2]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java

On Fri, Jul 6, 2018 at 11:32 PM, françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi Hequn,
>
> The Table-API is really great.
> I will use and certainly love it to solve the issues I mentioned before
>
> One subsequent question regarding Table-API :
> I've got my csv files and avro schemas that describe them.
> As my users can send erroneous files, inconsistent with schemas, I want to
> check if files structure is right before processing them.
> I see that CsvTableSource allows to define csv fields. Then, will it check
> if columns actually exists in the file and throw Exception if not ?
>
> Or is there any other way in Apache Avro to check if a csv file is
> consistent with a given schema?
>
> Big thank to put on the table-api's way :)
>
> Best R
>
> François Lacombe
>
>
>
> 2018-07-06 16:53 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:
>
>> Hi francois,
>>
>> If I understand correctly, you can use sql or table-api to solve you
>> problem.
>> As you want to project part of columns from source, a columnar storage
>> like parquet/orc would be efficient. Currently, ORC table source is
>> supported in flink, you can find more details here[1]. Also, there are many
>> other table sources[2] you can choose. With a TableSource, you can read the
>> data and register it as a Table and do table operations through sql[3] or
>> table-api[4].
>>
>> To make a json string from several columns, you can write a user defined
>> function[5].
>>
>> I also find a OrcTableSourceITCase[6] which I think may be helpful for
>> you.
>>
>> Best, Hequn
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sourceSinks.html#orctablesource
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sourceSinks.html#table-sources-sinks
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sql.html
>> [4] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/tableApi.html
>> [5] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/udfs.html
>> [6] https://github.com/apache/flink/blob/master/flink-connec
>> tors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableS
>> ourceITCase.java
>>
>>
>> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
>> francois.laco...@dcbrain.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm a new user to Flink community. This tool sounds great to achieve
>>> some data loading of millions-rows files into a pgsql db for a new project.
>>>
>>> As I read docs and examples, a proper use case of csv loading into pgsql
>>> can't be found.
>>> The file I want to load isn't following the same structure than the
>>> table, I have to delete some columns and make a json string from several
>>> others too prior to load to pgsql
>>>
>>> I plan to use Flink 1.5 Java API and a batch process.
>>> Does the DataSet class is able to strip some columns out of the records
>>> I load or should I iterate over each record to delete the columns?
>>>
>>> Same question to make a json string from several columns of the same
>>> record?
>>> E.g json_column =3D {"field1":col1, "field2":col2...}
>>>
>>> I work with 20 millions length files and it sounds pretty ineffective to
>>> iterate over each records.
>>> Can someone tell me if it's possible or if I have to change my mind
>>> about this?
>>>
>>>
>>> Thanks in advance, all the best
>>>
>>> François Lacombe
>>>
>>>
>>
>

Reply via email to