Just to make sure we are on the same page: the end goal is to have
CatalogTable#getTableSchema/TableSource#getTableSchema return a schema that is
compatible with TableSource#getProducedDataType. If you want to use the new
types, you should not implement TableSource#getReturnType. Moreover, you should
not use any Flink utilities that convert from TypeInformation to DataType, as
those produce legacy types. I am aware there are a lot of corner cases, and we
worked hard to improve the situation with the new source and sink interfaces.
Below is an example of how you could pass different array types:

    StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);

    tEnv.registerTableSource(
        "T",
        new StreamTableSource<Row>() {

            @Override
            public TableSchema getTableSchema() {
                // the schema as it would come from the catalog
                return TableSchema.builder()
                    .field("f0", DataTypes.ARRAY(DataTypes.BIGINT().notNull()))
                    .field("f1", DataTypes.ARRAY(DataTypes.BIGINT()))
                    .field("f2", DataTypes.ARRAY(DataTypes.STRING()))
                    .build();
            }

            @Override
            public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.fromCollection(
                    Arrays.asList(Row.of(new long[]{1}, new Long[]{new Long(1)}, new String[]{"ABCDE"})),
                    // this is necessary for the STRING array, because otherwise the DataStream
                    // produces a different TypeInformation than the planner expects
                    (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())
                );
            }

            @Override
            public DataType getProducedDataType() {
                return DataTypes.ROW(
                    DataTypes.FIELD(
                        "f0",
                        DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class))
                            .bridgedTo(long[].class)),
                    DataTypes.FIELD(
                        "f1",
                        DataTypes.ARRAY(DataTypes.BIGINT())),
                    DataTypes.FIELD(
                        "f2",
                        DataTypes.ARRAY(DataTypes.STRING()))
                );
            }
        });

    Table table = tEnv.sqlQuery("SELECT f0, f1, f2 FROM T");

    DataStream<Row> result = tEnv.toAppendStream(
        table,
        Types.ROW(
            Types.PRIMITIVE_ARRAY(Types.LONG),
            ObjectArrayTypeInfo.getInfoFor(Types.LONG),
            ObjectArrayTypeInfo.getInfoFor(Types.STRING)));

    result.print();

    exec.execute();
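A small additional sketch, not part of the example above: if the table schema
already comes from the catalog, you could also derive the produced type
directly from it, so that the two methods cannot drift apart. Note that this is
only an assumption to verify against your connector, and it loses the explicit
bridging to long[] shown above (the default conversion classes are used):

    // Sketch only: reuse the (catalog) schema for the produced type so that
    // getTableSchema() and getProducedDataType() always stay compatible.
    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }

This corresponds to the tableSchema.toRowDataType suggestion from the earlier
mail quoted below.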
Hope this will help, and that it will be much easier in Flink 1.11.

Best,

Dawid

On 05/06/2020 13:33, Ramana Uppala wrote:
> Hi Dawid,
>
> We are using a custom connector that is very similar to the Flink Kafka
> connector, and we instantiate the TableSchema using a custom class which
> maps Avro types to Flink's DataTypes using TableSchema.Builder.
>
> For the Array type, we have the below mapping:
>
>     case ARRAY:
>         return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
>
> We are using the Hive Catalog and creating tables using CatalogTableImpl
> with the TableSchema.
>
> As you mentioned, if we create the TableSchema with legacy types, our
> connector works without any issues. But we want to use the new Flink
> DataTypes API and are having issues.
>
> Also, one more observation: if we use legacy types when creating the
> TableSource, the application does not work with the Blink planner. We are
> getting the same error, the physical type not matching.
>
> Looking forward to the 1.11 changes.
>
> On Fri, Jun 5, 2020 at 3:34 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
> > Hi Ramana,
> >
> > What connector do you use or how do you instantiate the TableSource?
> > Also, which catalog do you use and how do you register your table in
> > that catalog?
> >
> > The problem is that the conversion from TypeInformation to DataType
> > produces legacy types (because they cannot be mapped exactly 1-1 to the
> > new types).
> >
> > If you can change the code of the TableSource, you can return in
> > TableSource#getProducedDataType the tableSchema.toRowDataType, where the
> > tableSchema is the schema coming from the catalog. Or you can make sure
> > that the catalog table produces the legacy type:
> >
> >     TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
> >
> > In 1.11 we will introduce new sources and formats that already work
> > entirely with the new type system (AvroRowDataDeserializationSchema and
> > KafkaDynamicTable).
> >
> > Hope this helps a bit.
> >
> > Best,
> >
> > Dawid
> >
> > On 04/06/2020 13:43, Ramana Uppala wrote:
> > > Hi,
> > >
> > > Our Avro schema contains an Array<String> type; we created a
> > > TableSchema out of the Avro schema and created a table in the catalog.
> > > In the catalog, this specific field type is shown as
> > > ARRAY<VARCHAR(2147483647)>. We are using AvroRowDeserializationSchema
> > > with the connector, and the returnType of the TableSource shows
> > > Array<String> mapped to LEGACY('ARRAY', 'ANY<[Ljava.lang.String;... by
> > > the AvroSchemaConverter.
> > >
> > > When we run the application, the planner validates the physical types
> > > against the logical types and we get the below error:
> > >
> > > ... of table field 'XYZ' does not match with the physical type ROW<...
> > >
> > > Any suggestions on how to resolve this? Is this a bug?
>
> ------------------------------------------------------------------------
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The
> information transmitted herewith is intended only for use by the
> individual or entity to which it is addressed. If the reader of this
> message is not the intended recipient, you are hereby notified that
> any review, retransmission, dissemination, distribution, copying or
> other use of, or taking of any action in reliance upon this
> information is strictly prohibited. If you have received this
> communication in error, please contact the sender and delete the
> material from your computer.
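An aside on the legacy-type workaround mentioned in the quoted reply above:
spelled out, such a schema declaration could look roughly like the sketch
below. This is only an assumption about the pre-1.11 API; the
TypeInformation-based field(...) overload of TableSchema.Builder (deprecated
in favor of the DataType variant) is what yields the legacy ARRAY type that
matches what AvroRowDeserializationSchema produces. The field name is just a
placeholder.

    // Sketch only: declaring the field with a TypeInformation makes the
    // resulting schema carry the legacy array type rather than the new one.
    TableSchema legacySchema = TableSchema.builder()
        .field("someArrayField", Types.OBJECT_ARRAY(Types.STRING))
        .build();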