[ https://issues.apache.org/jira/browse/FLINK-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16545255#comment-16545255 ]
François Lacombe commented on FLINK-9813:
-----------------------------------------

Hi Fabien,

I am suggesting to build a CsvTableSource (or any other TableSource) from Avro schemas (not from the Avro data format). The point isn't to support more input formats, but to describe all structures with one common "language". An Avro schema ([https://avro.apache.org/docs/1.8.1/spec.html#schemas]) defines the structure of each record, just like the CsvTableSource Builder does when you call the .field() method. The schema doesn't specify the CSV field or row separators, for instance, but only the expected columns and their types. That's why it's only about the structure, not the whole format.

Avro schemas are highly versatile and increasingly well supported. They are a really convenient way to write a structure descriptor, whatever the file format. I think Flink would benefit strongly from being able to build its sources from such schemas.

Here is what I'm currently doing, which may be a bit awkward:

{code:java}
import java.util.HashMap;

import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.sources.CsvTableSource.Builder;

public static CsvTableSource getFromSchema(String path, Schema sch) {
    // Map Avro primitive types to Flink type information
    HashMap<Schema.Type, TypeInformation<?>> primitiveTypes = new HashMap<>();
    primitiveTypes.put(Schema.Type.BOOLEAN, Types.BOOLEAN());
    primitiveTypes.put(Schema.Type.INT, Types.INT());
    primitiveTypes.put(Schema.Type.LONG, Types.LONG());
    primitiveTypes.put(Schema.Type.FLOAT, Types.FLOAT());
    primitiveTypes.put(Schema.Type.DOUBLE, Types.DOUBLE());
    primitiveTypes.put(Schema.Type.BYTES, Types.BYTE());
    primitiveTypes.put(Schema.Type.STRING, Types.STRING());

    Builder srcBuilder = CsvTableSource.builder().path(path);

    // Iterate over the record's fields (getTypes() only applies to union schemas)
    for (Schema.Field field : sch.getFields()) {
        srcBuilder.field(field.name(), primitiveTypes.get(field.schema().getType()));
    }

    return srcBuilder.build();
}
{code}

With FLINK-9814, I would be
able to build sources from a schema description and get an Exception when the input file doesn't match the format specification.

All the best

> Build xTableSource from Avro schemas
> ------------------------------------
>
>                 Key: FLINK-9813
>                 URL: https://issues.apache.org/jira/browse/FLINK-9813
>             Project: Flink
>          Issue Type: Wish
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: François Lacombe
>            Priority: Trivial
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> As Avro provides an efficient formalism for data schemas, it would be great to be
> able to build Flink TableSources from such files.
> More info about Avro schemas: [https://avro.apache.org/docs/1.8.1/spec.html#schemas]
> For instance, with CsvTableSource:
> Schema.Parser schemaParser = new Schema.Parser();
> Schema tableSchema = schemaParser.parse("avro.json");
> Builder bld = CsvTableSource.builder().schema(tableSchema);
>
> This would give me a fully configured CsvTableSource with the columns defined in
> avro.json.
> It may be possible to do so for every TableSource, since the Avro format is
> really common and versatile.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
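For reference, a minimal Avro record schema of the kind the helper above would consume might look like this (the record and field names here are purely illustrative, not taken from the issue):

{code:json}
{
  "type": "record",
  "name": "CsvRow",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "label", "type": "string"},
    {"name": "score", "type": "double"}
  ]
}
{code}

Each entry in "fields" would map to one .field() call on the CsvTableSource builder, with the Avro primitive type translated to the corresponding Flink TypeInformation.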